}
void DcpConnMap::disconnect(const void *cookie) {
- LockHolder lh(connsLock);
- disconnect_UNLOCKED(cookie);
-}
-
-void DcpConnMap::disconnect_UNLOCKED(const void *cookie) {
- std::list<connection_t>::iterator iter;
- for (iter = all.begin(); iter != all.end(); ++iter) {
- if ((*iter)->getCookie() == cookie) {
- (*iter)->setDisconnect(true);
- all.erase(iter);
- break;
+ // Move the connection matching this cookie from the `all` and map_
+ // data structures (under connsLock).
+ connection_t conn;
+ {
+ LockHolder lh(connsLock);
+ std::list<connection_t>::iterator iter;
+ for (iter = all.begin(); iter != all.end(); ++iter) {
+ if ((*iter)->getCookie() == cookie) {
+ (*iter)->setDisconnect(true);
+ all.erase(iter);
+ break;
+ }
}
- }
-
- std::map<const void*, connection_t>::iterator itr(map_.find(cookie));
- if (itr != map_.end()) {
- connection_t conn = itr->second;
- if (conn.get()) {
- LOG(EXTENSION_LOG_INFO, "%s Removing connection",
- conn->logHeader());
- map_.erase(itr);
+ std::map<const void*, connection_t>::iterator itr(map_.find(cookie));
+ if (itr != map_.end()) {
+ conn = itr->second;
+ if (conn.get()) {
+ LOG(EXTENSION_LOG_INFO, "%s Removing connection",
+ conn->logHeader());
+ map_.erase(itr);
+ }
}
+ }
+ // Note we shutdown the stream *not* under the connsLock; this is
+ // because as part of closing a DcpConsumer stream we need to
+ // acquire PassiveStream::buffer.bufMutex; and that could deadlock
+ // in EventuallyPersistentStore::setVBucketState, via
+ // PassiveStream::processBufferedMessages.
+ if (conn) {
DcpProducer* producer = dynamic_cast<DcpProducer*> (conn.get());
if (producer) {
producer->closeAllStreams();
} else {
static_cast<DcpConsumer*>(conn.get())->closeAllStreams();
}
+ }
+ // Finished disconnecting the stream; add it to the
+ // deadConnections list.
+ if (conn) {
+ LockHolder lh(connsLock);
deadConnections.push_back(conn);
}
}