MB-19252: Fix data race on Stream::readyQueueMemory 72/62972/6
authorDave Rigby <daver@couchbase.com>
Mon, 18 Apr 2016 13:47:19 +0000 (14:47 +0100)
committerChiyoung Seo <chiyoung@couchbase.com>
Sat, 23 Apr 2016 01:05:30 +0000 (01:05 +0000)
As detected by TSan:

WARNING: ThreadSanitizer: data race (pid=17244)
  Read of size 8 at 0x7d480000b370 by main thread (mutexes: write M24165, write M969, read M24121):
    #0 Stream::getReadyQueueMemory() /home/daver/repos/couchbase/server/ep-engine/src/dcp-stream.cc:234 (ep.so+0x00000028f51e)
    #1 ActiveStream::addStats(void (*)(char const*, unsigned short, char const*, unsigned int, void const*), void const*) /home/daver/repos/couchbase/server/ep-engine/src/dcp-stream.cc:563 (ep.so+0x00000029452f)
    #2 DcpProducer::addStats(void (*)(char const*, unsigned short, char const*, unsigned int, void const*), void const*) /home/daver/repos/couchbase/server/ep-engine/src/dcp-producer.cc:551 (ep.so+0x00000027f1a0)
    #3 ConnStatBuilder::operator()(SingleThreadedRCPtr<ConnHandler>&) /home/daver/repos/couchbase/server/ep-engine/src/ep_engine.cc:3696 (ep.so+0x000000182d54)

  Previous write of size 8 at 0x7d480000b370 by thread T16 (mutexes: write M24143):
    #0 Stream::pushToReadyQ(DcpResponse*) /home/daver/repos/couchbase/server/ep-engine/src/dcp-stream.cc:211 (ep.so+0x00000028f4a6)
    #1 ActiveStream::backfillReceived(Item*) /home/daver/repos/couchbase/server/ep-engine/src/dcp-stream.cc:407 (ep.so+0x00000028d6e5)
    #2 CacheCallback::callback(CacheLookup&) /home/daver/repos/couchbase/server/ep-engine/src/dcp-stream.cc:87 (ep.so+0x00000028d4b3)
    #3 CouchKVStore::recordDbDump(_db*, _docinfo*, void*) /home/daver/repos/couchbase/server/ep-engine/src/couch-kvstore/couch-kvstore.cc:1563 (ep.so+0x00000031dec5)

See also: http://review.couchbase.org/54314 which originally fixed
this issue in watson; however it also fixed a couple of other issues
in the same patch.

Change-Id: Iae6a34403394e54c9d7213a7c2703be761e7dc0f
Reviewed-on: http://review.couchbase.org/62972
Well-Formed: buildbot <build@couchbase.com>
Reviewed-by: Will Gardner <will.gardner@couchbase.com>
Tested-by: buildbot <build@couchbase.com>
src/dcp-stream.cc
src/dcp-stream.h

index cd4d281..a90fd38 100644 (file)
@@ -208,7 +208,8 @@ void Stream::pushToReadyQ(DcpResponse* resp)
 {
     if (resp) {
         readyQ.push(resp);
-        readyQueueMemory += resp->getMessageSize();
+        readyQueueMemory.fetch_add(resp->getMessageSize(),
+                                   memory_order_relaxed);
     }
 }
 
@@ -218,20 +219,20 @@ void Stream::popFromReadyQ(void)
         uint32_t respSize = readyQ.front()->getMessageSize();
         readyQ.pop();
         /* Decrement the readyQ size */
-        if ((readyQueueMemory - respSize) <= readyQueueMemory) {
-            readyQueueMemory -= respSize;
+        if (respSize <= readyQueueMemory.load(memory_order_relaxed)) {
+            readyQueueMemory.fetch_sub(respSize, memory_order_relaxed);
         } else {
             LOG(EXTENSION_LOG_DEBUG, "readyQ size for stream %s (vb %d)"
                 "underflow, likely wrong stat calculation! curr size: %llu;"
-                "new size: %d", name_.c_str(), getVBucket(), readyQueueMemory,
+                "new size: %d", name_.c_str(), getVBucket(), readyQueueMemory.load(),
                 respSize);
-            readyQueueMemory = 0;
+            readyQueueMemory.store(0, memory_order_relaxed);
         }
     }
 }
 
 uint64_t Stream::getReadyQueueMemory() {
-    return readyQueueMemory;
+    return readyQueueMemory.load(memory_order_relaxed);
 }
 
 const char * Stream::stateName(stream_state_t st) const {
index 237111c..b5d4f51 100644 (file)
@@ -154,8 +154,11 @@ protected:
     const static uint64_t dcpMaxSeqno;
 
 private:
-    /* This tracks the memory occupied by elements in the readyQ */
-    uint64_t readyQueueMemory;
+    /* readyQueueMemory tracks the memory occupied by elements
+     * in the readyQ.  It is an atomic because otherwise
+       getReadyQueueMemory would need to acquire streamMutex.
+     */
+    AtomicValue <uint64_t> readyQueueMemory;
 };
 
 typedef RCPtr<Stream> stream_t;