MB-18171: Break cyclic reference between ActiveStream & ChkptProcesser 60/60060/4
authorabhinavdangeti <abhinav@couchbase.com>
Tue, 16 Feb 2016 18:49:09 +0000 (10:49 -0800)
committerabhinav dangeti <abhinav@couchbase.com>
Wed, 17 Feb 2016 18:07:15 +0000 (18:07 +0000)
Removing circular dependency between ActiveStream and
ActiveStreamCheckpointProcesserTask where each holds a reference
to the other causing a memory leak during shutdown.

Also explicitly clear the queues of checkpointProcessor task upon
disconnection of the DcpProducer, so as to remove a cyclic reference
between DcpProducer, ActiveStream, and ActiveStreamCheckpointProcesserTask.

Change-Id: Ifac03a40132431476a6b5000725ce972068b47f4
Reviewed-on: http://review.couchbase.org/60060
Well-Formed: buildbot <build@couchbase.com>
Reviewed-by: Chiyoung Seo <chiyoung@couchbase.com>
Tested-by: buildbot <build@couchbase.com>
src/connmap.cc
src/dcp-producer.cc
src/dcp-producer.h
src/dcp-stream.cc
src/dcp-stream.h
tests/module_tests/stream_test.cc

index e2e88ba..72e8583 100644 (file)
@@ -1055,6 +1055,7 @@ void DcpConnMap::closeAllStreams_UNLOCKED() {
         DcpProducer* producer = dynamic_cast<DcpProducer*> (itr->second.get());
         if (producer) {
             producer->closeAllStreams();
+            producer->clearCheckpointProcessorTaskQueues();
         } else {
             static_cast<DcpConsumer*>(itr->second.get())->closeAllStreams();
         }
@@ -1088,6 +1089,7 @@ void DcpConnMap::disconnect_UNLOCKED(const void *cookie) {
         DcpProducer* producer = dynamic_cast<DcpProducer*> (conn.get());
         if (producer) {
             producer->closeAllStreams();
+            producer->clearCheckpointProcessorTaskQueues();
         } else {
             static_cast<DcpConsumer*>(conn.get())->closeAllStreams();
         }
index 7f0adee..dfe6ff0 100644 (file)
@@ -264,8 +264,7 @@ ENGINE_ERROR_CODE DcpProducer::streamRequest(uint32_t flags,
         streams[vbucket] = new ActiveStream(&engine_, this, getName(), flags,
                                             opaque, vbucket, start_seqno,
                                             end_seqno, vbucket_uuid,
-                                            snap_start_seqno, snap_end_seqno,
-                                            checkpointCreatorTask);
+                                            snap_start_seqno, snap_end_seqno);
         static_cast<ActiveStream*>(streams[vbucket].get())->setActive();
     }
 
@@ -799,3 +798,13 @@ void DcpProducer::flush() {
 bool DcpProducer::bufferLogInsert(size_t bytes) {
     return log.insert(bytes);
 }
+
+void DcpProducer::scheduleCheckpointProcessorTask(stream_t s) {
+    static_cast<ActiveStreamCheckpointProcessorTask*>(checkpointCreatorTask.get())
+        ->schedule(s);
+}
+
+void DcpProducer::clearCheckpointProcessorTaskQueues() {
+    static_cast<ActiveStreamCheckpointProcessorTask*>(checkpointCreatorTask.get())
+        ->clearQueues();
+}
index 6397041..6bc8294 100644 (file)
@@ -182,6 +182,17 @@ public:
     */
     bool bufferLogInsert(size_t bytes);
 
+    /*
+        Schedules active stream checkpoint processor task
+        for given stream.
+    */
+    void scheduleCheckpointProcessorTask(stream_t s);
+
+    /*
+        Clears active stream checkpoint processor task's queue.
+    */
+    void clearCheckpointProcessorTaskQueues();
+
 private:
 
     /**
index 0fde627..0caeb5d 100644 (file)
@@ -268,8 +268,7 @@ ActiveStream::ActiveStream(EventuallyPersistentEngine* e, dcp_producer_t p,
                            const std::string &n, uint32_t flags,
                            uint32_t opaque, uint16_t vb, uint64_t st_seqno,
                            uint64_t en_seqno, uint64_t vb_uuid,
-                           uint64_t snap_start_seqno, uint64_t snap_end_seqno,
-                           ExTask task)
+                           uint64_t snap_start_seqno, uint64_t snap_end_seqno)
     :  Stream(n, flags, opaque, vb, st_seqno, en_seqno, vb_uuid,
               snap_start_seqno, snap_end_seqno),
        lastReadSeqno(st_seqno), lastSentSeqno(st_seqno), curChkSeqno(st_seqno),
@@ -277,7 +276,7 @@ ActiveStream::ActiveStream(EventuallyPersistentEngine* e, dcp_producer_t p,
        itemsFromBackfill(0), itemsFromMemory(0), firstMarkerSent(false),
        waitForSnapshot(0), engine(e), producer(p),
        isBackfillTaskRunning(false), lastSentSnapEndSeqno(0),
-       checkpointCreatorTask(task), chkptItemsExtractionInProgress(false) {
+       chkptItemsExtractionInProgress(false) {
 
     const char* type = "";
     if (flags_ & DCP_ADD_STREAM_FLAG_TAKEOVER) {
@@ -631,8 +630,7 @@ bool ActiveStream::nextCheckpointItem() {
     RCPtr<VBucket> vbucket = engine->getVBucket(vb_);
     if (vbucket && vbucket->checkpointManager.getNumItemsForTAPConnection(name_) > 0) {
         // schedule this stream to build the next checkpoint
-        static_cast<ActiveStreamCheckpointProcessorTask*>(checkpointCreatorTask.get())
-        ->schedule(this);
+        producer->scheduleCheckpointProcessorTask(this);
         return true;
     } else if (chkptItemsExtractionInProgress) {
         return true;
@@ -689,6 +687,14 @@ void ActiveStreamCheckpointProcessorTask::schedule(stream_t stream) {
     }
 }
 
+void ActiveStreamCheckpointProcessorTask::clearQueues() {
+    LockHolder lh(workQueueLock);
+    while (!queue.empty()) {
+        queue.pop();
+    }
+    queuedVbuckets.clear();
+}
+
 void ActiveStream::nextCheckpointItemTask() {
     RCPtr<VBucket> vbucket = engine->getVBucket(vb_);
     if (vbucket) {
index e0c6888..09d1027 100644 (file)
@@ -168,7 +168,7 @@ public:
                  const std::string &name, uint32_t flags, uint32_t opaque,
                  uint16_t vb, uint64_t st_seqno, uint64_t en_seqno,
                  uint64_t vb_uuid, uint64_t snap_start_seqno,
-                 uint64_t snap_end_seqno, ExTask task);
+                 uint64_t snap_end_seqno);
 
     ~ActiveStream() {
         LockHolder lh(streamMutex);
@@ -277,8 +277,6 @@ private:
     //! Last snapshot end seqno sent to the DCP client
     uint64_t lastSentSnapEndSeqno;
 
-    ExTask checkpointCreatorTask;
-
     /* Flag used by checkpointCreatorTask that is set before all items are
        extracted for given checkpoint cursor, and is unset after all retrieved
        items are added to the readyQ */
@@ -303,6 +301,7 @@ public:
     bool run();
     void schedule(stream_t stream);
     void wakeup();
+    void clearQueues();
 
 private:
 
index 0fa981b..172d32b 100644 (file)
@@ -37,9 +37,9 @@ public:
                      const std::string &name, uint32_t flags, uint32_t opaque,
                      uint16_t vb, uint64_t st_seqno, uint64_t en_seqno,
                      uint64_t vb_uuid, uint64_t snap_start_seqno,
-                     uint64_t snap_end_seqno, ExTask task)
+                     uint64_t snap_end_seqno)
     : ActiveStream(e, p, name, flags, opaque, vb, st_seqno, en_seqno, vb_uuid,
-                   snap_start_seqno, snap_end_seqno, task) {}
+                   snap_start_seqno, snap_end_seqno) {}
 
     // Expose underlying protected ActiveStream methods as public
     void public_getOutstandingItems(RCPtr<VBucket> &vb,
@@ -101,7 +101,6 @@ static void test_mb17766(const std::string& test_dbname) {
     dcp_producer_t producer = new DcpProducer(*engine, /*cookie*/NULL,
                                               "test_mb_17766_producer",
                                               /*notifyOnly*/false);
-    ExTask task = new ActiveStreamCheckpointProcessorTask(*engine);
     stream_t stream = new MockActiveStream(engine, producer,
                                            producer->getName(),
                                            /*flags*/0,
@@ -110,7 +109,7 @@ static void test_mb17766(const std::string& test_dbname) {
                                             /*en_seqno*/~0,
                                             /*vb_uuid*/0xabcd,
                                             /*snap_start_seqno*/0,
-                                            /*snap_end_seqno*/~0, task);
+                                            /*snap_end_seqno*/~0);
 
     RCPtr<VBucket> vb0 = engine->getVBucket(0);
     EXPECT_EQ(true, vb0, "Failed to get valid VBucket object for id 0");
@@ -139,6 +138,8 @@ static void test_mb17766(const std::string& test_dbname) {
     EXPECT_EQ(false,
               mock_stream->public_nextCheckpointItem(),
               "nextCheckpointItem() after processing items should be false.");
+
+    producer->clearCheckpointProcessorTaskQueues();
 }
 
 int main(int argc, char **argv) {