MB-16500 [BP]: Removing unnecessary locking in consumer code 80/56080/3
authorabhinavdangeti <abhinav@couchbase.com>
Thu, 13 Aug 2015 18:46:35 +0000 (11:46 -0700)
committerabhinav dangeti <abhinav@couchbase.com>
Wed, 14 Oct 2015 18:26:38 +0000 (18:26 +0000)
streamMutex is to protect the ready list, but not the streams list.

The front end operations: addStream, closeStream, handleResponse, step
- wouldn't race with each other over the streams list, as multiple
memcached threads will not serve a single cookie.

The back end operations: processBufferedMessages (doesn't grab lock any
way), doRollback just read from streams list.

An addstream (front end op) is the only one that updates streams, and
this wouldn't update when a rollback is in progress.

Therefore, renaming the streamMutex lock in DCPConsumer to readyMutex
which is more apt for its operation - guarding the ready list.

Change-Id: Ia342d7243fef4b97b729aa94fdc64ad020711589
Reviewed-on: http://review.couchbase.org/54406
Tested-by: buildbot <build@couchbase.com>
Reviewed-by: Manu Dhundi <manu@couchbase.com>
Reviewed-on: http://review.couchbase.org/56080
Reviewed-by: Dave Rigby <daver@couchbase.com>
Reviewed-by: Chiyoung Seo <chiyoung@couchbase.com>
Tested-by: abhinav dangeti <abhinav@couchbase.com>
src/dcp-consumer.cc
src/dcp-consumer.h

index bbc914d..34d75c6 100644 (file)
@@ -97,7 +97,7 @@ DcpConsumer::~DcpConsumer() {
 
 ENGINE_ERROR_CODE DcpConsumer::addStream(uint32_t opaque, uint16_t vbucket,
                                          uint32_t flags) {
-    LockHolder lh(streamMutex);
+    LockHolder lh(readyMutex);
     if (doDisconnect()) {
         return ENGINE_DISCONNECT;
     }
@@ -144,7 +144,6 @@ ENGINE_ERROR_CODE DcpConsumer::addStream(uint32_t opaque, uint16_t vbucket,
 }
 
 ENGINE_ERROR_CODE DcpConsumer::closeStream(uint32_t opaque, uint16_t vbucket) {
-    LockHolder lh(streamMutex);
     if (doDisconnect()) {
         return ENGINE_DISCONNECT;
     }
@@ -536,7 +535,6 @@ bool DcpConsumer::doRollback(uint32_t opaque, uint16_t vbid,
 
     cb_assert(err == ENGINE_SUCCESS);
 
-    LockHolder lh(streamMutex);
     RCPtr<VBucket> vb = engine_.getVBucket(vbid);
     streams[vbid]->reconnectStream(vb, opaque, vb->getHighSeqno());
 
@@ -605,7 +603,7 @@ process_items_error_t DcpConsumer::processBufferedItems() {
 }
 
 DcpResponse* DcpConsumer::getNextItem() {
-    LockHolder lh(streamMutex);
+    LockHolder lh(readyMutex);
 
     setPaused(false);
     while (!ready.empty()) {
@@ -655,7 +653,6 @@ void DcpConsumer::notifyStreamReady(uint16_t vbucket) {
 
 void DcpConsumer::streamAccepted(uint32_t opaque, uint16_t status, uint8_t* body,
                                  uint32_t bodylen) {
-    LockHolder lh(streamMutex);
 
     opaque_map::iterator oitr = opaqueMap_.find(opaque);
     if (oitr != opaqueMap_.end()) {
@@ -689,7 +686,6 @@ void DcpConsumer::streamAccepted(uint32_t opaque, uint16_t status, uint8_t* body
 }
 
 bool DcpConsumer::isValidOpaque(uint32_t opaque, uint16_t vbucket) {
-    LockHolder lh(streamMutex);
     passive_stream_t stream = streams[vbucket];
     return stream && stream->getOpaque() == opaque;
 }
index f94940e..fd8a136 100644 (file)
@@ -118,10 +118,13 @@ private:
     uint64_t opaqueCounter;
     size_t processTaskId;
     AtomicValue<bool> itemsToProcess;
-    Mutex streamMutex;
+
+    Mutex readyMutex;
     std::list<uint16_t> ready;
+
     passive_stream_t* streams;
     opaque_map opaqueMap_;
+
     rel_time_t lastNoopTime;
     uint32_t backoffs;
     uint32_t noopInterval;