Merge remote-tracking branch 'couchbase/3.0.x' into sherlock
[ep-engine.git] / src / connmap.cc
index b41591e..0555933 100644 (file)
@@ -1114,29 +1114,36 @@ void DcpConnMap::closeAllStreams_UNLOCKED() {
 }
 
 void DcpConnMap::disconnect(const void *cookie) {
-    LockHolder lh(connsLock);
-    disconnect_UNLOCKED(cookie);
-}
-
-void DcpConnMap::disconnect_UNLOCKED(const void *cookie) {
-    std::list<connection_t>::iterator iter;
-    for (iter = all.begin(); iter != all.end(); ++iter) {
-        if ((*iter)->getCookie() == cookie) {
-            (*iter)->setDisconnect(true);
-            all.erase(iter);
-            break;
+    // Move the connection matching this cookie from the `all` and map_
+    // data structures (under connsLock).
+    connection_t conn;
+    {
+        LockHolder lh(connsLock);
+        std::list<connection_t>::iterator iter;
+        for (iter = all.begin(); iter != all.end(); ++iter) {
+            if ((*iter)->getCookie() == cookie) {
+                (*iter)->setDisconnect(true);
+                all.erase(iter);
+                break;
+            }
         }
-    }
-
-    std::map<const void*, connection_t>::iterator itr(map_.find(cookie));
-    if (itr != map_.end()) {
-        connection_t conn = itr->second;
-        if (conn.get()) {
-            LOG(EXTENSION_LOG_INFO, "%s Removing connection",
-                conn->logHeader());
-            map_.erase(itr);
+        std::map<const void*, connection_t>::iterator itr(map_.find(cookie));
+        if (itr != map_.end()) {
+            conn = itr->second;
+            if (conn.get()) {
+                LOG(EXTENSION_LOG_INFO, "%s Removing connection",
+                    conn->logHeader());
+                map_.erase(itr);
+            }
         }
+    }
 
+    // Note we shutdown the stream *not* under the connsLock; this is
+    // because as part of closing a DcpConsumer stream we need to
+    // acquire PassiveStream::buffer.bufMutex; and that could deadlock
+    // in EventuallyPersistentStore::setVBucketState, via
+    // PassiveStream::processBufferedMessages.
+    if (conn) {
         DcpProducer* producer = dynamic_cast<DcpProducer*> (conn.get());
         if (producer) {
             producer->closeAllStreams();
@@ -1144,7 +1151,12 @@ void DcpConnMap::disconnect_UNLOCKED(const void *cookie) {
         } else {
             static_cast<DcpConsumer*>(conn.get())->closeAllStreams();
         }
+    }
 
+    // Finished disconnecting the stream; add it to the
+    // deadConnections list.
+    if (conn) {
+        LockHolder lh(connsLock);
         deadConnections.push_back(conn);
     }
 }