MB-19153: Break circular dependency while deleting bucket 99/62699/4
authorabhinavdangeti <abhinav@couchbase.com>
Tue, 12 Apr 2016 01:28:37 +0000 (18:28 -0700)
committerChiyoung Seo <chiyoung@couchbase.com>
Tue, 12 Apr 2016 18:34:28 +0000 (18:34 +0000)
As part of unregistering the last bucket, when stopTaskGroup
is invoked, all the running threads will cancelled. In this
issue reported, when DcpBackfill was closed, the ref count of
the DcpProducer whose reference it was holding on to became
zero, causing its destructor to be invoked. In the DcpProducer's
destructor, an attempt was made to cancel the checkpoint creator
task which needed to acquire the executorpool's tMutex that
unregisterBucket had already acquired.

Reproduction steps:
<delete_bucket> --> <unregister_bucket> --> <stop_task_group>
    --> <acquire tMutex> --> .. --> <cancel DcpBackfill> -->
    --> <destroy DcpBackfill> --> <destroy DcpProducer>
    --> <cancel Checkpoint creator task> --> [tries to acquire tMutex]

The fix here would be to not attempt to kill the task within
the DcpProducer's destructor, but to do so when the producer is
being disconnected.

+ Unit test case that reproduces the hang.

Change-Id: Ia3c0597e3d8f85a1b40ef56e251e38339023b471
Reviewed-on: http://review.couchbase.org/62699
Well-Formed: buildbot <build@couchbase.com>
Tested-by: buildbot <build@couchbase.com>
Reviewed-by: Chiyoung Seo <chiyoung@couchbase.com>
src/connmap.cc
src/connmap.h
src/dcp-producer.cc
src/dcp-producer.h
src/ep_engine.cc
tests/ep_testsuite.cc
tests/module_tests/stream_test.cc

index 72e8583..7eb96e0 100644 (file)
@@ -1055,7 +1055,7 @@ void DcpConnMap::closeAllStreams_UNLOCKED() {
         DcpProducer* producer = dynamic_cast<DcpProducer*> (itr->second.get());
         if (producer) {
             producer->closeAllStreams();
-            producer->clearCheckpointProcessorTaskQueues();
+            producer->cancelCheckpointProcessorTask();
         } else {
             static_cast<DcpConsumer*>(itr->second.get())->closeAllStreams();
         }
@@ -1089,7 +1089,7 @@ void DcpConnMap::disconnect_UNLOCKED(const void *cookie) {
         DcpProducer* producer = dynamic_cast<DcpProducer*> (conn.get());
         if (producer) {
             producer->closeAllStreams();
-            producer->clearCheckpointProcessorTaskQueues();
+            producer->cancelCheckpointProcessorTask();
         } else {
             static_cast<DcpConsumer*>(conn.get())->closeAllStreams();
         }
@@ -1183,3 +1183,9 @@ void DcpConnMap::notifyVBConnections(uint16_t vbid, uint64_t bySeqno)
         conn->notifySeqnoAvailable(vbid, bySeqno);
     }
 }
+
+void DcpConnMap::addStats(ADD_STAT add_stat, const void *c) {
+    LockHolder lh(connsLock);
+    add_casted_stat("ep_dcp_dead_conn_count", deadConnections.size(),
+                    add_stat, c);
+}
index 749cb4d..5f9cde9 100644 (file)
@@ -467,6 +467,8 @@ public:
     ENGINE_ERROR_CODE addPassiveStream(ConnHandler* conn, uint32_t opaque,
                                        uint16_t vbucket, uint32_t flags);
 
+    void addStats(ADD_STAT add_stat, const void *c);
+
 private:
 
     bool isPassiveStreamConnected_UNLOCKED(uint16_t vbucket);
index 8c68354..71fa895 100644 (file)
@@ -151,10 +151,6 @@ DcpProducer::DcpProducer(EventuallyPersistentEngine &e, const void *cookie,
     ExecutorPool::get()->schedule(checkpointCreatorTask, AUXIO_TASK_IDX);
 }
 
-DcpProducer::~DcpProducer() {
-    ExecutorPool::get()->cancel(checkpointCreatorTask->getId());
-}
-
 ENGINE_ERROR_CODE DcpProducer::streamRequest(uint32_t flags,
                                              uint32_t opaque,
                                              uint16_t vbucket,
@@ -819,7 +815,8 @@ void DcpProducer::scheduleCheckpointProcessorTask(stream_t s) {
         ->schedule(s);
 }
 
-void DcpProducer::clearCheckpointProcessorTaskQueues() {
+void DcpProducer::cancelCheckpointProcessorTask() {
     static_cast<ActiveStreamCheckpointProcessorTask*>(checkpointCreatorTask.get())
         ->clearQueues();
+    ExecutorPool::get()->cancel(checkpointCreatorTask->getId());
 }
index de34b8b..4d7be1a 100644 (file)
@@ -31,8 +31,6 @@ public:
     DcpProducer(EventuallyPersistentEngine &e, const void *cookie,
                 const std::string &n, bool notifyOnly);
 
-    ~DcpProducer();
-
     ENGINE_ERROR_CODE streamRequest(uint32_t flags, uint32_t opaque,
                                     uint16_t vbucket, uint64_t start_seqno,
                                     uint64_t end_seqno, uint64_t vbucket_uuid,
@@ -189,9 +187,10 @@ public:
     void scheduleCheckpointProcessorTask(stream_t s);
 
     /*
-        Clears active stream checkpoint processor task's queue.
+        Clears active stream checkpoint processor task's queue,
+        and cancels the task.
     */
-    void clearCheckpointProcessorTaskQueues();
+    void cancelCheckpointProcessorTask();
 
 private:
 
index 6406305..9c2e836 100644 (file)
@@ -3962,6 +3962,7 @@ ENGINE_ERROR_CODE EventuallyPersistentEngine::doDcpStats(const void *cookie,
     add_casted_stat("ep_dcp_queue_backfillremaining",
                     aggregator.conn_queueBackfillRemaining, add_stat, cookie);
 
+    dcpConnMap_->addStats(add_stat, cookie);
     return ENGINE_SUCCESS;
 }
 
index d0e303e..fc3ccc9 100644 (file)
@@ -11727,6 +11727,71 @@ static enum test_result test_mb16357(ENGINE_HANDLE *h,
     return SUCCESS;
 }
 
+/**
+ * This test demonstrates bucket shutdown when there is a rogue
+ * backfill (whose producer and stream are already closed).
+ */
+static enum test_result test_mb19153(ENGINE_HANDLE *h,
+                                     ENGINE_HANDLE_V1 *h1) {
+
+    putenv(strdup("ALLOW_NO_STATS_UPDATE=yeah"));
+
+    // Set max num AUX IO to 0, so no backfill would start
+    // immediately
+    set_param(h, h1, protocol_binary_engine_param_flush,
+              "max_num_auxio", "0");
+
+    int num_items = 10000;
+
+    for (int j = 0; j < num_items; ++j) {
+        item *i = NULL;
+        std::stringstream ss;
+        ss << "key-" << j;
+        check(store(h, h1, NULL, OPERATION_SET,
+                    ss.str().c_str(), "data", &i, 0, 0, 0, 0)
+                    == ENGINE_SUCCESS, "Failed to store a value");
+
+        h1->release(h, NULL, i);
+    }
+
+    const void *cookie = testHarness.create_cookie();
+    uint32_t flags = DCP_OPEN_PRODUCER;
+    const char *name = "unittest";
+
+    uint32_t opaque = 1;
+    uint64_t start = 0;
+    uint64_t end = num_items;
+
+    // Setup a producer connection
+    checkeq(ENGINE_SUCCESS,
+            h1->dcp.open(h, cookie, ++opaque, 0, flags,
+                         (void*)name, strlen(name)),
+            "Failed dcp Consumer open connection.");
+
+    // Initiate a stream request
+    uint64_t vb_uuid = get_ull_stat(h, h1, "vb_0:0:id", "failovers");
+    uint64_t rollback = 0;
+    checkeq(ENGINE_SUCCESS,
+            h1->dcp.stream_req(h, cookie, 0, opaque, 0, start, end,
+                               vb_uuid, 0, 0,
+                               &rollback, mock_dcp_add_failover_log),
+            "Expected success");
+
+    // Disconnect the producer
+    testHarness.destroy_cookie(cookie);
+
+    // Wait for ConnManager to clear out dead connections from dcpConnMap
+    wait_for_stat_to_be(h, h1, "ep_dcp_dead_conn_count", 0, "dcp");
+
+    // Set auxIO threads to 1, so the backfill for the closed producer
+    // is picked up, and begins to run.
+    set_param(h, h1, protocol_binary_engine_param_flush,
+              "max_num_auxio", "1");
+
+    // Terminate engine
+    return SUCCESS;
+}
+
 static enum test_result prepare(engine_test_t *test) {
 #ifdef __sun
         // Some of the tests doesn't work on Solaris.. Don't know why yet..
@@ -12664,6 +12729,9 @@ engine_test_t* get_tests(void) {
         TestCase("test MB-16357", test_mb16357,
                  test_setup, teardown, "compaction_exp_mem_threshold=85",
                  prepare, cleanup),
+        TestCase("test MB-19153", test_mb19153,
+                 test_setup, teardown, NULL, prepare, cleanup),
+
         TestCase("test dcp early termination", test_dcp_early_termination,
                  test_setup, teardown, NULL, prepare, cleanup),
         TestCase(NULL, NULL, NULL, NULL, NULL, prepare, cleanup)
index 172d32b..57ead25 100644 (file)
@@ -139,7 +139,7 @@ static void test_mb17766(const std::string& test_dbname) {
               mock_stream->public_nextCheckpointItem(),
               "nextCheckpointItem() after processing items should be false.");
 
-    producer->clearCheckpointProcessorTaskQueues();
+    producer->cancelCheckpointProcessorTask();
 }
 
 int main(int argc, char **argv) {