MB-16632: As part of queueDirty schedule a DCP connections notifier task 01/56301/28
authorabhinavdangeti <abhinav@couchbase.com>
Thu, 22 Oct 2015 21:54:28 +0000 (14:54 -0700)
committerChiyoung Seo <chiyoung@couchbase.com>
Wed, 2 Dec 2015 00:34:38 +0000 (00:34 +0000)
This is how things are done for TAP.
This largely removes the notification-lock overhead from the latency of
front-end store/delete operations.

Change-Id: I32c3c26daf6ea8cebeecc2a81fb1f0e957ba3e3d
Reviewed-on: http://review.couchbase.org/56301
Reviewed-by: Dave Rigby <daver@couchbase.com>
Tested-by: buildbot <build@couchbase.com>
Reviewed-by: Chiyoung Seo <chiyoung@couchbase.com>
configuration.json
docs/stats.org
src/connmap.cc
src/connmap.h
src/ep_engine.cc
tests/ep_testsuite.cc

index 2a29881..b233cd4 100644 (file)
             "dynamic": false,
             "type": "size_t"
         },
+        "dcp_producer_notifier_yield_limit": {
+            "default": "50",
+            "descr": "The number of notifications before DcpProducerNotifier::run yields.",
+            "type": "size_t"
+        },
         "vb0": {
             "default": "false",
             "type": "bool"
index 2aa7f09..b76bd84 100644 (file)
@@ -307,6 +307,9 @@ For introductory information on stats within membase, start with the
 |                                    | during warmup                          |
 | ep_warmup_thread                   | The status of the warmup thread        |
 | ep_warmup_time                     | The amount of time warmup took         |
+| ep_dcp_pending_notifications       | Shows true/false if notifications are  |
+|                                    | pending for the DcpProducerNotifier    |
+|                                    | task.                                  |
 | ep_workload_pattern                | Workload pattern (mixed, read_heavy,   |
 |                                    | write_heavy) monitored at runtime      |
 
index 23a2467..bf7b8ae 100644 (file)
@@ -194,6 +194,9 @@ void ConnMap::initialize(conn_notifier_type ntype) {
     connNotifier_->start();
     ExTask connMgr = new ConnManager(&engine, this);
     ExecutorPool::get()->schedule(connMgr, NONIO_TASK_IDX);
+    if (ntype == DCP_CONN_NOTIFIER) {
+        dynamic_cast<DcpConnMap *>(this)->startProducerNotifier();
+    }
 }
 
 ConnMap::~ConnMap() {
@@ -926,10 +929,13 @@ void TAPSessionStats::clearStats(const std::string &name) {
 }
 
 DcpConnMap::DcpConnMap(EventuallyPersistentEngine &e)
-    : ConnMap(e) {
-
+  : ConnMap(e),
+    producerNotifier(NULL) {
 }
 
+DcpConnMap::~DcpConnMap() {
+    stopProducerNotifier();
+}
 
 DcpConsumer *DcpConnMap::newConsumer(const void* cookie,
                                      const std::string &name)
@@ -1172,13 +1178,98 @@ void DcpConnMap::removeVBConnections(connection_t &conn) {
 
 void DcpConnMap::notifyVBConnections(uint16_t vbid, uint64_t bySeqno)
 {
-    size_t lock_num = vbid % vbConnLockNum;
-    SpinLockHolder lh(&vbConnLocks[lock_num]);
+    addNotification(vbid, bySeqno);
+    wakeProducerNotifier();
+}
 
-    std::list<connection_t> &conns = vbConns[vbid];
-    std::list<connection_t>::iterator it = conns.begin();
-    for (; it != conns.end(); ++it) {
-        DcpProducer *conn = static_cast<DcpProducer*>((*it).get());
-        conn->notifySeqnoAvailable(vbid, bySeqno);
+void DcpConnMap::addNotification(uint16_t vbid, uint64_t bySeqno) {
+    LockHolder lh(notificationsLock);
+    std::deque<DcpProducerNotification>::iterator it = notifications.begin();
+    for (; it != notifications.end(); it++) {
+        if ((*it).vbid == vbid) {
+            (*it).seqno = bySeqno;
+            return;
+        }
     }
+
+    notifications.push_back({vbid, bySeqno});
 }
+
+bool DcpConnMap::getNextNotification(uint16_t& vbid, uint64_t& seqno) {
+    LockHolder lh(notificationsLock);
+    if (!notifications.empty()) {
+        vbid = notifications.front().vbid;
+        seqno = notifications.front().seqno;
+        notifications.pop_front();
+        return true;
+    }
+    return false;
+}
+
+bool DcpConnMap::notifyProducers() {
+    uint16_t vbid = 0;
+    uint64_t seqno = 0;
+
+    // NB: returns via reference parameters
+    if (getNextNotification(vbid, seqno)) {
+        size_t lock_num = vbid % vbConnLockNum;
+        SpinLockHolder lh(&vbConnLocks[lock_num]);
+        std::list<connection_t> &conns = vbConns[vbid];
+        if (!conns.empty()) {
+            std::list<connection_t>::iterator connection = conns.begin();
+            for (; connection != conns.end(); ++connection) {
+                DcpProducer *conn = static_cast<DcpProducer*>((*connection).get());
+                if (conn) {
+                    conn->notifySeqnoAvailable(vbid, seqno);
+                }
+            }
+        }
+        return true;
+    }
+    return false;
+}
+
+bool DcpConnMap::DcpProducerNotifier::run() {
+    if (engine->getEpStats().isShutdown) {
+        return false;
+    }
+
+    // Indicate that we will snooze forever
+    snooze(INT_MAX);
+
+    // Clear the notification flag.
+    notified.store(false);
+
+    // Process the incoming notification(s)
+    size_t iterations = 0;
+
+    while (dcpConnMap.notifyProducers()
+           && iterations < iterationsBeforeYield) {
+        iterations++;
+    }
+
+    // Check if another notify came in or if there's more to do.
+    bool expected = true;
+    if (notified.compare_exchange_strong(expected, false)
+        || dcpConnMap.notificationsPending()) {
+        // sleep for 0 - yielding this task
+        snooze(0.0);
+    }
+
+    return true;
+}
+
+void DcpConnMap::startProducerNotifier() {
+    producerNotifier = new DcpProducerNotifier(&engine, *this);
+    ExecutorPool::get()->schedule(producerNotifier, AUXIO_TASK_IDX);
+}
+
+void DcpConnMap::wakeProducerNotifier() {
+    if (producerNotifier->wakeMeUp()) {
+        ExecutorPool::get()->wake(producerNotifier->getId());
+    }
+}
+
+void DcpConnMap::stopProducerNotifier() {
+    ExecutorPool::get()->cancel(producerNotifier->getId());
+}
\ No newline at end of file
index d5f451b..46be290 100644 (file)
@@ -41,7 +41,7 @@ class TapProducer;
 class Item;
 class EventuallyPersistentEngine;
 
-typedef SingleThreadedRCPtr<ConnHandler> connection_t;
+typedef RCPtr<ConnHandler> connection_t;
 /**
  * Base class for operations performed on tap connections.
  *
@@ -429,13 +429,14 @@ private:
 
 };
 
-
 class DcpConnMap : public ConnMap {
 
 public:
 
     DcpConnMap(EventuallyPersistentEngine &engine);
 
+    ~DcpConnMap();
+
     /**
      * Find or build a dcp connection for the given cookie and with
      * the given name.
@@ -466,8 +467,55 @@ public:
 
     ENGINE_ERROR_CODE addPassiveStream(ConnHandler* conn, uint32_t opaque,
                                        uint16_t vbucket, uint32_t flags);
+
+    bool notifyProducers();
+    bool notificationsPending() {
+        LockHolder lh(notificationsLock);
+        return !notifications.empty();
+    }
+
+    void startProducerNotifier();
+    void wakeProducerNotifier();
+    void stopProducerNotifier();
+
 private:
 
+    class DcpProducerNotifier : public GlobalTask {
+    public:
+        DcpProducerNotifier(EventuallyPersistentEngine *e,
+                            DcpConnMap &dcm) :
+            GlobalTask(e, Priority::TapConnNotificationPriority, INT_MAX, true),
+            dcpConnMap(dcm),
+            notified(false),
+            iterationsBeforeYield(e->getConfiguration()
+                                  .getDcpProducerNotifierYieldLimit()) {}
+
+        std::string getDescription() {
+            std::string rv("Notifying DCP producers on store operations");
+            return rv;
+        }
+
+        bool run();
+
+        bool wakeMeUp() {
+            bool expected = false;
+            return notified.compare_exchange_strong(expected, true);
+        }
+
+    private:
+        DcpConnMap &dcpConnMap;
+        AtomicValue<bool> notified;
+        size_t iterationsBeforeYield;
+    };
+
+    struct DcpProducerNotification {
+        uint16_t vbid;
+        uint64_t seqno;
+    };
+
+    void addNotification(uint16_t vbid, uint64_t bySeqno);
+    bool getNextNotification(uint16_t& vbid, uint64_t& seqno);
+
     bool isPassiveStreamConnected_UNLOCKED(uint16_t vbucket);
 
     void disconnect_UNLOCKED(const void *cookie);
@@ -475,6 +523,11 @@ private:
     void closeAllStreams_UNLOCKED();
 
     std::list<connection_t> deadConnections;
+
+    DcpProducerNotifier* producerNotifier;
+
+    std::deque<DcpProducerNotification> notifications;
+    Mutex notificationsLock;
 };
 
 
index 255461b..d90d8cf 100644 (file)
@@ -3395,6 +3395,9 @@ ENGINE_ERROR_CODE EventuallyPersistentEngine::doEngineStats(const void *cookie,
         add_casted_stat("ep_warmup_dups", epstats.warmDups, add_stat, cookie);
     }
 
+    add_casted_stat("ep_dcp_pending_notifications",
+                    dcpConnMap_->notificationsPending() ? "true" : "false",
+                    add_stat, cookie);
     add_casted_stat("ep_num_ops_get_meta", epstats.numOpsGetMeta,
                     add_stat, cookie);
     add_casted_stat("ep_num_ops_set_meta", epstats.numOpsSetMeta,
index e36fde1..c383506 100644 (file)
@@ -471,8 +471,18 @@ static enum test_result test_set(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1) {
     }
 
     wait_for_flusher_to_settle(h, h1);
-    checkeq(num_keys, get_int_stat(h, h1, "ep_total_persisted"),
-            "Expected ep_total_persisted equals 4");
+
+    std::stringstream error1, error2;
+    error1 << "Expected ep_total_persisted >= num_keys (" << num_keys << ")";
+    error2 << "Expected ep_total_persisted <= num_sets*num_keys ("
+           << num_sets*num_keys << ")";
+
+    // The flusher could have run more than once. We can only assert
+    // that we persisted between num_keys and up to num_keys*num_sets
+    check(get_int_stat(h, h1, "ep_total_persisted") >= num_keys,
+          error1.str().c_str());
+    check(get_int_stat(h, h1, "ep_total_persisted") <= num_sets*num_keys,
+          error2.str().c_str());
     return SUCCESS;
 }
 
@@ -3215,6 +3225,7 @@ static enum test_result test_dcp_notifier(ENGINE_HANDLE *h,
         h1->release(h, NULL, i);
     }
 
+    wait_for_str_stat_to_be(h, h1, "ep_dcp_pending_notifications", "false", NULL);
     // Should get a stream end
     dcp_step(h, h1, cookie);
     check(dcp_last_op == PROTOCOL_BINARY_CMD_DCP_STREAM_END,