MB-19093 [BP]: [ActiveStream] Address potential lock-inversion scenarios 98/62498/4
authorabhinavdangeti <abhinav@couchbase.com>
Fri, 15 Jan 2016 16:36:52 +0000 (08:36 -0800)
committerabhinav dangeti <abhinav@couchbase.com>
Thu, 7 Apr 2016 23:11:34 +0000 (23:11 +0000)
Acquire vbucket state lock only when really necessary
in the ActiveStream context. Also avoid acquiring one
lock within the other wherever possible in the ActiveStream
context again.

This change is to avert potential deadlocks due to
lock inversion that will be induced by upcoming changes,
here are the scenarios:
(i)     Locking between streamsMutex, streamMutex and
        vb_stateLock in the set operation - handle
        response scenario.
        (http://factory.couchbase.com/job/ep-engine-threadsanitizer-master/1225/console)
(ii)    In case of a set operation, vb_stateLock is
        acquired and then streamMutex is acquired for
        notification. During markDiskSnapshot, the
        streamMutex is acquired before the vb_stateLock
        lock is acquired.
        (http://factory.couchbase.com/job/ep-engine-threadsanitizer-master/1268/console)

(Already reviewed at: http://review.couchbase.org/58557)

Change-Id: I5e5a3e2cc5ba9ae17090e1a3ee4bde100d305f1c
Reviewed-on: http://review.couchbase.org/62498
Well-Formed: buildbot <build@couchbase.com>
Reviewed-by: Chiyoung Seo <chiyoung@couchbase.com>
Reviewed-by: Dave Rigby <daver@couchbase.com>
Tested-by: buildbot <build@couchbase.com>
src/dcp-producer.cc
src/dcp-stream.cc

index 75d7115..8c68354 100644 (file)
@@ -253,19 +253,23 @@ ENGINE_ERROR_CODE DcpProducer::streamRequest(uint32_t flags,
         return rv;
     }
 
+    stream_t s;
     if (notifyOnly) {
+        s = new NotifierStream(&engine_, this, getName(), flags,
+                               opaque, vbucket, notifySeqno,
+                               end_seqno, vbucket_uuid,
+                               snap_start_seqno, snap_end_seqno);
+   } else {
+        s = new ActiveStream(&engine_, this, getName(), flags,
+                             opaque, vbucket, start_seqno,
+                             end_seqno, vbucket_uuid,
+                             snap_start_seqno, snap_end_seqno);
+        static_cast<ActiveStream*>(s.get())->setActive();
+    }
+
+    {
         WriterLockHolder wlh(streamsMutex);
-        streams[vbucket] = new NotifierStream(&engine_, this, getName(), flags,
-                                              opaque, vbucket, notifySeqno,
-                                              end_seqno, vbucket_uuid,
-                                              snap_start_seqno, snap_end_seqno);
-    } else {
-        WriterLockHolder wlh(streamsMutex);
-        streams[vbucket] = new ActiveStream(&engine_, this, getName(), flags,
-                                            opaque, vbucket, start_seqno,
-                                            end_seqno, vbucket_uuid,
-                                            snap_start_seqno, snap_end_seqno);
-        static_cast<ActiveStream*>(streams[vbucket].get())->setActive();
+        streams[vbucket] = s;
     }
 
     ready.pushUnique(vbucket);
index 0caeb5d..6796f80 100644 (file)
@@ -360,18 +360,17 @@ void ActiveStream::markDiskSnapshot(uint64_t startSeqno, uint64_t endSeqno) {
     firstMarkerSent = true;
 
     RCPtr<VBucket> vb = engine->getVBucket(vb_);
-    if (vb) {
-        ReaderLockHolder rlh(vb->getStateLock());
-        if (vb->getState() == vbucket_state_replica) {
-            if (end_seqno_ > endSeqno) {
-                /* We possibly have items in the open checkpoint
-                   (incomplete snapshot) */
-                LOG(EXTENSION_LOG_WARNING, "%s (vb %d) Merging backfill and "
-                    "memory snapshot for a replica vbucket, start seqno "
-                    "%" PRIu64 " and end seqno %" PRIu64, producer->logHeader(),
-                    vb_, startSeqno, endSeqno);
-                endSeqno = end_seqno_;
-            }
+    // An atomic read of vbucket state without acquiring the
+    // reader lock for state should suffice here.
+    if (vb && vb->getState() == vbucket_state_replica) {
+        if (end_seqno_ > endSeqno) {
+            /* We possibly have items in the open checkpoint
+               (incomplete snapshot) */
+            LOG(EXTENSION_LOG_WARNING, "%s (vb %" PRIu16 ") Merging backfill"
+                " and memory snapshot for a replica vbucket, start seqno "
+                "%" PRIu64 " and end seqno %" PRIu64 "",
+                producer->logHeader(), vb_, startSeqno, endSeqno);
+            endSeqno = end_seqno_;
         }
     }
 
@@ -447,22 +446,24 @@ void ActiveStream::setVBucketStateAckRecieved() {
             LOG(EXTENSION_LOG_INFO, "%s (vb %" PRIu16 ") Receive ack for set "
                 "vbucket state to pending message", producer->logHeader(), vb_);
 
-            RCPtr<VBucket> vbucket = engine->getVBucket(vb_);
+            takeoverState = vbucket_state_active;
+            transitionState(STREAM_TAKEOVER_SEND);
+            lh.unlock();
+
             engine->getEpStore()->setVBucketState(vb_, vbucket_state_dead,
                                                   false, false);
+            RCPtr<VBucket> vbucket = engine->getVBucket(vb_);
             LOG(EXTENSION_LOG_WARNING, "%s (vb %" PRIu16 ") Vbucket marked as "
                 "dead, last sent seqno: %" PRIu64 ", high seqno: %" PRIu64 "",
                 producer->logHeader(), vb_, lastSentSeqno,
                 vbucket->getHighSeqno());
-            takeoverState = vbucket_state_active;
-            transitionState(STREAM_TAKEOVER_SEND);
         } else {
             LOG(EXTENSION_LOG_INFO, "%s (vb %" PRIu16 ") Receive ack for set "
                 "vbucket state to active message", producer->logHeader(), vb_);
             endStream(END_STREAM_OK);
+            lh.unlock();
         }
 
-        lh.unlock();
         bool inverse = false;
         if (itemsReady.compare_exchange_strong(inverse, true)) {
             producer->notifyStreamReady(vb_, true);
@@ -472,6 +473,7 @@ void ActiveStream::setVBucketStateAckRecieved() {
             "op on stream '%s' state '%s'", producer->logHeader(), vb_,
             name_.c_str(), stateName(state_));
     }
+
 }
 
 DcpResponse* ActiveStream::backfillPhase() {
@@ -1006,12 +1008,11 @@ const char* ActiveStream::logHeader()
 bool ActiveStream::isCurrentSnapshotCompleted() const
 {
     RCPtr<VBucket> vbucket = engine->getVBucket(vb_);
-    if (vbucket) {
-        ReaderLockHolder rlh(vbucket->getStateLock());
-        if (vbucket_state_replica == vbucket->getState()) {
-            if (lastSentSnapEndSeqno >= lastReadSeqno) {
-                return false;
-            }
+    // An atomic read of vbucket state without acquiring the
+    // reader lock for state should suffice here.
+    if (vbucket && vbucket->getState() == vbucket_state_replica) {
+        if (lastSentSnapEndSeqno >= lastReadSeqno) {
+            return false;
         }
     }
     return true;