MB-19228: Address possible data races in ActiveStream context 18/62918/6
author abhinavdangeti <abhinav@couchbase.com>
Mon, 5 Oct 2015 22:22:48 +0000 (15:22 -0700)
committer Chiyoung Seo <chiyoung@couchbase.com>
Sat, 23 Apr 2016 01:00:48 +0000 (01:00 +0000)
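Address possible data races in ActiveStream context when gathering
stats: ActiveStream::addStats reads stream counters that the backfill
thread updates in ActiveStream::backfillReceived. Make lastReadSeqno,
lastSentSeqno, backfillRemaining and itemsFromMemory atomic so these
reads and writes no longer race.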

WARNING: ThreadSanitizer: data race (pid=27028)

  Read of size 8 at 0x7d480000b1f8 by main thread (mutexes: write M32941632, write M1367, write M32940809):
    #0 void STATWRITER_NAMESPACE::add_casted_stat<unsigned long>(char const*, unsigned long const&, void (*)(char const*, unsigned short, char const*, unsigned int, void const*), void const*) /home/abhinav/couchbase/ep-engine/src/statwriter.h:45 (ep.so+0x000000037825)
    #1 ActiveStream::addStats(void (*)(char const*, unsigned short, char const*, unsigned int, void const*), void const*) /home/abhinav/couchbase/ep-engine/src/dcp/stream.cc:477 (ep.so+0x000000071d16)
    #2 DcpProducer::addStats(void (*)(char const*, unsigned short, char const*, unsigned int, void const*), void const*) /home/abhinav/couchbase/ep-engine/src/dcp/producer.cc:602 (ep.so+0x000000068057)
    #3 ConnStatBuilder::operator()(SingleThreadedRCPtr<ConnHandler>&) /home/abhinav/couchbase/ep-engine/src/ep_engine.cc:3887 (ep.so+0x0000000e13e1)
    #4 EventuallyPersistentEngine::doDcpStats(void const*, void (*)(char const*, unsigned short, char const*, unsigned int, void const*)) /home/abhinav/couchbase/ep-engine/src/ep_engine.cc:4144 (ep.so+0x0000000c151a)
    #5 EventuallyPersistentEngine::getStats(void const*, char const*, int, void (*)(char const*, unsigned short, char const*, unsigned int, void const*)) /home/abhinav/couchbase/ep-engine/src/ep_engine.cc:4564 (ep.so+0x0000000c5405)
    #6 EvpGetStats(engine_interface*, void const*, char const*, int, void (*)(char const*, unsigned short, char const*, unsigned int, void const*)) /home/abhinav/couchbase/ep-engine/src/ep_engine.cc:213 (ep.so+0x0000000b422e)
    #7 mock_get_stats(engine_interface*, void const*, char const*, int, void (*)(char const*, unsigned short, char const*, unsigned int, void const*)) /home/abhinav/couchbase/memcached/programs/engine_testapp/engine_testapp.cc:239 (engine_testapp+0x0000000ba9ad)
    #8 get_int_stat(engine_interface*, engine_interface_v1*, char const*, char const*) /home/abhinav/couchbase/ep-engine/tests/ep_test_apis.cc:990 (ep_testsuite.so+0x0000000aeb81)
    #9 dcp_stream(engine_interface*, engine_interface_v1*, char const*, void const*, unsigned short, unsigned int, unsigned long, unsigned long, unsigned long, unsigned long, unsigned long, int, int, int, int, bool, bool, unsigned char, bool, unsigned long*, bool) /home/abhinav/couchbase/ep-engine/tests/ep_testsuite.cc:4090 (ep_testsuite.so+0x00000009790c)
    #10 test_dcp_producer_stream_req_dgm(engine_interface*, engine_interface_v1*) /home/abhinav/couchbase/ep-engine/tests/ep_testsuite.cc:4564 (ep_testsuite.so+0x000000077604)
    #11 execute_test(test, char const*, char const*) /home/abhinav/couchbase/memcached/programs/engine_testapp/engine_testapp.cc:1090 (engine_testapp+0x0000000b946c)
    #12 __libc_start_main /build/buildd/eglibc-2.19/csu/libc-start.c:287 (libc.so.6+0x000000021ec4)

  Previous write of size 8 at 0x7d480000b1f8 by thread T9 (mutexes: write M32940880, write M32940855):
    #0 ActiveStream::backfillReceived(Item*, backfill_source_t) /home/abhinav/couchbase/ep-engine/src/dcp/stream.cc:287 (ep.so+0x00000007054e)
    #1 DiskCallback::callback(GetValue&) /home/abhinav/couchbase/ep-engine/src/dcp/backfill.cc:94 (ep.so+0x000000056067)
    #2 CouchKVStore::recordDbDump(_db*, _docinfo*, void*) /home/abhinav/couchbase/ep-engine/src/couch-kvstore/couch-kvstore.cc:1757 (ep.so+0x00000018103f)
    #3 recordDbDumpC(_db*, _docinfo*, void*) /home/abhinav/couchbase/ep-engine/src/couch-kvstore/couch-kvstore.cc:66 (ep.so+0x00000017fcc5)
    #4 lookup_callback(couchfile_lookup_request*, _sized_buf const*, _sized_buf const*) /home/abhinav/couchbase/couchstore/src/couch_db.cc:767 (libcouchstore.so+0x00000000d7f5)
    #5 btree_lookup_inner(couchfile_lookup_request*, unsigned long, int, int) /home/abhinav/couchbase/couchstore/src/btree_read.cc:99 (libcouchstore.so+0x00000000b5b2)
    #6 btree_lookup_inner(couchfile_lookup_request*, unsigned long, int, int) /home/abhinav/couchbase/couchstore/src/btree_read.cc:69 (libcouchstore.so+0x00000000b370)
    #7 btree_lookup_inner(couchfile_lookup_request*, unsigned long, int, int) /home/abhinav/couchbase/couchstore/src/btree_read.cc:69 (libcouchstore.so+0x00000000b370)
    #8 btree_lookup /home/abhinav/couchbase/couchstore/src/btree_read.cc:131 (libcouchstore.so+0x00000000b00c)
    #9 couchstore_changes_since /home/abhinav/couchbase/couchstore/src/couch_db.cc:812 (libcouchstore.so+0x00000000d601)
    #10 CouchKVStore::scan(ScanContext*) /home/abhinav/couchbase/ep-engine/src/couch-kvstore/couch-kvstore.cc:1264 (ep.so+0x00000017f77e)
    #11 DCPBackfill::scan() /home/abhinav/couchbase/ep-engine/src/dcp/backfill.cc:193 (ep.so+0x000000057672)
    #12 DCPBackfill::run() /home/abhinav/couchbase/ep-engine/src/dcp/backfill.cc:118 (ep.so+0x000000056647)
    #13 BackfillManager::backfill() /home/abhinav/couchbase/ep-engine/src/dcp/backfill-manager.cc:240 (ep.so+0x0000000508d5)
    #14 BackfillManagerTask::run() /home/abhinav/couchbase/ep-engine/src/dcp/backfill-manager.cc:43 (ep.so+0x00000005052f)
    #15 ExecutorThread::run() /home/abhinav/couchbase/ep-engine/src/executorthread.cc:112 (ep.so+0x0000000f8796)
    #16 launch_executor_thread(void*) /home/abhinav/couchbase/ep-engine/src/executorthread.cc:33 (ep.so+0x0000000f8335)
    #17 platform_thread_wrap /home/abhinav/couchbase/platform/src/cb_pthreads.c:23 (libplatform.so.0.1.0+0x000000003d31)

Change-Id: I166917524b5fcad285b3623ff160e875c316d983
Reviewed-on: http://review.couchbase.org/62918
Well-Formed: buildbot <build@couchbase.com>
Reviewed-by: Will Gardner <will.gardner@couchbase.com>
Tested-by: buildbot <build@couchbase.com>
src/dcp-stream.cc
src/dcp-stream.h

index 6796f80..cd4d281 100644 (file)
@@ -421,7 +421,7 @@ void ActiveStream::completeBackfill() {
         LockHolder lh(streamMutex);
         LOG(EXTENSION_LOG_WARNING, "%s (vb %d) Backfill complete, %d items read"
             " from disk, last seqno read: %ld", producer->logHeader(), vb_,
-            itemsFromBackfill, lastReadSeqno);
+            itemsFromBackfill, lastReadSeqno.load());
     }
 
     isBackfillTaskRunning.store(false);
@@ -455,7 +455,7 @@ void ActiveStream::setVBucketStateAckRecieved() {
             RCPtr<VBucket> vbucket = engine->getVBucket(vb_);
             LOG(EXTENSION_LOG_WARNING, "%s (vb %" PRIu16 ") Vbucket marked as "
                 "dead, last sent seqno: %" PRIu64 ", high seqno: %" PRIu64 "",
-                producer->logHeader(), vb_, lastSentSeqno,
+                producer->logHeader(), vb_, lastSentSeqno.load(),
                 vbucket->getHighSeqno());
         } else {
             LOG(EXTENSION_LOG_INFO, "%s (vb %" PRIu16 ") Receive ack for set "
@@ -487,8 +487,8 @@ DcpResponse* ActiveStream::backfillPhase() {
     }
 
     if (!isBackfillTaskRunning && readyQ.empty()) {
-        backfillRemaining = 0;
-        if (lastReadSeqno >= end_seqno_) {
+        backfillRemaining.store(0, memory_order_relaxed);
+        if (lastReadSeqno.load() >= end_seqno_) {
             endStream(END_STREAM_OK);
         } else if (flags_ & DCP_ADD_STREAM_FLAG_TAKEOVER) {
             transitionState(STREAM_TAKEOVER_SEND);
@@ -507,7 +507,7 @@ DcpResponse* ActiveStream::backfillPhase() {
 }
 
 DcpResponse* ActiveStream::inMemoryPhase() {
-    if (lastSentSeqno >= end_seqno_) {
+    if (lastSentSeqno.load() >= end_seqno_) {
         endStream(END_STREAM_OK);
     } else if (readyQ.empty()) {
         if (nextCheckpointItem()) {
@@ -556,9 +556,9 @@ void ActiveStream::addStats(ADD_STAT add_stat, const void *c) {
     snprintf(buffer, bsize, "%s:stream_%d_memory", name_.c_str(), vb_);
     add_casted_stat(buffer, itemsFromMemory, add_stat, c);
     snprintf(buffer, bsize, "%s:stream_%d_last_sent_seqno", name_.c_str(), vb_);
-    add_casted_stat(buffer, lastSentSeqno, add_stat, c);
+    add_casted_stat(buffer, lastSentSeqno.load(), add_stat, c);
     snprintf(buffer, bsize, "%s:stream_%d_last_read_seqno", name_.c_str(), vb_);
-    add_casted_stat(buffer, lastReadSeqno, add_stat, c);
+    add_casted_stat(buffer, lastReadSeqno.load(), add_stat, c);
     snprintf(buffer, bsize, "%s:stream_%d_ready_queue_memory", name_.c_str(), vb_);
     add_casted_stat(buffer, getReadyQueueMemory(), add_stat, c);
     snprintf(buffer, bsize, "%s:stream_%d_items_ready", name_.c_str(), vb_);
@@ -830,7 +830,8 @@ void ActiveStream::endStream(end_stream_status_t reason) {
         LOG(EXTENSION_LOG_WARNING, "%s (vb %d) Stream closing, %llu items sent"
             " from disk, %llu items sent from memory, %llu was last seqno sent"
             " %s is the reason", producer->logHeader(), vb_, itemsFromBackfill,
-            itemsFromMemory, lastSentSeqno, getEndStreamStatusStr(reason));
+            itemsFromMemory.load(), lastSentSeqno.load(),
+            getEndStreamStatusStr(reason));
     }
 }
 
@@ -988,12 +989,12 @@ size_t ActiveStream::getItemsRemaining() {
     uint64_t high_seqno = vbucket->getHighSeqno();
 
     if (end_seqno_ < high_seqno) {
-        if (end_seqno_ > lastSentSeqno) {
-            return (end_seqno_ - lastSentSeqno);
+        if (end_seqno_ > lastSentSeqno.load()) {
+            return (end_seqno_ - lastSentSeqno.load());
         }
     } else {
-        if (high_seqno > lastSentSeqno) {
-            return (high_seqno - lastSentSeqno);
+        if (high_seqno > lastSentSeqno.load()) {
+            return (high_seqno - lastSentSeqno.load());
         }
     }
 
index 09d1027..237111c 100644 (file)
@@ -194,7 +194,7 @@ public:
     void setVBucketStateAckRecieved();
 
     void incrBackfillRemaining(size_t by) {
-        backfillRemaining += by;
+        backfillRemaining.fetch_add(by);
     }
 
     void markDiskSnapshot(uint64_t startSeqno, uint64_t endSeqno);
@@ -251,20 +251,24 @@ private:
     bool isCurrentSnapshotCompleted() const;
 
     //! The last sequence number queued from disk or memory
-    uint64_t lastReadSeqno;
+    AtomicValue<uint64_t> lastReadSeqno;
     //! The last sequence number sent to the network layer
-    uint64_t lastSentSeqno;
+    AtomicValue<uint64_t> lastSentSeqno;
     //! The last known seqno pointed to by the checkpoint cursor
     uint64_t curChkSeqno;
     //! The current vbucket state to send in the takeover stream
     vbucket_state_t takeoverState;
-    //! The amount of items remaining to be read from disk
-    size_t backfillRemaining;
+    /* backfillRemaining is a stat recording the amount of
+     * items remaining to be read from disk.  It is an atomic
+     * because otherwise the function incrBackfillRemaining
+     * must acquire the streamMutex lock.
+     */
+    AtomicValue <size_t> backfillRemaining;
 
     //! The amount of items that have been read from disk
     size_t itemsFromBackfill;
     //! The amount of items that have been read from memory
-    size_t itemsFromMemory;
+    AtomicValue<size_t> itemsFromMemory;
     //! Whether ot not this is the first snapshot marker sent
     bool firstMarkerSent;