MB-19982: Don't hold connsLock for duration of dcp stats 11/65211/12
authorJim Walker <jim@couchbase.com>
Fri, 24 Jun 2016 12:28:34 +0000 (12:28 +0000)
committerDave Rigby <daver@couchbase.com>
Tue, 28 Jun 2016 14:18:56 +0000 (14:18 +0000)
The Mb identified a lock inversion between dcp->set_vbucket_state
and get_stats("dcp")

The get_stats path uses doDcpStats which holds connsLock whilst
all connections are visited and their stats gathered. When getting
a PassiveStream's stats the buffer.mutex is needed.

The set_vbucket_state obtains the same locks in the reverse order.
Whilst buffer.mutex is held it will try to get connsLock
(via EventuallyPersistentStore::setVBucketState calling into dcpConnMap).

The fix is to work on a copy of the "all" list so that we can do the
work without the lock.

ref-counted pointers should stop any issues where the connection
being visited is freed/dropped from another thread.

Change-Id: Iff5f7be1d78278a4b00bb07b859697cca3115299
Reviewed-on: http://review.couchbase.org/65211
Well-Formed: buildbot <build@couchbase.com>
Reviewed-by: Dave Rigby <daver@couchbase.com>
Tested-by: buildbot <build@couchbase.com>
src/connmap.h
src/ep_engine.cc
tests/ep_testsuite.cc

index 83bc5d3..a4e0dc2 100644 (file)
@@ -141,6 +141,21 @@ public:
     }
 
     /**
+     * Call a function on each connection, but without the connsLock held
+     * Read Copy Update... the all list is copied under connsLock and then
+     * the function is applied against the copy without the lock held.
+     */
+    template <typename Fun>
+    void eachRCU(Fun f) const {
+        std::list<connection_t> currentConnections;
+        {
+            LockHolder lh(connsLock);
+            currentConnections = all;
+        }
+        std::for_each(currentConnections.begin(), currentConnections.end(), f);
+    }
+
+    /**
      * Call a function on each connection *without* a lock.
      */
     template <typename Fun>
@@ -216,7 +231,7 @@ protected:
     // removing connections.
     // Actual modification of the underlying
     // ConnHandler objects is guarded by {releaseLock}.
-    Mutex                                    connsLock;
+    mutable Mutex                            connsLock;
 
     std::map<const void*, connection_t>      map_;
     std::list<connection_t>                  all;
index 57780e2..a26236d 100644 (file)
@@ -3951,7 +3951,10 @@ ENGINE_ERROR_CODE EventuallyPersistentEngine::doDcpStats(const void *cookie,
                                                          ADD_STAT add_stat) {
     ConnCounter aggregator;
     ConnStatBuilder dcpVisitor(cookie, add_stat, &aggregator);
-    dcpConnMap_->each(dcpVisitor);
+    // MB-19982 - use RCU version of each, as it will drop
+    // the connMap lock before invoking the visitor against
+    // a copy of the connections list.
+    dcpConnMap_->eachRCU(dcpVisitor);
 
     add_casted_stat("ep_dcp_count", aggregator.totalConns, add_stat, cookie);
     add_casted_stat("ep_dcp_producer_count", aggregator.totalProducers, add_stat, cookie);
index 865f35a..6e44285 100644 (file)
@@ -11926,6 +11926,101 @@ static enum test_result test_mb19635_upgrade_from_25x(ENGINE_HANDLE *h,
     return SUCCESS;
 }
 
+struct mb19982_ctx {
+    mb19982_ctx(ENGINE_HANDLE* h, ENGINE_HANDLE_V1* h1, int iterations) {
+        this->h = h;
+        this->h1 = h1;
+        this->iterations = iterations;
+    }
+    ENGINE_HANDLE* h;
+    ENGINE_HANDLE_V1* h1;
+    int iterations;
+};
+
+void mb19982_add_stat(const char *key, const uint16_t klen, const char *val,
+                      const uint32_t vlen, const void *cookie) {
+    // do nothing
+}
+
+extern "C" {
+    static void mb19982_thread_func(void *args) {
+        struct mb19982_ctx *ctx = static_cast<mb19982_ctx *>(args);
+        for (int ii = 0; ii < ctx->iterations; ii++) {
+            checkeq(ctx->h1->get_stats(ctx->h, NULL, "dcp", 3, &mb19982_add_stat),
+                    ENGINE_SUCCESS, "failed get_stats(dcp)");
+        }
+    }
+}
+
+/*
+ * This test creates a DCP consumer on a replica VB and then from a second thread
+ * fires get_stats("dcp") whilst the main thread changes VB state from
+ * replica->active->replica (and so on).
+ * MB-19982 idenified a lock inversion between these two functional paths and this
+ * test proves and protects the issue.
+ */
+static enum test_result test_mb19982(ENGINE_HANDLE *h,
+                                     ENGINE_HANDLE_V1 *h1) {
+
+    // Load up vb0 with num_items
+    int num_items = 1000;
+    int iterations = 1000; // how many stats calls
+
+    struct mb19982_ctx ctx(h, h1, iterations);
+    cb_thread_t cp_thread;
+    const void *cookie = testHarness.create_cookie();
+    uint32_t opaque = 0xFFFF0000;
+    uint32_t flags = 0;
+    std::string name = "unittest";
+    // Switch to replica
+    check(set_vbucket_state(h, h1, 0, vbucket_state_replica),
+          "Failed to set vbucket state.");
+
+    // Open consumer connection
+    checkeq(h1->dcp.open(h, cookie, opaque, 0, flags,
+                         (void*)name.c_str(), name.length()),
+            ENGINE_SUCCESS,
+            "Failed dcp Consumer open connection.");
+
+    add_stream_for_consumer(h, h1, cookie, opaque++, 0, 0,
+                            PROTOCOL_BINARY_RESPONSE_SUCCESS);
+
+    cb_assert(cb_create_thread(&cp_thread,
+                               mb19982_thread_func,
+                               &ctx, 0) == 0);
+
+    uint32_t stream_opaque = get_int_stat(h, h1,
+                                          "eq_dcpq:unittest:stream_0_opaque",
+                                          "dcp");
+
+    for (int i = 1; i <= num_items; i++) {
+        std::stringstream ss;
+        ss << "key-" << i;
+        checkeq(h1->dcp.snapshot_marker(h, cookie,
+                                        stream_opaque, 0/*vbid*/,
+                                        num_items, num_items + i, 2),
+                ENGINE_SUCCESS,
+                "Failed to send snapshot marker");
+        checkeq(h1->dcp.mutation(h, cookie, stream_opaque,
+                                 ss.str().c_str(), ss.str().length(),
+                                 "value", 5, i * 3, 0, 0, 0,
+                                 i + num_items, i + num_items,
+                                 0, 0, "", 0, INITIAL_NRU_VALUE),
+                ENGINE_SUCCESS,
+                "Failed to send dcp mutation");
+
+        // And flip VB state (this can have a lock inversion with stats)
+        checkeq(h1->dcp.set_vbucket_state(h, cookie, stream_opaque, 0, vbucket_state_active),
+                ENGINE_SUCCESS, "failed to change to active");
+        checkeq(h1->dcp.set_vbucket_state(h, cookie, stream_opaque, 0, vbucket_state_replica),
+                ENGINE_SUCCESS, "failed to change to replica");
+    }
+
+    cb_assert(cb_join_thread(cp_thread) == 0);
+    testHarness.destroy_cookie(cookie);
+    return SUCCESS;
+}
+
 static enum test_result prepare(engine_test_t *test) {
 #ifdef __sun
         // Some of the tests doesn't work on Solaris.. Don't know why yet..
@@ -12877,6 +12972,9 @@ engine_test_t* get_tests(void) {
                  test_mb19635_upgrade_from_25x, test_setup, teardown, NULL,
                  prepare, cleanup),
 
+        TestCase("test MB-19982", test_mb19982,
+                 test_setup, teardown, NULL, prepare, cleanup),
+
         TestCase(NULL, NULL, NULL, NULL, NULL, prepare, cleanup)
     };