MB-19405: [BP] Address possible data races in PassiveStream context 46/63446/3
authorabhinavdangeti <abhinav@couchbase.com>
Thu, 28 Apr 2016 01:08:56 +0000 (18:08 -0700)
committerChiyoung Seo <chiyoung@couchbase.com>
Thu, 28 Apr 2016 18:10:43 +0000 (18:10 +0000)
WARNING: ThreadSanitizer: data race (pid=3212)

  Write of size 8 at 0x7d5000016908 by thread T5 (mutexes: write M26478):
    #0 PassiveStream::reconnectStream(RCPtr<VBucket>&, unsigned int, unsigned long) /home/abhinav/couchbase/ep-engine/src/dcp/stream.cc:1097 (ep.so+0x000000076c0f)
    #1 DcpConsumer::doRollback(unsigned int, unsigned short, unsigned long) /home/abhinav/couchbase/ep-engine/src/dcp/consumer.cc:676 (ep.so+0x00000005db67)
    #2 RollbackTask::run() /home/abhinav/couchbase/ep-engine/src/dcp/consumer.cc:574 (ep.so+0x00000005d9d4)
    #3 ExecutorThread::run() /home/abhinav/couchbase/ep-engine/src/executorthread.cc:112 (ep.so+0x0000000f8916)
    #4 launch_executor_thread(void*) /home/abhinav/couchbase/ep-engine/src/executorthread.cc:33 (ep.so+0x0000000f84b5)
    #5 platform_thread_wrap /home/abhinav/couchbase/platform/src/cb_pthreads.c:23 (libplatform.so.0.1.0+0x000000003d31)

  Previous read of size 8 at 0x7d5000016908 by main thread (mutexes: write M1367):
    #0 PassiveStream::setDead_UNLOCKED(end_stream_status_t, LockHolder*) /home/abhinav/couchbase/ep-engine/src/dcp/stream.cc:1046 (ep.so+0x0000000759ca)
    #1 PassiveStream::setDead(end_stream_status_t) /home/abhinav/couchbase/ep-engine/src/dcp/stream.cc:1056 (ep.so+0x0000000766d7)
    #2 DcpConsumer::closeAllStreams() /home/abhinav/couchbase/ep-engine/src/dcp/consumer.cc:860 (ep.so+0x00000005a006)
    #3 DcpConnMap::disconnect_UNLOCKED(void const*) /home/abhinav/couchbase/ep-engine/src/connmap.cc:1137 (ep.so+0x000000049972)
    #4 DcpConnMap::disconnect(void const*) /home/abhinav/couchbase/ep-engine/src/connmap.cc:1111 (ep.so+0x00000004969b)
    #5 EventuallyPersistentEngine::handleDisconnect(void const*) /home/abhinav/couchbase/ep-engine/src/ep_engine.cc:6224 (ep.so+0x0000000d3bea)
    #6 EvpHandleDisconnect(void const*, ENGINE_EVENT_TYPE, void const*, void const*) /home/abhinav/couchbase/ep-engine/src/ep_engine.cc:1783 (ep.so+0x0000000b7046)
    #7 mock_perform_callbacks /home/abhinav/couchbase/memcached/programs/engine_testapp/mock_server.c:296 (engine_testapp+0x0000000bd420)
    #8 test_rollback_to_zero(engine_interface*, engine_interface_v1*) /home/abhinav/couchbase/ep-engine/tests/ep_testsuite.cc:5434 (ep_testsuite.so+0x00000007f45f)
    #9 execute_test(test, char const*, char const*) /home/abhinav/couchbase/memcached/programs/engine_testapp/engine_testapp.cc:1090 (engine_testapp+0x0000000b946c)
    #10 __libc_start_main /build/buildd/eglibc-2.19/csu/libc-start.c:287 (libc.so.6+0x000000021ec4)

(Reviewed-on: http://review.couchbase.org/55785)

Change-Id: I287bd95f8b03cb207419d0a0e57ca71be6058b19
Reviewed-on: http://review.couchbase.org/63446
Well-Formed: buildbot <build@couchbase.com>
Reviewed-by: Dave Rigby <daver@couchbase.com>
Tested-by: buildbot <build@couchbase.com>
Reviewed-by: Chiyoung Seo <chiyoung@couchbase.com>
src/dcp-stream.cc
src/dcp-stream.h

index 89d219a..58873fd 100644 (file)
@@ -1155,7 +1155,7 @@ uint32_t PassiveStream::setDead(end_stream_status_t status) {
     uint32_t unackedBytes = clearBuffer();
     LOG(EXTENSION_LOG_WARNING, "%s (vb %d) Setting stream to dead state,"
         " last_seqno is %llu, unackedBytes is %u, status is %s",
-        consumer->logHeader(), vb_, last_seqno, unackedBytes,
+        consumer->logHeader(), vb_, last_seqno.load(), unackedBytes,
         getEndStreamStatusStr(status));
     return unackedBytes;
 }
@@ -1189,7 +1189,7 @@ void PassiveStream::reconnectStream(RCPtr<VBucket> &vb,
         start_seqno, end_seqno_, snap_start_seqno_, snap_end_seqno_);
 
     LockHolder lh(streamMutex);
-    last_seqno = start_seqno;
+    last_seqno.store(start_seqno);
     pushToReadyQ(new StreamRequest(vb_, new_opaque, flags_, start_seqno,
                                   end_seqno_, vb_uuid_, snap_start_seqno_,
                                   snap_end_seqno_));
@@ -1216,16 +1216,16 @@ ENGINE_ERROR_CODE PassiveStream::messageReceived(DcpResponse* resp) {
         {
             MutationResponse* m = static_cast<MutationResponse*>(resp);
             uint64_t bySeqno = m->getBySeqno();
-            if (bySeqno <= last_seqno) {
+            if (bySeqno <= last_seqno.load()) {
                 LOG(EXTENSION_LOG_WARNING, "%s (vb %d) Erroneous (out of "
                     "sequence) mutation received, with opaque: %ld, its "
                     "seqno (%llu) is not greater than last received seqno "
                     "(%llu); Dropping mutation!", consumer->logHeader(),
-                    vb_, opaque_, bySeqno, last_seqno);
+                    vb_, opaque_, bySeqno, last_seqno.load());
                 delete m;
                 return ENGINE_ERANGE;
             }
-            last_seqno = bySeqno;
+            last_seqno.store(bySeqno);
             break;
         }
         case DCP_SNAPSHOT_MARKER:
@@ -1233,12 +1233,12 @@ ENGINE_ERROR_CODE PassiveStream::messageReceived(DcpResponse* resp) {
             SnapshotMarker* s = static_cast<SnapshotMarker*>(resp);
             uint64_t snapStart = s->getStartSeqno();
             uint64_t snapEnd = s->getEndSeqno();
-            if (snapStart < last_seqno && snapEnd <= last_seqno) {
+            if (snapStart < last_seqno.load() && snapEnd <= last_seqno.load()) {
                 LOG(EXTENSION_LOG_WARNING, "%s (vb %d) Erroneous snapshot "
                     "marker received, with opaque: %ld, its start (%llu), and"
                     "end (%llu) are less than last received seqno (%llu); "
                     "Dropping marker!", consumer->logHeader(), vb_, opaque_,
-                    snapStart, snapEnd, last_seqno);
+                    snapStart, snapEnd, last_seqno.load());
                 delete s;
                 return ENGINE_ERANGE;
             }
@@ -1333,11 +1333,11 @@ ENGINE_ERROR_CODE PassiveStream::processMutation(MutationResponse* mutation) {
         return ENGINE_NOT_MY_VBUCKET;
     }
 
-    if (mutation->getBySeqno() > cur_snapshot_end) {
+    if (mutation->getBySeqno() > cur_snapshot_end.load()) {
         LOG(EXTENSION_LOG_WARNING, "%s (vb %d) Erroneous mutation [sequence "
             "number (%llu) greater than current snapshot end seqno (%llu)] "
             "being processed; Dropping the mutation!", consumer->logHeader(),
-            vb_, mutation->getBySeqno(), cur_snapshot_end);
+            vb_, mutation->getBySeqno(), cur_snapshot_end.load());
         return ENGINE_ERANGE;
     }
 
@@ -1345,7 +1345,8 @@ ENGINE_ERROR_CODE PassiveStream::processMutation(MutationResponse* mutation) {
     if (saveSnapshot) {
         LockHolder lh = vb->getSnapshotLock();
         ret = commitMutation(mutation, vb->isBackfillPhase());
-        vb->setCurrentSnapshot_UNLOCKED(cur_snapshot_start, cur_snapshot_end);
+        vb->setCurrentSnapshot_UNLOCKED(cur_snapshot_start.load(),
+                                        cur_snapshot_end.load());
         saveSnapshot = false;
         lh.unlock();
     } else {
@@ -1385,11 +1386,11 @@ ENGINE_ERROR_CODE PassiveStream::processDeletion(MutationResponse* deletion) {
         return ENGINE_NOT_MY_VBUCKET;
     }
 
-    if (deletion->getBySeqno() > cur_snapshot_end) {
+    if (deletion->getBySeqno() > cur_snapshot_end.load()) {
         LOG(EXTENSION_LOG_WARNING, "%s (vb %d) Erroneous deletion [sequence "
             "number (%llu) greater than current snapshot end seqno (%llu)] "
             "being processed; Dropping the deletion!", consumer->logHeader(),
-            vb_, deletion->getBySeqno(), cur_snapshot_end);
+            vb_, deletion->getBySeqno(), cur_snapshot_end.load());
         return ENGINE_ERANGE;
     }
 
@@ -1397,7 +1398,8 @@ ENGINE_ERROR_CODE PassiveStream::processDeletion(MutationResponse* deletion) {
     if (saveSnapshot) {
         LockHolder lh = vb->getSnapshotLock();
         ret = commitDeletion(deletion, vb->isBackfillPhase());
-        vb->setCurrentSnapshot_UNLOCKED(cur_snapshot_start, cur_snapshot_end);
+        vb->setCurrentSnapshot_UNLOCKED(cur_snapshot_start.load(),
+                                        cur_snapshot_end.load());
         saveSnapshot = false;
         lh.unlock();
     } else {
@@ -1435,9 +1437,9 @@ ENGINE_ERROR_CODE PassiveStream::commitDeletion(MutationResponse* deletion,
 void PassiveStream::processMarker(SnapshotMarker* marker) {
     RCPtr<VBucket> vb = engine->getVBucket(vb_);
 
-    cur_snapshot_start = marker->getStartSeqno();
-    cur_snapshot_end = marker->getEndSeqno();
-    cur_snapshot_type = (marker->getFlags() & MARKER_FLAG_DISK) ? disk : memory;
+    cur_snapshot_start.store(marker->getStartSeqno());
+    cur_snapshot_end.store(marker->getEndSeqno());
+    cur_snapshot_type.store((marker->getFlags() & MARKER_FLAG_DISK) ? disk : memory);
     saveSnapshot = true;
 
     if (vb) {
@@ -1474,8 +1476,8 @@ void PassiveStream::processSetVBucketState(SetVBucketState* state) {
 }
 
 void PassiveStream::handleSnapshotEnd(RCPtr<VBucket>& vb, uint64_t byseqno) {
-    if (byseqno == cur_snapshot_end) {
-        if (cur_snapshot_type == disk && vb->isBackfillPhase()) {
+    if (byseqno == cur_snapshot_end.load()) {
+        if (cur_snapshot_type.load() == disk && vb->isBackfillPhase()) {
             vb->setBackfillPhase(false);
             uint64_t id = vb->checkpointManager.getOpenCheckpointId() + 1;
             vb->checkpointManager.checkAndAddNewCheckpoint(id, vb);
@@ -1499,7 +1501,7 @@ void PassiveStream::handleSnapshotEnd(RCPtr<VBucket>& vb, uint64_t byseqno) {
             }
             cur_snapshot_ack = false;
         }
-        cur_snapshot_type = none;
+        cur_snapshot_type.store(none);
         vb->setCurrentSnapshot(byseqno, byseqno);
     }
 }
@@ -1523,18 +1525,18 @@ void PassiveStream::addStats(ADD_STAT add_stat, const void *c) {
     snprintf(buf, bsize, "%s:stream_%d_items_ready", name_.c_str(), vb_);
     add_casted_stat(buf, itemsReady.load() ? "true" : "false", add_stat, c);
     snprintf(buf, bsize, "%s:stream_%d_last_received_seqno", name_.c_str(), vb_);
-    add_casted_stat(buf, last_seqno, add_stat, c);
+    add_casted_stat(buf, last_seqno.load(), add_stat, c);
     snprintf(buf, bsize, "%s:stream_%d_ready_queue_memory", name_.c_str(), vb_);
     add_casted_stat(buf, getReadyQueueMemory(), add_stat, c);
 
     snprintf(buf, bsize, "%s:stream_%d_cur_snapshot_type", name_.c_str(), vb_);
-    add_casted_stat(buf, snapshotTypeToString(cur_snapshot_type), add_stat, c);
+    add_casted_stat(buf, snapshotTypeToString(cur_snapshot_type.load()), add_stat, c);
 
-    if (cur_snapshot_type != none) {
+    if (cur_snapshot_type.load() != none) {
         snprintf(buf, bsize, "%s:stream_%d_cur_snapshot_start", name_.c_str(), vb_);
-        add_casted_stat(buf, cur_snapshot_start, add_stat, c);
+        add_casted_stat(buf, cur_snapshot_start.load(), add_stat, c);
         snprintf(buf, bsize, "%s:stream_%d_cur_snapshot_end", name_.c_str(), vb_);
-        add_casted_stat(buf, cur_snapshot_end, add_stat, c);
+        add_casted_stat(buf, cur_snapshot_end.load(), add_stat, c);
     }
 }
 
index 5461e97..f9c7288 100644 (file)
@@ -428,11 +428,11 @@ private:
 
     EventuallyPersistentEngine* engine;
     dcp_consumer_t consumer;
-    uint64_t last_seqno;
+    AtomicValue<uint64_t> last_seqno;
 
-    uint64_t cur_snapshot_start;
-    uint64_t cur_snapshot_end;
-    snapshot_type_t cur_snapshot_type;
+    AtomicValue<uint64_t> cur_snapshot_start;
+    AtomicValue<uint64_t> cur_snapshot_end;
+    AtomicValue<snapshot_type_t> cur_snapshot_type;
     bool cur_snapshot_ack;
     bool saveSnapshot;