MB-22180: Ensure all pendingBGFetches are terminated on VBbucket delete 11/73811/20
authorDaniel Owen <owend@couchbase.com>
Mon, 19 Dec 2016 04:17:33 +0000 (04:17 +0000)
committerDave Rigby <daver@couchbase.com>
Wed, 1 Mar 2017 09:45:45 +0000 (09:45 +0000)
On a VBucket delete we need to ensure that all pendingBGFetches are
terminated and any connections waiting for a BGFetch to complete are
notified with ENGINE_NOT_MY_VBUCKET.

Although we previously deleted the pendingBGFetches in the vbucket
destructor we did not send any notifications to waiting connections.

This patch moves the deletion of pendingBGFetches into the
notifyAllPendingConnsFailed function and in addition notifies all the
connections awaiting a BGFetch with an ENGINE_NOT_MY_VBUCKET.

Change-Id: I13a99fe01153a4ba8786aaf608b25ed31ace5a0c
Reviewed-on: http://review.couchbase.org/73811
Well-Formed: Build Bot <build@couchbase.com>
Tested-by: Build Bot <build@couchbase.com>
Reviewed-by: Dave Rigby <daver@couchbase.com>
src/ep.cc
src/tasks.cc
src/tasks.h
src/vbucket.cc
tests/module_tests/evp_store_single_threaded_test.cc
tests/module_tests/evp_store_test.cc

index 5541f35..fdcbe9f 100644 (file)
--- a/src/ep.cc
+++ b/src/ep.cc
@@ -47,6 +47,7 @@
 #include "warmup.h"
 #include "connmap.h"
 #include "replicationthrottle.h"
+#include "tasks.h"
 
 class StatsValueChangeListener : public ValueChangedListener {
 public:
@@ -215,33 +216,6 @@ private:
     bool residentRatioLessThanThreshold;
 };
 
-class VBucketMemoryDeletionTask : public GlobalTask {
-public:
-    VBucketMemoryDeletionTask(EventuallyPersistentEngine &eng,
-                              RCPtr<VBucket> &vb, double delay) :
-                              GlobalTask(&eng,
-                              TaskId::VBucketMemoryDeletionTask, delay, true),
-                              e(eng), vbucket(vb), vbid(vb->getId()) { }
-
-    std::string getDescription() {
-        std::stringstream ss;
-        ss << "Removing (dead) vbucket " << vbid << " from memory";
-        return ss.str();
-    }
-
-    bool run(void) {
-        vbucket->notifyAllPendingConnsFailed(e);
-        vbucket->ht.clear();
-        vbucket.reset();
-        return false;
-    }
-
-private:
-    EventuallyPersistentEngine &e;
-    RCPtr<VBucket> vbucket;
-    uint16_t vbid;
-};
-
 class PendingOpsNotification : public GlobalTask {
 public:
     PendingOpsNotification(EventuallyPersistentEngine &e, RCPtr<VBucket> &vb) :
index 7d6d0c5..1dd82ae 100644 (file)
@@ -20,8 +20,8 @@
 #include "ep_engine.h"
 #include "flusher.h"
 #include "tasks.h"
+#include "vbucket.h"
 #include "warmup.h"
-#include "ep_engine.h"
 
 #include <climits>
 
@@ -171,6 +171,22 @@ bool SingleBGFetcherTask::run() {
     return false;
 }
 
+VBucketMemoryDeletionTask::VBucketMemoryDeletionTask(
+        EventuallyPersistentEngine& eng, RCPtr<VBucket>& vb, double delay)
+    : GlobalTask(&eng, TaskId::VBucketMemoryDeletionTask, delay, true),
+      e(eng),
+      vbucket(vb) {
+    desc = "Removing (dead) vb:" + std::to_string(vbucket->getId()) +
+           " from memory";
+}
+
+bool VBucketMemoryDeletionTask::run() {
+    vbucket->notifyAllPendingConnsFailed(e);
+    vbucket->ht.clear();
+    vbucket.reset();
+    return false;
+}
+
 WorkLoadMonitor::WorkLoadMonitor(EventuallyPersistentEngine *e,
                                  bool completeBeforeShutdown) :
     GlobalTask(e, TaskId::WorkLoadMonitor, WORKLOAD_MONITOR_FREQ,
index 968e214..a528b9e 100644 (file)
@@ -58,6 +58,7 @@ class EventuallyPersistentEngine;
 class Flusher;
 class Warmup;
 class Taskable;
+class VBucket;
 
 class GlobalTask : public RCValue {
 friend class CompareByDueDate;
@@ -409,6 +410,29 @@ private:
     hrtime_t                   init;
 };
 
+/*
+ * This is a NONIO task called as part of VB deletion.  The task is responsible
+ * for clearing all the VBucket's pending operations and for clearing the
+ * VBucket's hash table.
+ */
+class VBucketMemoryDeletionTask : public GlobalTask {
+public:
+    VBucketMemoryDeletionTask(EventuallyPersistentEngine& eng,
+                              RCPtr<VBucket>& vb,
+                              double delay);
+
+    std::string getDescription() {
+        return desc;
+    }
+
+    bool run();
+
+private:
+    EventuallyPersistentEngine& e;
+    RCPtr<VBucket> vbucket;
+    std::string desc;
+};
+
 /**
  * A task that monitors if a bucket is read-heavy, write-heavy, or mixed.
  */
index 5056fff..a6c7ae4 100644 (file)
@@ -128,19 +128,6 @@ VBucket::~VBucket() {
 
     stats.decrDiskQueueSize(dirtyQueueSize.load());
 
-    size_t num_pending_fetches = 0;
-    vb_bgfetch_queue_t::iterator itr = pendingBGFetches.begin();
-    for (; itr != pendingBGFetches.end(); ++itr) {
-        vb_bgfetch_item_ctx_t &bg_itm_ctx = itr->second;
-        std::list<VBucketBGFetchItem *> &bgitems = bg_itm_ctx.bgfetched_list;
-        std::list<VBucketBGFetchItem *>::iterator vit = bgitems.begin();
-        for (; vit != bgitems.end(); ++vit) {
-            delete (*vit);
-            ++num_pending_fetches;
-        }
-    }
-    stats.numRemainingBgJobs.fetch_sub(num_pending_fetches);
-    pendingBGFetches.clear();
     delete failovers;
 
     // Clear out the bloomfilter(s)
@@ -399,6 +386,23 @@ void VBucket::notifyAllPendingConnsFailed(EventuallyPersistentEngine &e) {
     }
     lh.unlock();
 
+    // Add all the pendingBGFetches to the toNotify map
+    {
+        LockHolder lh(pendingBGFetchesLock);
+        size_t num_of_deleted_pending_fetches = 0;
+        for (auto& bgf : pendingBGFetches) {
+            vb_bgfetch_item_ctx_t& bg_itm_ctx = bgf.second;
+            for (auto& bgitem : bg_itm_ctx.bgfetched_list) {
+                toNotify[bgitem->cookie] = ENGINE_NOT_MY_VBUCKET;
+                e.storeEngineSpecific(bgitem->cookie, nullptr);
+                delete bgitem;
+                ++num_of_deleted_pending_fetches;
+            }
+        }
+        stats.numRemainingBgJobs.fetch_sub(num_of_deleted_pending_fetches);
+        pendingBGFetches.clear();
+    }
+
     std::map<const void*, ENGINE_ERROR_CODE>::iterator itr = toNotify.begin();
     for (; itr != toNotify.end(); ++itr) {
         e.notifyIOComplete(itr->first, itr->second);
index fe8034d..5a74523 100644 (file)
@@ -208,7 +208,7 @@ TEST_F(SingleThreadedEPStoreTest, MB19695_doTapVbTakeoverStats) {
     // disconnected).
     EXPECT_TRUE(store->resetVBucket(vbid));
 
-    runNextTask(lpNonioQ, "Removing (dead) vbucket 0 from memory");
+    runNextTask(lpNonioQ, "Removing (dead) vb:0 from memory");
     runNextTask(lpWriterQ, "Deleting VBucket:0");
 
     // [[2]] Ok, let's see if we can get TAP takeover stats. This will
index f11018b..7858ae1 100644 (file)
@@ -35,6 +35,7 @@
 #include "flusher.h"
 #include "../mock/mock_dcp_producer.h"
 #include "replicationthrottle.h"
+#include "tasks.h"
 
 #include "programs/engine_testapp/mock_server.h"
 
@@ -377,6 +378,39 @@ TEST_P(EPStoreEvictionTest, GetKeyStatsNMVB) {
                                   /*wantsDeleted*/false));
 }
 
+// Test to ensure all pendingBGfetches are deleted when the
+// VBucketMemoryDeletionTask is run
+TEST_P(EPStoreEvictionTest, MB_21976) {
+    // Store an item, then eject it.
+    std::string key("key");
+    auto item = store_item(vbid, key, "value");
+    flush_vbucket_to_disk(vbid);
+    const char* msg;
+    size_t msg_size{sizeof(msg)};
+    EXPECT_EQ(ENGINE_SUCCESS, store->evictKey(key, 0, &msg, &msg_size));
+
+    // Perform a get, which should EWOULDBLOCK
+    get_options_t options = static_cast<get_options_t>(QUEUE_BG_FETCH |
+                                                       HONOR_STATES |
+                                                       TRACK_REFERENCE |
+                                                       DELETE_TEMP |
+                                                       HIDE_LOCKED_CAS |
+                                                       TRACK_STATISTICS);
+    GetValue gv = store->get(key, vbid, cookie, options);
+    EXPECT_EQ(ENGINE_EWOULDBLOCK,gv.getStatus());
+    // Mark the status of the cookie so that we can see if notify is called
+    struct mock_connstruct* c = (struct mock_connstruct *)cookie;
+    c->status = ENGINE_E2BIG;
+
+    // Manually run the VBucketMemoryDeletionTask task
+    RCPtr<VBucket> vb = store->getVBucket(vbid);
+    VBucketMemoryDeletionTask deletionTask(*engine, vb, /*delay*/0.0);
+    deletionTask.run();
+
+    // Check the status of the cookie to see if the cookie status has changed
+    // to ENGINE_NOT_MY_VBUCKET, which means the notify was sent
+    EXPECT_EQ(ENGINE_NOT_MY_VBUCKET, c->status);
+}
 
 // Test cases which run in both Full and Value eviction
 INSTANTIATE_TEST_CASE_P(FullAndValueEviction,