[BP] MB-16915: Use refcounted pointers on producer/consumer 47/57447/2
authorJim Walker <jim@couchbase.com>
Mon, 30 Nov 2015 13:31:59 +0000 (13:31 +0000)
committerChiyoung Seo <chiyoung@couchbase.com>
Thu, 3 Dec 2015 23:35:24 +0000 (23:35 +0000)
Prevents a race/crash occuring when the DcpProducer is destroyed
and there are backfill tasks running/pending.

The test case reveals the probem when run under valgrind as
a series of invalid reads of freed memory. E.g.

==40673== Thread 17:
==40673== Invalid read of size 8
==40673==    at 0x71A3CEE: DCPBackfill::run() (dcp-stream.cc:175)
==40673==    by 0x717215C: ExecutorThread::run() (executorthread.cc:110)
==40673==    by 0x7172868: launch_executor_thread (executorthread.cc:34)
==40673==    by 0x503EC67: platform_thread_wrap (cb_pthreads.c:24)
==40673==    by 0x524A181: start_thread (pthread_create.c:312)
==40673==    by 0x555A47C: clone (clone.S:111)
==40673==  Address 0x64c2380 is 48 bytes inside a block of size 384 free'd
==40673==    at 0x4C2C2BC: operator delete(void*) (in /usr/lib/valgrind/vgpreload_memcheck-amd64-linux.so)
==40673==    by 0x718C4ED: DcpConnMap::manageConnections() (atomic.h:430)
==40673==    by 0x71906A5: ConnManager::run() (connmap.cc:151)
==40673==    by 0x717215C: ExecutorThread::run() (executorthread.cc:110)
==40673==    by 0x7172868: launch_executor_thread (executorthread.cc:34)
==40673==    by 0x503EC67: platform_thread_wrap (cb_pthreads.c:24)
==40673==    by 0x524A181: start_thread (pthread_create.c:312)
==40673==    by 0x555A47C: clone (clone.S:111)

Change-Id: I32a7dfd10daa4565b9cbb4c8142ed8f71c13ca31
Reviewed-on: http://review.couchbase.org/57296
Tested-by: buildbot <build@couchbase.com>
Reviewed-by: Chiyoung Seo <chiyoung@couchbase.com>
Reviewed-on: http://review.couchbase.org/57447
Tested-by: Chiyoung Seo <chiyoung@couchbase.com>
src/connmap.h
src/dcp-consumer.h
src/dcp-producer.h
src/dcp-stream.cc
src/dcp-stream.h
src/executorpool.cc
tests/ep_testsuite.cc

index 4a28ac4..d5f451b 100644 (file)
 #include "locks.h"
 #include "syncobject.h"
 #include "atomicqueue.h"
+#include "dcp-consumer.h"
+#include "dcp-producer.h"
 
 // Forward declaration
 class ConnNotifier;
 class TapConsumer;
 class TapProducer;
-class DcpConsumer;
-class DcpProducer;
 class Item;
 class EventuallyPersistentEngine;
 
index fd8a136..c8b8f6f 100644 (file)
@@ -22,8 +22,8 @@
 
 #include "tapconnection.h"
 #include "dcp-stream.h"
+typedef RCPtr<PassiveStream> passive_stream_t;
 
-class PassiveStream;
 class DcpResponse;
 
 class DcpConsumer : public Consumer, public Notifiable {
index 8bff028..3b10b44 100644 (file)
@@ -23,6 +23,8 @@
 #include "tapconnection.h"
 #include "dcp-stream.h"
 
+typedef SingleThreadedRCPtr<Stream> stream_t;
+
 class DcpResponse;
 
 class BufferLog {
index 94680ad..1a72961 100644 (file)
@@ -263,7 +263,7 @@ void Stream::addStats(ADD_STAT add_stat, const void *c) {
     add_casted_stat(buffer, stateName(state_), add_stat, c);
 }
 
-ActiveStream::ActiveStream(EventuallyPersistentEngine* e, DcpProducer* p,
+ActiveStream::ActiveStream(EventuallyPersistentEngine* e, dcp_producer_t p,
                            const std::string &n, uint32_t flags,
                            uint32_t opaque, uint16_t vb, uint64_t st_seqno,
                            uint64_t en_seqno, uint64_t vb_uuid,
@@ -854,7 +854,7 @@ const char* ActiveStream::logHeader()
     return producer->logHeader();
 }
 
-NotifierStream::NotifierStream(EventuallyPersistentEngine* e, DcpProducer* p,
+NotifierStream::NotifierStream(EventuallyPersistentEngine* e, dcp_producer_t p,
                                const std::string &name, uint32_t flags,
                                uint32_t opaque, uint16_t vb, uint64_t st_seqno,
                                uint64_t en_seqno, uint64_t vb_uuid,
@@ -943,7 +943,7 @@ void NotifierStream::transitionState(stream_state_t newState) {
     state_ = newState;
 }
 
-PassiveStream::PassiveStream(EventuallyPersistentEngine* e, DcpConsumer* c,
+PassiveStream::PassiveStream(EventuallyPersistentEngine* e, dcp_consumer_t c,
                              const std::string &name, uint32_t flags,
                              uint32_t opaque, uint16_t vb, uint64_t st_seqno,
                              uint64_t en_seqno, uint64_t vb_uuid,
index 424d892..3ee8b37 100644 (file)
@@ -26,9 +26,13 @@ class EventuallyPersistentEngine;
 class MutationResponse;
 class SetVBucketState;
 class SnapshotMarker;
+class DcpResponse;
+
 class DcpConsumer;
+typedef SingleThreadedRCPtr<DcpConsumer> dcp_consumer_t;
+
 class DcpProducer;
-class DcpResponse;
+typedef SingleThreadedRCPtr<DcpProducer> dcp_producer_t;
 
 typedef enum {
     STREAM_PENDING,
@@ -154,7 +158,7 @@ private:
 
 class ActiveStream : public Stream {
 public:
-    ActiveStream(EventuallyPersistentEngine* e, DcpProducer* p,
+    ActiveStream(EventuallyPersistentEngine* e, dcp_producer_t p,
                  const std::string &name, uint32_t flags, uint32_t opaque,
                  uint16_t vb, uint64_t st_seqno, uint64_t en_seqno,
                  uint64_t vb_uuid, uint64_t snap_start_seqno,
@@ -248,13 +252,13 @@ private:
     int waitForSnapshot;
 
     EventuallyPersistentEngine* engine;
-    DcpProducer* producer;
+    dcp_producer_t producer;
     bool isBackfillTaskRunning;
 };
 
 class NotifierStream : public Stream {
 public:
-    NotifierStream(EventuallyPersistentEngine* e, DcpProducer* producer,
+    NotifierStream(EventuallyPersistentEngine* e, dcp_producer_t producer,
                    const std::string &name, uint32_t flags, uint32_t opaque,
                    uint16_t vb, uint64_t start_seqno, uint64_t end_seqno,
                    uint64_t vb_uuid, uint64_t snap_start_seqno,
@@ -276,12 +280,12 @@ private:
 
     void transitionState(stream_state_t newState);
 
-    DcpProducer* producer;
+    dcp_producer_t producer;
 };
 
 class PassiveStream : public Stream {
 public:
-    PassiveStream(EventuallyPersistentEngine* e, DcpConsumer* consumer,
+    PassiveStream(EventuallyPersistentEngine* e, dcp_consumer_t consumer,
                   const std::string &name, uint32_t flags, uint32_t opaque,
                   uint16_t vb, uint64_t start_seqno, uint64_t end_seqno,
                   uint64_t vb_uuid, uint64_t snap_start_seqno,
@@ -331,7 +335,7 @@ private:
     const char* getEndStreamStatusStr(end_stream_status_t status);
 
     EventuallyPersistentEngine* engine;
-    DcpConsumer* consumer;
+    dcp_consumer_t consumer;
     uint64_t last_seqno;
 
     uint64_t cur_snapshot_start;
@@ -349,7 +353,4 @@ private:
     } buffer;
 };
 
-typedef SingleThreadedRCPtr<Stream> stream_t;
-typedef RCPtr<PassiveStream> passive_stream_t;
-
 #endif  // SRC_DCP_STREAM_H_
index 2eac053..56f2444 100644 (file)
@@ -148,7 +148,8 @@ ExecutorPool::ExecutorPool(size_t maxThreads, size_t nTaskSets,
                            size_t maxReaders, size_t maxWriters,
                            size_t maxAuxIO,   size_t maxNonIO) :
                   numTaskSets(nTaskSets), totReadyTasks(0),
-                  isHiPrioQset(false), isLowPrioQset(false), numBuckets(0) {
+                  isHiPrioQset(false), isLowPrioQset(false), numBuckets(0),
+                  numSleepers(0) {
     size_t numCPU = getNumCPU();
     size_t numThreads = (size_t)((numCPU * 3)/4);
     numThreads = (numThreads < EP_MIN_NUM_THREADS) ?
index 007f3da..96d2bf7 100644 (file)
@@ -8419,6 +8419,73 @@ static enum test_result test_dcp_invalid_snapshot_marker(ENGINE_HANDLE* h,
     return SUCCESS;
 }
 
+/*
+ * Test that destroying a DCP producer before it ends
+ * works. MB-16915 reveals itself via valgrind.
+ */
+static enum test_result test_dcp_early_termination(ENGINE_HANDLE* h,
+                                                   ENGINE_HANDLE_V1* h1) {
+
+
+    // create enough streams that some backfill tasks should overlap
+    // with the connection deletion task.
+    const int streams = 100;
+
+    // 1 item so that we will at least allow backfill to be scheduled
+    const int num_items = 1;
+    uint64_t vbuuid[streams];
+    for (int i = 0; i < streams; i++) {
+
+        check(set_vbucket_state(h, h1, i, vbucket_state_active),
+            "Failed to set vbucket state");
+        std::stringstream statkey;
+        statkey << "vb_" << i <<  ":0:id";
+        vbuuid[i] = get_ull_stat(h, h1, statkey.str().c_str(), "failovers");
+
+        /* Set n items */
+
+        for (int count = 0; count < num_items; count++) {
+            std::stringstream key;
+            key << "KEY" << i << count;
+            check(ENGINE_SUCCESS ==
+                  store(h, h1, NULL, OPERATION_SET, key.str().c_str(),
+                        "somevalue", NULL, 0, i, 0), "Error storing.");
+        }
+    }
+    wait_for_flusher_to_settle(h, h1);
+
+    const void *cookie = testHarness.create_cookie();
+    uint32_t opaque = 1;
+    check(h1->dcp.open(h, cookie, ++opaque, 0, DCP_OPEN_PRODUCER,
+                       (void*)"unittest", strlen("unittest")) == ENGINE_SUCCESS,
+          "Failed dcp producer open connection.");
+
+    check(h1->dcp.control(h, cookie, ++opaque, "connection_buffer_size",
+                          strlen("connection_buffer_size"),
+                          "1024", 4) == ENGINE_SUCCESS,
+          "Failed to establish connection buffer");
+
+    struct dcp_message_producers* producers = get_dcp_producers();
+    for (int i = 0; i < streams; i++) {
+        uint64_t rollback = 0;
+        check(h1->dcp.stream_req(h, cookie, DCP_ADD_STREAM_FLAG_DISKONLY,
+                                 ++opaque, i, 0, num_items,
+                                 vbuuid[i], 0, num_items, &rollback,
+                                 mock_dcp_add_failover_log)
+                    == ENGINE_SUCCESS,
+              "Failed to initiate stream request");
+        h1->dcp.step(h, cookie, producers);
+    }
+
+    // Destroy the connection
+    testHarness.destroy_cookie(cookie);
+
+    // Let all AUXIO (backfills) finish
+    wait_for_stat_to_be(h, h1, "ep_workload:LowPrioQ_AuxIO:InQsize", 0, "workload");
+    wait_for_stat_to_be(h, h1, "ep_workload:LowPrioQ_AuxIO:OutQsize", 0, "workload");
+    return SUCCESS;
+}
+
 extern "C" {
     static void wait_for_persistence_thread(void *arg) {
         struct handle_pair *hp = static_cast<handle_pair *>(arg);
@@ -12312,6 +12379,8 @@ engine_test_t* get_tests(void) {
         TestCase("test MB-16357", test_mb16357,
                  test_setup, teardown, "compaction_exp_mem_threshold=85",
                  prepare, cleanup),
+        TestCase("test dcp early termination", test_dcp_early_termination,
+                 test_setup, teardown, NULL, prepare, cleanup),
         TestCase(NULL, NULL, NULL, NULL, NULL, prepare, cleanup)
     };