MB-23767: Notify vb high priority requests in a separate task 59/76659/12
authorManu Dhundi <manu@couchbase.com>
Thu, 13 Apr 2017 16:31:19 +0000 (09:31 -0700)
committerManu Dhundi <manu@couchbase.com>
Thu, 13 Apr 2017 17:34:39 +0000 (17:34 +0000)
Vbucket high priority requests are made by ns server during rebalance
wherein it expects a notification when the vbucket receives upto a
particular seqno.

We must send this notification in a separate task rather than in
front end thread to avoid deadlocks due to lock inversion.
That is, a front end thread generally makes call down the stack and
acquires locks in the order top to down (in our software stack).
But a notification path is up the software stack and hence must be
done as a separate task.

This commit creates a daemon task per Ephemeral Bucket to notify
vb high priority reuqests. When the task runs it notifies all the
connections to be notified as goes to sleep. When a new connection
is to be notified the task is woken again.

The current deadlock that was hit is captured in MB-23767.

Change-Id: Id114bedb5cd4de8b493ae6885734c3d440c8bea3
Reviewed-on: http://review.couchbase.org/76659
Reviewed-by: Dave Rigby <daver@couchbase.com>
Tested-by: Build Bot <build@couchbase.com>
src/ep_vb.cc
src/ephemeral_bucket.cc
src/ephemeral_bucket.h
src/ephemeral_vb.cc
src/tasks.def.h
src/vbucket.cc
src/vbucket.h

index 8daa1cc..5de7507 100644 (file)
@@ -205,7 +205,7 @@ HighPriorityVBReqStatus EPVBucket::checkAddHighPriorityVBEntry(
 void EPVBucket::notifyHighPriorityRequests(EventuallyPersistentEngine& engine,
                                            uint64_t idNum,
                                            HighPriorityVBNotify notifyType) {
-    auto toNotify = getHighPriorityNotifies(engine, idNum, notifyType);
+    auto toNotify = getHighPriorityNotifications(engine, idNum, notifyType);
 
     if (shard) {
         shard->highPriorityCount.fetch_sub(toNotify.size());
index fa0a6d6..54ba4e6 100644 (file)
@@ -89,7 +89,8 @@ private:
 };
 
 EphemeralBucket::EphemeralBucket(EventuallyPersistentEngine& theEngine)
-    : KVBucket(theEngine) {
+    : KVBucket(theEngine),
+      notifyHpReqTask(make_STRCPtr<NotifyHighPriorityReqTask>(theEngine)) {
     /* We always have VALUE_ONLY eviction policy because a key not
        present in HashTable implies key not present at all.
        Note: This should not be confused with the eviction algorithm
@@ -97,6 +98,10 @@ EphemeralBucket::EphemeralBucket(EventuallyPersistentEngine& theEngine)
     eviction_policy = VALUE_ONLY;
 }
 
+EphemeralBucket::~EphemeralBucket() {
+    ExecutorPool::get()->cancel(notifyHpReqTask->getId());
+}
+
 bool EphemeralBucket::initialize() {
     KVBucket::initialize();
     auto& config = engine.getConfiguration();
@@ -123,6 +128,9 @@ bool EphemeralBucket::initialize() {
     config.addValueChangedListener("ephemeral_metadata_purge_interval",
                                    new EphemeralValueChangedListener(*this));
 
+    // High priority vbucket request notification task
+    ExecutorPool::get()->schedule(notifyHpReqTask);
+
     return true;
 }
 
@@ -227,8 +235,13 @@ void EphemeralBucket::notifyNewSeqno(const uint16_t vbid,
     /* In ephemeral buckets we must notify high priority requests as well.
        We do not wait for persistence to notify high priority requests */
     VBucketPtr vb = getVBucket(vbid);
-    vb->notifyHighPriorityRequests(
+
+    auto toNotify = vb->getHighPriorityNotifications(
             engine, notifyCtx.bySeqno, HighPriorityVBNotify::Seqno);
+
+    if (toNotify.size() && notifyHpReqTask) {
+        notifyHpReqTask->wakeup(std::move(toNotify));
+    }
 }
 
 // Protected methods //////////////////////////////////////////////////////////
@@ -298,3 +311,68 @@ void EphemeralBucket::appendAggregatedVBucketStats(VBucketCountVisitor& active,
             ephPending.seqlistStaleMetadataBytes);
 #undef DO_STAT
 }
+
+EphemeralBucket::NotifyHighPriorityReqTask::NotifyHighPriorityReqTask(
+        EventuallyPersistentEngine& e)
+    : GlobalTask(&e,
+                 TaskId::NotifyHighPriorityReqTask,
+                 std::numeric_limits<int>::max(),
+                 false) {
+}
+
+bool EphemeralBucket::NotifyHighPriorityReqTask::run() {
+    std::map<const void*, ENGINE_ERROR_CODE> notifyQ;
+    {
+        /* It is necessary that the toNotifyLock is not held while
+           actually notifying. */
+        std::lock_guard<std::mutex> lg(toNotifyLock);
+        notifyQ = std::move(toNotify);
+    }
+
+    for (auto& notify : notifyQ) {
+        LOG(EXTENSION_LOG_NOTICE,
+            "%s for cookie :%p and status %d",
+            to_string(getDescription()).c_str(),
+            notify.first,
+            notify.second);
+        engine->notifyIOComplete(notify.first, notify.second);
+    }
+
+    /* Lets assume that the task will be explicitly woken */
+    snooze(std::numeric_limits<int>::max());
+
+    /* But, also check if another thread already tried to wake up the task */
+    bool scheduleSoon = false;
+    {
+        std::lock_guard<std::mutex> lg(toNotifyLock);
+        if (toNotify.size()) {
+            scheduleSoon = true;
+        }
+    }
+
+    if (scheduleSoon) {
+        /* Good to call snooze without holding toNotifyLock */
+        snooze(0);
+    }
+
+    /* Run the task again after snoozing */
+    return true;
+}
+
+cb::const_char_buffer
+EphemeralBucket::NotifyHighPriorityReqTask::getDescription() {
+    return "Ephemeral: Notify HighPriority Request";
+}
+
+void EphemeralBucket::NotifyHighPriorityReqTask::wakeup(
+        std::map<const void*, ENGINE_ERROR_CODE> notifies) {
+    {
+        /* Add the connections to be notified */
+        std::lock_guard<std::mutex> lg(toNotifyLock);
+        toNotify.insert(make_move_iterator(begin(notifies)),
+                        make_move_iterator(end(notifies)));
+    }
+
+    /* wake up the task */
+    ExecutorPool::get()->wake(getId());
+}
index 221d41c..56fd5c0 100644 (file)
@@ -32,6 +32,8 @@ class EphemeralBucket : public KVBucket {
 public:
     EphemeralBucket(EventuallyPersistentEngine& theEngine);
 
+    ~EphemeralBucket();
+
     bool initialize() override;
 
     /// Eviction not supported for Ephemeral buckets - without some backing
@@ -130,4 +132,36 @@ protected:
 
     /// Task responsible for purging in-memory tombstones.
     ExTask tombstonePurgerTask;
+
+private:
+    /**
+     * Task responsible for notifying high priority requests (usually during
+     * rebalance)
+     */
+    class NotifyHighPriorityReqTask : public GlobalTask {
+    public:
+        NotifyHighPriorityReqTask(EventuallyPersistentEngine& e);
+
+        bool run() override;
+
+        cb::const_char_buffer getDescription() override;
+
+        /**
+         * Adds the connections to be notified by the task and then wakes up
+         * the task.
+         *
+         * @param notifies Map of connections to be notified
+         */
+        void wakeup(std::map<const void*, ENGINE_ERROR_CODE> notifies);
+
+    private:
+        /* All the notifications to be called by the task */
+        std::map<const void*, ENGINE_ERROR_CODE> toNotify;
+
+        /* Serialize access to write/read of toNotify */
+        std::mutex toNotifyLock;
+    };
+
+    // Private member variables ///////////////////////////////////////////////
+    SingleThreadedRCPtr<NotifyHighPriorityReqTask> notifyHpReqTask;
 };
index e2f570f..6f225c5 100644 (file)
@@ -210,11 +210,10 @@ void EphemeralVBucket::notifyHighPriorityRequests(
         EventuallyPersistentEngine& engine,
         uint64_t idNum,
         HighPriorityVBNotify notifyType) {
-    auto toNotify = getHighPriorityNotifies(engine, idNum, notifyType);
-
-    for (auto& notify : toNotify) {
-        engine.notifyIOComplete(notify.first, notify.second);
-    }
+    throw std::logic_error(
+            "EphemeralVBucket::notifyHighPriorityRequests() is not valid. "
+            "Called on vb " +
+            std::to_string(getId()));
 }
 
 void EphemeralVBucket::notifyAllPendingConnsFailed(
index 0ec1e2f..747a8c0 100644 (file)
@@ -60,6 +60,7 @@ TASK(StatSnap, WRITER_TASK_IDX, 9)
 
 // Non-IO tasks
 TASK(PendingOpsNotification, NONIO_TASK_IDX, 0)
+TASK(NotifyHighPriorityReqTask, NONIO_TASK_IDX, 0)
 TASK(Processor, NONIO_TASK_IDX, 0)
 TASK(FlushAllTask, NONIO_TASK_IDX, 3)
 TASK(ConnNotifierCallback, NONIO_TASK_IDX, 5)
index 238f60c..874d3a4 100644 (file)
@@ -2289,7 +2289,7 @@ void VBucket::addHighPriorityVBEntry(uint64_t seqnoOrChkId,
         cookie);
 }
 
-std::map<const void*, ENGINE_ERROR_CODE> VBucket::getHighPriorityNotifies(
+std::map<const void*, ENGINE_ERROR_CODE> VBucket::getHighPriorityNotifications(
         EventuallyPersistentEngine& engine,
         uint64_t idNum,
         HighPriorityVBNotify notifyType) {
index 1e66784..ec201f2 100644 (file)
@@ -488,6 +488,21 @@ public:
 
     virtual void notifyAllPendingConnsFailed(EventuallyPersistentEngine& e) = 0;
 
+    /**
+     * Get high priority notifications for a seqno or checkpoint persisted
+     *
+     * @param engine Ref to ep-engine
+     * @param id seqno or checkpoint id for which notifies are to be found
+     * @param notifyType indicating notify for seqno or chk persistence
+     *
+     * @return map of notifications with conn cookie as the key and notify
+     *         status as the value
+     */
+    std::map<const void*, ENGINE_ERROR_CODE> getHighPriorityNotifications(
+            EventuallyPersistentEngine& engine,
+            uint64_t idNum,
+            HighPriorityVBNotify notifyType);
+
     size_t getHighPriorityChkSize() {
         return numHpVBReqs.load();
     }
@@ -1306,21 +1321,6 @@ protected:
                                 HighPriorityVBNotify reqType);
 
     /**
-     * Get high priority notifications for a seqno or checkpoint persisted
-     *
-     * @param engine Ref to ep-engine
-     * @param id seqno or checkpoint id for which notifies are to be found
-     * @param notifyType indicating notify for seqno or chk persistence
-     *
-     * @return map of notifies with conn cookie as the key and notify status as
-     *         the value
-     */
-    std::map<const void*, ENGINE_ERROR_CODE> getHighPriorityNotifies(
-            EventuallyPersistentEngine& engine,
-            uint64_t idNum,
-            HighPriorityVBNotify notifyType);
-
-    /**
      * Get all high priority notifications as temporary failures because they
      * could not be completed.
      *