Merge remote-tracking branch 'couchbase/3.0.x' into sherlock 82/65582/1
author Dave Rigby <daver@couchbase.com>
Thu, 7 Jul 2016 13:59:04 +0000 (14:59 +0100)
committer Dave Rigby <daver@couchbase.com>
Thu, 7 Jul 2016 13:59:04 +0000 (14:59 +0100)
* couchbase/3.0.x:
  MB-19982: Don't hold connsLock for duration of dcp stats
  MB-19982: Fix potential deadlock between DcpConsumer::bufMutex & connsLock
  MB-14859: Handle quick successive BG Fetch of a key interleaved with exp pager

Change-Id: Ie192ce93370c3218948434794b335732a6a7ff18

src/connmap.cc
src/connmap.h
src/ep.cc
src/ep_engine.cc
tests/ep_testsuite.cc
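
The potential deadlock named in MB-19982 is a classic lock-order inversion between
DcpConnMap::connsLock and a DcpConsumer stream's bufMutex. A minimal, self-contained
sketch (assumed names, not ep-engine code) of that class of bug:

    #include <mutex>

    std::mutex connsLock;   // stands in for DcpConnMap::connsLock
    std::mutex bufMutex;    // stands in for PassiveStream::buffer.bufMutex

    // Path 1 (e.g. dcp stats / disconnect before this merge): connsLock -> bufMutex.
    void visitConnectionsAndStreams() {
        std::lock_guard<std::mutex> conns(connsLock);
        std::lock_guard<std::mutex> buf(bufMutex);    // blocks if path 2 holds bufMutex
        // ... inspect or close streams ...
    }

    // Path 2 (e.g. setVBucketState -> processBufferedMessages): bufMutex -> connsLock.
    void processBufferedMessages() {
        std::lock_guard<std::mutex> buf(bufMutex);
        std::lock_guard<std::mutex> conns(connsLock); // blocks if path 1 holds connsLock
        // ... notify the connection map ...
    }

    int main() {
        // Run concurrently from two threads, these two paths can deadlock; the fix in
        // the diffs below is to never hold connsLock while acquiring bufMutex.
        visitConnectionsAndStreams();
        processBufferedMessages();
        return 0;
    }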

index b41591e..0555933 100644
@@ -1114,29 +1114,36 @@ void DcpConnMap::closeAllStreams_UNLOCKED() {
 }
 
 void DcpConnMap::disconnect(const void *cookie) {
-    LockHolder lh(connsLock);
-    disconnect_UNLOCKED(cookie);
-}
-
-void DcpConnMap::disconnect_UNLOCKED(const void *cookie) {
-    std::list<connection_t>::iterator iter;
-    for (iter = all.begin(); iter != all.end(); ++iter) {
-        if ((*iter)->getCookie() == cookie) {
-            (*iter)->setDisconnect(true);
-            all.erase(iter);
-            break;
+    // Remove the connection matching this cookie from the `all` and map_
+    // data structures (under connsLock).
+    connection_t conn;
+    {
+        LockHolder lh(connsLock);
+        std::list<connection_t>::iterator iter;
+        for (iter = all.begin(); iter != all.end(); ++iter) {
+            if ((*iter)->getCookie() == cookie) {
+                (*iter)->setDisconnect(true);
+                all.erase(iter);
+                break;
+            }
         }
-    }
-
-    std::map<const void*, connection_t>::iterator itr(map_.find(cookie));
-    if (itr != map_.end()) {
-        connection_t conn = itr->second;
-        if (conn.get()) {
-            LOG(EXTENSION_LOG_INFO, "%s Removing connection",
-                conn->logHeader());
-            map_.erase(itr);
+        std::map<const void*, connection_t>::iterator itr(map_.find(cookie));
+        if (itr != map_.end()) {
+            conn = itr->second;
+            if (conn.get()) {
+                LOG(EXTENSION_LOG_INFO, "%s Removing connection",
+                    conn->logHeader());
+                map_.erase(itr);
+            }
         }
+    }
 
+    // Note we shut down the stream *not* under the connsLock; this is
+    // because as part of closing a DcpConsumer stream we need to
+    // acquire PassiveStream::buffer.bufMutex; and that could deadlock
+    // in EventuallyPersistentStore::setVBucketState, via
+    // PassiveStream::processBufferedMessages.
+    if (conn) {
         DcpProducer* producer = dynamic_cast<DcpProducer*> (conn.get());
         if (producer) {
             producer->closeAllStreams();
@@ -1144,7 +1151,12 @@ void DcpConnMap::disconnect_UNLOCKED(const void *cookie) {
         } else {
             static_cast<DcpConsumer*>(conn.get())->closeAllStreams();
         }
+    }
 
+    // Finished disconnecting the streams; add the connection to the
+    // deadConnections list.
+    if (conn) {
+        LockHolder lh(connsLock);
         deadConnections.push_back(conn);
     }
 }
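
The rewritten disconnect() above follows a simple pattern: take the connection out of
the shared containers while holding connsLock, release the lock, shut the streams down
(which may need bufMutex), and only re-take connsLock for the final deadConnections
update. A minimal, generic sketch of that pattern (assumed names, not ep-engine code):

    #include <list>
    #include <memory>
    #include <mutex>

    struct Connection {
        void closeAllStreams() { /* may acquire other mutexes, e.g. a stream's bufMutex */ }
    };

    std::mutex connsLock;
    std::list<std::shared_ptr<Connection>> all;
    std::list<std::shared_ptr<Connection>> deadConnections;

    void disconnectOne() {
        std::shared_ptr<Connection> conn;
        {
            std::lock_guard<std::mutex> lh(connsLock);
            if (!all.empty()) {
                conn = all.front();
                all.pop_front();              // remove under the lock ...
            }
        }                                     // ... and release it here

        if (conn) {
            conn->closeAllStreams();          // safe: no connsLock held
            std::lock_guard<std::mutex> lh(connsLock);
            deadConnections.push_back(conn);  // re-acquire only for the list update
        }
    }
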
index f6ba910..6979037 100644
@@ -141,6 +141,21 @@ public:
     }
 
     /**
+     * Call a function on each connection, but without the connsLock held.
+     * Read-Copy-Update style: the `all` list is copied under connsLock and
+     * then the function is applied to the copy without the lock held.
+     */
+    template <typename Fun>
+    void eachRCU(Fun f) const {
+        std::list<connection_t> currentConnections;
+        {
+            LockHolder lh(connsLock);
+            currentConnections = all;
+        }
+        std::for_each(currentConnections.begin(), currentConnections.end(), f);
+    }
+
+    /**
      * Call a function on each connection *without* a lock.
      */
     template <typename Fun>
@@ -216,7 +231,7 @@ protected:
     // removing connections.
     // Actual modification of the underlying
     // ConnHandler objects is guarded by {releaseLock}.
-    Mutex                                    connsLock;
+    mutable Mutex                            connsLock;
 
     std::map<const void*, connection_t>      map_;
     std::list<connection_t>                  all;
@@ -508,10 +523,9 @@ private:
 
     bool isPassiveStreamConnected_UNLOCKED(uint16_t vbucket);
 
-    void disconnect_UNLOCKED(const void *cookie);
-
     void closeAllStreams_UNLOCKED();
 
+    // Guarded by the parent class's `connsLock`
     std::list<connection_t> deadConnections;
 
     SpinLock numBackfillsLock;
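
The eachRCU() helper added above boils down to a copy-then-visit idiom: snapshot the
connection list under connsLock, then run the visitor against the snapshot with no
lock held, so the visitor is free to take other locks without risking an inversion.
A small self-contained sketch (assumed types, not ep-engine code):

    #include <algorithm>
    #include <iostream>
    #include <list>
    #include <memory>
    #include <mutex>

    struct Conn { int id; };

    std::mutex connsLock;
    std::list<std::shared_ptr<Conn>> all;

    template <typename Fun>
    void eachRCU(Fun f) {
        std::list<std::shared_ptr<Conn>> snapshot;
        {
            std::lock_guard<std::mutex> lh(connsLock);
            snapshot = all;                                  // copy under the lock
        }
        std::for_each(snapshot.begin(), snapshot.end(), f);  // visit without the lock
    }

    int main() {
        {
            std::lock_guard<std::mutex> lh(connsLock);
            all.push_back(std::make_shared<Conn>(Conn{1}));
            all.push_back(std::make_shared<Conn>(Conn{2}));
        }
        eachRCU([](const std::shared_ptr<Conn>& c) {
            std::cout << "visiting connection " << c->id << "\n";
        });
        return 0;
    }
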
index c9366b6..0ca9b4b 100644
--- a/src/ep.cc
+++ b/src/ep.cc
@@ -1647,7 +1647,7 @@ void EventuallyPersistentStore::completeBGFetch(const std::string &key,
         StoredValue *v = fetchValidValue(vb, key, bucket_num, true);
         if (isMeta) {
             if ((v && v->unlocked_restoreMeta(gcb.val.getValue(),
-                                             gcb.val.getStatus(), vb->ht))
+                                              gcb.val.getStatus(), vb->ht))
                 || ENGINE_KEY_ENOENT == status) {
                 /* If ENGINE_KEY_ENOENT is the status from storage and the temp
                  key is removed from hash table by the time bgfetch returns
index 3ead7a4..146a64a 100644
@@ -4051,7 +4051,10 @@ ENGINE_ERROR_CODE EventuallyPersistentEngine::doDcpStats(const void *cookie,
                                                          ADD_STAT add_stat) {
     ConnCounter aggregator;
     ConnStatBuilder dcpVisitor(cookie, add_stat, &aggregator);
-    dcpConnMap_->each(dcpVisitor);
+    // MB-19982 - use the RCU version of each(), as it drops
+    // the connMap lock before invoking the visitor against
+    // a copy of the connections list.
+    dcpConnMap_->eachRCU(dcpVisitor);
 
     add_casted_stat("ep_dcp_count", aggregator.totalConns, add_stat, cookie);
     add_casted_stat("ep_dcp_producer_count", aggregator.totalProducers, add_stat, cookie);
index 31203ee..1dbd6ac 100644
@@ -14523,6 +14523,101 @@ static enum test_result test_mb19153(ENGINE_HANDLE *h,
     return SUCCESS;
 }
 
+struct mb19982_ctx {
+    mb19982_ctx(ENGINE_HANDLE* h, ENGINE_HANDLE_V1* h1, int iterations) {
+        this->h = h;
+        this->h1 = h1;
+        this->iterations = iterations;
+    }
+    ENGINE_HANDLE* h;
+    ENGINE_HANDLE_V1* h1;
+    int iterations;
+};
+
+void mb19982_add_stat(const char *key, const uint16_t klen, const char *val,
+                      const uint32_t vlen, const void *cookie) {
+    // do nothing
+}
+
+extern "C" {
+    static void mb19982_thread_func(void *args) {
+        struct mb19982_ctx *ctx = static_cast<mb19982_ctx *>(args);
+        for (int ii = 0; ii < ctx->iterations; ii++) {
+            checkeq(ctx->h1->get_stats(ctx->h, NULL, "dcp", 3, &mb19982_add_stat),
+                    ENGINE_SUCCESS, "failed get_stats(dcp)");
+        }
+    }
+}
+
+/*
+ * This test creates a DCP consumer on a replica VB and then from a second thread
+ * fires get_stats("dcp") whilst the main thread changes VB state from
+ * replica->active->replica (and so on).
+ * MB-19982 identified a lock inversion between these two functional paths, and this
+ * test exercises the issue and protects against regression.
+ */
+static enum test_result test_mb19982(ENGINE_HANDLE *h,
+                                     ENGINE_HANDLE_V1 *h1) {
+
+    // Load up vb0 with num_items
+    int num_items = 1000;
+    int iterations = 1000; // how many stats calls
+
+    struct mb19982_ctx ctx(h, h1, iterations);
+    cb_thread_t cp_thread;
+    const void *cookie = testHarness.create_cookie();
+    uint32_t opaque = 0xFFFF0000;
+    uint32_t flags = 0;
+    std::string name = "unittest";
+    // Switch to replica
+    check(set_vbucket_state(h, h1, 0, vbucket_state_replica),
+          "Failed to set vbucket state.");
+
+    // Open consumer connection
+    checkeq(h1->dcp.open(h, cookie, opaque, 0, flags,
+                         (void*)name.c_str(), name.length()),
+            ENGINE_SUCCESS,
+            "Failed dcp Consumer open connection.");
+
+    add_stream_for_consumer(h, h1, cookie, opaque++, 0, 0,
+                            PROTOCOL_BINARY_RESPONSE_SUCCESS);
+
+    cb_assert(cb_create_thread(&cp_thread,
+                               mb19982_thread_func,
+                               &ctx, 0) == 0);
+
+    uint32_t stream_opaque = get_int_stat(h, h1,
+                                          "eq_dcpq:unittest:stream_0_opaque",
+                                          "dcp");
+
+    for (int i = 1; i <= num_items; i++) {
+        std::stringstream ss;
+        ss << "key-" << i;
+        checkeq(h1->dcp.snapshot_marker(h, cookie,
+                                        stream_opaque, 0/*vbid*/,
+                                        num_items, num_items + i, 2),
+                ENGINE_SUCCESS,
+                "Failed to send snapshot marker");
+        checkeq(h1->dcp.mutation(h, cookie, stream_opaque,
+                                 ss.str().c_str(), ss.str().length(),
+                                 "value", 5, i * 3, 0, 0, 0,
+                                 i + num_items, i + num_items,
+                                 0, 0, "", 0, INITIAL_NRU_VALUE),
+                ENGINE_SUCCESS,
+                "Failed to send dcp mutation");
+
+        // And flip VB state (this can have a lock inversion with stats)
+        checkeq(h1->dcp.set_vbucket_state(h, cookie, stream_opaque, 0, vbucket_state_active),
+                ENGINE_SUCCESS, "failed to change to active");
+        checkeq(h1->dcp.set_vbucket_state(h, cookie, stream_opaque, 0, vbucket_state_replica),
+                ENGINE_SUCCESS, "failed to change to replica");
+    }
+
+    cb_assert(cb_join_thread(cp_thread) == 0);
+    testHarness.destroy_cookie(cookie);
+    return SUCCESS;
+}
+
 static enum test_result prepare(engine_test_t *test) {
 #ifdef __sun
         // Some of the tests doesn't work on Solaris.. Don't know why yet..
@@ -15595,6 +15690,9 @@ engine_test_t* get_tests(void) {
                  test_mb18452_smallYield, test_setup, teardown, "max_num_nonio=1",
                  prepare, cleanup),
 
+        TestCase("test MB-19982", test_mb19982,
+                 test_setup, teardown, NULL, prepare, cleanup),
+
         TestCase(NULL, NULL, NULL, NULL, NULL, prepare, cleanup)
     };