[BP] MB-16915: Remove cyclic reference between DcpConsumer and PassiveStream. 49/57449/2
authorManu Dhundi <manu@couchbase.com>
Thu, 3 Dec 2015 21:32:35 +0000 (13:32 -0800)
committerChiyoung Seo <chiyoung@couchbase.com>
Thu, 3 Dec 2015 23:35:57 +0000 (23:35 +0000)
DcpConsumer holds a reference to PassiveStream and vice versa. We must
make sure that one of them (DcpConsumer here) releases the reference
to another in a function other than the object destructor.

Change-Id: I8e5c262bc5ac50342f85ba80d481987a26a7a21d
Reviewed-on: http://review.couchbase.org/57429
Reviewed-by: Chiyoung Seo <chiyoung@couchbase.com>
Tested-by: buildbot <build@couchbase.com>
Reviewed-on: http://review.couchbase.org/57449
Tested-by: Chiyoung Seo <chiyoung@couchbase.com>
src/dcp-consumer.cc
tests/ep_testsuite.cc

index 01d970b..3c8a55c 100644 (file)
@@ -91,7 +91,6 @@ DcpConsumer::DcpConsumer(EventuallyPersistentEngine &engine, const void *cookie,
 }
 
 DcpConsumer::~DcpConsumer() {
-    closeAllStreams();
     delete[] streams;
 }
 
@@ -145,6 +144,7 @@ ENGINE_ERROR_CODE DcpConsumer::addStream(uint32_t opaque, uint16_t vbucket,
 
 ENGINE_ERROR_CODE DcpConsumer::closeStream(uint32_t opaque, uint16_t vbucket) {
     if (doDisconnect()) {
+        streams[vbucket].reset();
         return ENGINE_DISCONNECT;
     }
 
@@ -162,6 +162,7 @@ ENGINE_ERROR_CODE DcpConsumer::closeStream(uint32_t opaque, uint16_t vbucket) {
 
     uint32_t bytesCleared = stream->setDead(END_STREAM_CLOSED);
     flowControl.freedBytes.fetch_add(bytesCleared);
+    streams[vbucket].reset();
     return ENGINE_SUCCESS;
 }
 
@@ -701,6 +702,7 @@ void DcpConsumer::closeAllStreams() {
         passive_stream_t stream = streams[vbucket];
         if (stream) {
             stream->setDead(END_STREAM_DISCONNECTED);
+            streams[vbucket].reset();
         }
     }
 }
index 96d2bf7..2aba386 100644 (file)
@@ -4954,7 +4954,8 @@ static enum test_result test_dcp_close_stream(ENGINE_HANDLE *h,
           "Expected success");
 
     state = get_str_stat(h, h1, "eq_dcpq:unittest:stream_0_state", "dcp");
-    check(state.compare("dead") == 0, "Expected stream in dead state");
+    checkeq(static_cast<unsigned long>(0), state.length(),
+            "Did not expect to find the closed stream");
 
     testHarness.destroy_cookie(cookie);
     return SUCCESS;