Merge remote-tracking branch 'couchbase/3.0.x' into sherlock
[ep-engine.git] / src / connmap.cc
index 09d8b35..0555933 100644 (file)
 #include "executorthread.h"
 #include "tapconnection.h"
 #include "connmap.h"
+#include "dcp-backfill-manager.h"
 #include "dcp-consumer.h"
 #include "dcp-producer.h"
 
 size_t ConnMap::vbConnLockNum = 32;
 const double ConnNotifier::DEFAULT_MIN_STIME = 1.0;
+const uint32_t DcpConnMap::dbFileMem = 10 * 1024;
+const uint16_t DcpConnMap::numBackfillsThreshold = 4096;
+const uint8_t DcpConnMap::numBackfillsMemThreshold = 1;
 
 /**
  * NonIO task to free the resource of a tap connection.
@@ -41,11 +45,8 @@ class ConnectionReaperCallback : public GlobalTask {
 public:
     ConnectionReaperCallback(EventuallyPersistentEngine &e, ConnMap& cm,
                              connection_t &conn)
-        : GlobalTask(&e, Priority::TapConnectionReaperPriority),
+        : GlobalTask(&e, TaskId::ConnectionReaperCallback),
           connMap(cm), connection(conn) {
-        std::stringstream ss;
-        ss << "Reaping tap or dcp connection: " << connection->getName();
-        descr = ss.str();
     }
 
     bool run(void) {
@@ -58,13 +59,12 @@ public:
     }
 
     std::string getDescription() {
-        return descr;
+        return "Reaping tap or dcp connection: " + connection->getName();
     }
 
 private:
     ConnMap &connMap;
     connection_t connection;
-    std::string descr;
 };
 
 /**
@@ -73,7 +73,7 @@ private:
 class ConnNotifierCallback : public GlobalTask {
 public:
     ConnNotifierCallback(EventuallyPersistentEngine *e, ConnNotifier *notifier)
-    : GlobalTask(e, Priority::TapConnNotificationPriority),
+    : GlobalTask(e, TaskId::ConnNotifierCallback),
       connNotifier(notifier) { }
 
     bool run(void) {
@@ -141,7 +141,7 @@ bool ConnNotifier::notifyConnections() {
 class ConnManager : public GlobalTask {
 public:
     ConnManager(EventuallyPersistentEngine *e, ConnMap *cmap)
-        : GlobalTask(e, Priority::TapConnMgrPriority, MIN_SLEEP_TIME, false),
+        : GlobalTask(e, TaskId::ConnManager, MIN_SLEEP_TIME, false),
           engine(e), connmap(cmap) { }
 
     bool run(void) {
@@ -877,7 +877,7 @@ void TapConnMap::removeTapCursors_UNLOCKED(TapProducer *tp) {
                 LOG(EXTENSION_LOG_INFO,
                     "%s Remove the TAP cursor from vbucket %d",
                     tp->logHeader(), vbid);
-                vb->checkpointManager.removeTAPCursor(tp->getName());
+                vb->checkpointManager.removeCursor(tp->getName());
             }
         }
     }
@@ -928,8 +928,18 @@ void TAPSessionStats::clearStats(const std::string &name) {
 }
 
 DcpConnMap::DcpConnMap(EventuallyPersistentEngine &e)
-    : ConnMap(e) {
-
+    : ConnMap(e),
+      aggrDcpConsumerBufferSize(0) {
+    numActiveSnoozingBackfills = 0;
+    updateMaxActiveSnoozingBackfills(engine.getEpStats().getMaxDataSize());
+
+    // Note: these allocations are deleted by ~Configuration
+    engine.getConfiguration().
+        addValueChangedListener("dcp_consumer_process_buffered_messages_yield_limit",
+                                new DcpConfigChangeListener(*this));
+    engine.getConfiguration().
+        addValueChangedListener("dcp_consumer_process_buffered_messages_batch_size",
+                                new DcpConfigChangeListener(*this));
 }
 
 DcpConsumer *DcpConnMap::newConsumer(const void* cookie,
@@ -991,6 +1001,45 @@ ENGINE_ERROR_CODE DcpConnMap::addPassiveStream(ConnHandler* conn,
     return conn->addStream(opaque, vbucket, flags);
 }
 
+
+DcpConnMap::DcpConfigChangeListener::DcpConfigChangeListener(DcpConnMap& connMap)
+    : myConnMap(connMap){}
+
+void DcpConnMap::DcpConfigChangeListener::sizeValueChanged(const std::string &key,
+                                                           size_t value) {
+    if (key == "dcp_consumer_process_buffered_messages_yield_limit") {
+        myConnMap.consumerYieldConfigChanged(value);
+    } else if (key == "dcp_consumer_process_buffered_messages_batch_size") {
+        myConnMap.consumerBatchSizeConfigChanged(value);
+    }
+}
+
+/*
+ * Find all DcpConsumers and set the yield threshold
+ */
+void DcpConnMap::consumerYieldConfigChanged(size_t newValue) {
+    LockHolder lh(connsLock);
+    for (auto it : all) {
+        DcpConsumer* dcpConsumer = dynamic_cast<DcpConsumer*>(it.get());
+        if (dcpConsumer) {
+            dcpConsumer->setProcessorYieldThreshold(newValue);
+        }
+    }
+}
+
+/*
+ * Find all DcpConsumers and set the processor batchsize
+ */
+void DcpConnMap::consumerBatchSizeConfigChanged(size_t newValue) {
+    LockHolder lh(connsLock);
+    for (auto it : all) {
+        DcpConsumer* dcpConsumer = dynamic_cast<DcpConsumer*>(it.get());
+        if (dcpConsumer) {
+            dcpConsumer->setProcessBufferedMessagesBatchSize(newValue);
+        }
+    }
+}
+
 DcpProducer *DcpConnMap::newProducer(const void* cookie,
                                      const std::string &name,
                                      bool notifyOnly)
@@ -1057,7 +1106,7 @@ void DcpConnMap::closeAllStreams_UNLOCKED() {
         DcpProducer* producer = dynamic_cast<DcpProducer*> (itr->second.get());
         if (producer) {
             producer->closeAllStreams();
-            producer->cancelCheckpointProcessorTask();
+            producer->clearCheckpointProcessorTaskQueues();
         } else {
             static_cast<DcpConsumer*>(itr->second.get())->closeAllStreams();
         }
@@ -1098,7 +1147,7 @@ void DcpConnMap::disconnect(const void *cookie) {
         DcpProducer* producer = dynamic_cast<DcpProducer*> (conn.get());
         if (producer) {
             producer->closeAllStreams();
-            producer->cancelCheckpointProcessorTask();
+            producer->clearCheckpointProcessorTaskQueues();
         } else {
             static_cast<DcpConsumer*>(conn.get())->closeAllStreams();
         }
@@ -1185,8 +1234,7 @@ void DcpConnMap::removeVBConnections(connection_t &conn) {
     }
 }
 
-void DcpConnMap::notifyVBConnections(uint16_t vbid, uint64_t bySeqno)
-{
+void DcpConnMap::notifyVBConnections(uint16_t vbid, uint64_t bySeqno) {
     size_t lock_num = vbid % vbConnLockNum;
     SpinLockHolder lh(&vbConnLocks[lock_num]);
 
@@ -1198,8 +1246,47 @@ void DcpConnMap::notifyVBConnections(uint16_t vbid, uint64_t bySeqno)
     }
 }
 
-void DcpConnMap::addStats(ADD_STAT add_stat, const void *c) {
+void DcpConnMap::notifyBackfillManagerTasks() {
     LockHolder lh(connsLock);
-    add_casted_stat("ep_dcp_dead_conn_count", deadConnections.size(),
-                    add_stat, c);
+    std::map<const void*, connection_t>::iterator itr = map_.begin();
+    for (; itr != map_.end(); ++itr) {
+        DcpProducer* producer = dynamic_cast<DcpProducer*> (itr->second.get());
+        if (producer) {
+            producer->getBackfillManager()->wakeUpTask();
+        }
+    }
+}
+
+bool DcpConnMap::canAddBackfillToActiveQ()
+{
+    SpinLockHolder lh(&numBackfillsLock);
+    if (numActiveSnoozingBackfills < maxActiveSnoozingBackfills) {
+        ++numActiveSnoozingBackfills;
+        return true;
+    }
+    return false;
+}
+
+void DcpConnMap::decrNumActiveSnoozingBackfills()
+{
+    SpinLockHolder lh(&numBackfillsLock);
+    if (numActiveSnoozingBackfills > 0) {
+        --numActiveSnoozingBackfills;
+    } else {
+        LOG(EXTENSION_LOG_WARNING, "ActiveSnoozingBackfills already zero!!!");
+    }
+}
+
+void DcpConnMap::updateMaxActiveSnoozingBackfills(size_t maxDataSize)
+{
+    double numBackfillsMemThresholdPercent =
+                         static_cast<double>(numBackfillsMemThreshold)/100;
+    size_t max = maxDataSize * numBackfillsMemThresholdPercent / dbFileMem;
+    /* We must have atleast one active/snoozing backfill */
+    SpinLockHolder lh(&numBackfillsLock);
+    maxActiveSnoozingBackfills =
+        std::max(static_cast<size_t>(1),
+                 std::min(max, static_cast<size_t>(numBackfillsThreshold)));
+    LOG(EXTENSION_LOG_DEBUG, "Max active snoozing backfills set to %d",
+        maxActiveSnoozingBackfills);
 }