MB-19275: Address data race on a DCP stream's state 27/63027/6
authorabhinavdangeti <abhinav@couchbase.com>
Wed, 28 Oct 2015 21:58:55 +0000 (14:58 -0700)
committerChiyoung Seo <chiyoung@couchbase.com>
Sat, 23 Apr 2016 01:12:27 +0000 (01:12 +0000)
WARNING: ThreadSanitizer: data race (pid=139161)
  Read of size 4 at 0x7d480000b150 by thread T31 (mutexes: write M51120):
    #0 DCPBackfill::scan() ep-engine/src/dcp/stream.h:126 (ep.so+0x000000053391)
    #1 DCPBackfill::run() ep-engine/src/dcp/backfill.cc:118 (ep.so+0x000000052737)
    #2 BackfillManager::backfill() ep-engine/src/dcp/backfill-manager.cc:240 (ep.so+0x00000004cf65)
    #3 BackfillManagerTask::run() ep-engine/src/dcp/backfill-manager.cc:43 (ep.so+0x00000004cb8f)
    #4 ExecutorThread::run() ep-engine/src/executorthread.cc:115 (ep.so+0x0000000eb94d)
    #5 launch_executor_thread(void*) ep-engine/src/executorthread.cc:33 (ep.so+0x0000000eb515)
    #6 platform_thread_wrap(void*) platform/src/cb_pthreads.cc:53 (libplatform.so.0.1.0+0x0000000048ab)

  Previous write of size 4 at 0x7d480000b150 by main thread (mutexes: write M1241, write M32448, write M51071, write M51087):
    #0 ActiveStream::transitionState(stream_state_t) ep-engine/src/dcp/stream.cc:829 (ep.so+0x00000006accb)
    #1 ActiveStream::endStream(end_stream_status_t) ep-engine/src/dcp/stream.cc:688 (ep.so+0x00000006a8c2)
    #2 ActiveStream::setDead(end_stream_status_t) ep-engine/src/dcp/stream.cc:654 (ep.so+0x00000006f27b)
    #3 DcpProducer::setDisconnect(bool) ep-engine/src/dcp/producer.cc:835 (ep.so+0x000000065605)
    #4 DcpConnMap::disconnect_UNLOCKED(void const*) ep-engine/src/connmap.cc:1116 (ep.so+0x000000045d6c)
    #5 DcpConnMap::disconnect(void const*) ep-engine/src/connmap.cc:1109 (ep.so+0x000000045c8b)
    #6 EventuallyPersistentEngine::handleDisconnect(void const*) ep-engine/src/ep_engine.cc:6265 (ep.so+0x0000000ca38a)
    #7 EvpHandleDisconnect(void const*, ENGINE_EVENT_TYPE, void const*, void const*) ep-engine/src/ep_engine.cc:1802 (ep.so+0x0000000af976)
    #8 destroy_mock_cookie memcached/programs/engine_testapp/mock_server.cc:325 (engine_testapp+0x0000004f4082)
    #9 dcp_stream_req(engine_interface*, engine_interface_v1*, unsigned int, unsigned short, unsigned long, unsigned long, unsigned long, unsigned long, unsigned long, unsigned long, ENGINE_ERROR_CODE) ep-engine/tests/ep_testsuite.cc:4331 (ep_testsuite.so+0x000000090b06)
    #10 test_failover_log_dcp(engine_interface*, engine_interface_v1*) ep-engine/tests/ep_testsuite.cc:14127 (ep_testsuite.so+0x00000007ce7a)
    #11 execute_test(test, char const*, char const*) memcached/programs/engine_testapp/engine_testapp.cc:1091 (engine_testapp+0x0000004cb315)
    #12 __libc_start_main /build/buildd/eglibc-2.15/csu/libc-start.c:226 (libc.so.6+0x00000002176c)

Change-Id: Icfc82fa877999d128184c9cac8f8c0e1cafc4e67
Reviewed-on: http://review.couchbase.org/63027
Well-Formed: buildbot <build@couchbase.com>
Reviewed-by: Will Gardner <will.gardner@couchbase.com>
Tested-by: buildbot <build@couchbase.com>
src/dcp-stream.cc
src/dcp-stream.h

index 586a9c4..19b12d4 100644 (file)
@@ -316,7 +316,7 @@ DcpResponse* ActiveStream::next() {
 
     DcpResponse* response = NULL;
 
-    switch (state_) {
+    switch (initState) {
         case STREAM_PENDING:
             break;
         case STREAM_BACKFILLING:
@@ -340,7 +340,9 @@ DcpResponse* ActiveStream::next() {
             abort();
     }
 
-    if (state_ != STREAM_DEAD && initState != state_ && !response) {
+    stream_state_t newState = state_;
+
+    if (newState != STREAM_DEAD && newState != state_ && !response) {
         lh.unlock();
         return next();
     }
@@ -916,7 +918,7 @@ void ActiveStream::transitionState(stream_state_t newState) {
         return;
     }
 
-    switch (state_) {
+    switch (state_.load()) {
         case STREAM_PENDING:
             cb_assert(newState == STREAM_BACKFILLING || newState == STREAM_DEAD);
             break;
@@ -1099,7 +1101,7 @@ void NotifierStream::transitionState(stream_state_t newState) {
         return;
     }
 
-    switch (state_) {
+    switch (state_.load()) {
         case STREAM_PENDING:
             cb_assert(newState == STREAM_DEAD);
             break;
@@ -1571,7 +1573,7 @@ void PassiveStream::transitionState(stream_state_t newState) {
         return;
     }
 
-    switch (state_) {
+    switch (state_.load()) {
         case STREAM_PENDING:
             cb_assert(newState == STREAM_READING || newState == STREAM_DEAD);
             break;
index b5d4f51..3623677 100644 (file)
@@ -144,7 +144,7 @@ protected:
     uint64_t vb_uuid_;
     uint64_t snap_start_seqno_;
     uint64_t snap_end_seqno_;
-    stream_state_t state_;
+    AtomicValue<stream_state_t> state_;
     stream_type_t type_;
 
     AtomicValue<bool> itemsReady;