MB-16632: Reducing locking contention in DCP-Producer/Stream 00/56300/15
authorabhinavdangeti <abhinav@couchbase.com>
Thu, 22 Oct 2015 22:36:12 +0000 (15:36 -0700)
committerChiyoung Seo <chiyoung@couchbase.com>
Wed, 25 Nov 2015 19:12:51 +0000 (19:12 +0000)
- Adding a new RWLock for streams in Producer and avoid queueLock
- Improving BufferLog and remove need for queueLock on access
- Adding an array of atomic bool for lockless vbucket ready notification
- Changing some ActiveStream variables to be atomic to allow for lockless
  updates.

Change-Id: I11c54f1058c4c8a3f013dfc858a39d17362c9531
Reviewed-on: http://review.couchbase.org/56300
Reviewed-by: Chiyoung Seo <chiyoung@couchbase.com>
Tested-by: buildbot <build@couchbase.com>
src/dcp-producer.cc
src/dcp-producer.h
src/dcp-stream.cc
src/dcp-stream.h
src/tapconnection.h

index b165bbd..4f00ace 100644 (file)
 
 const uint32_t DcpProducer::defaultNoopInerval = 20;
 
-void BufferLog::insert(DcpResponse* response) {
-    cb_assert(!isFull());
-    bytes_sent += response->getMessageSize();
+DcpProducer::BufferLog::State DcpProducer::BufferLog::getState_UNLOCKED() {
+    if (isEnabled_UNLOCKED()) {
+        if (isFull_UNLOCKED()) {
+            return Full;
+        } else {
+            return SpaceAvailable;
+        }
+    }
+    return Disabled;
+}
+
+void DcpProducer::BufferLog::setBufferSize(size_t maxBytes) {
+    WriterLockHolder lh(logLock);
+    this->maxBytes = maxBytes;
+    if (maxBytes == 0) {
+        bytesSent = 0;
+        ackedBytes = 0;
+    }
+}
+
+bool DcpProducer::BufferLog::insert(size_t bytes) {
+    WriterLockHolder wlh(logLock);
+    bool inserted = false;
+    // If the log is not enabled
+    // or there is space, allow the insert
+    if (!isEnabled_UNLOCKED() || !isFull_UNLOCKED()) {
+        bytesSent += bytesSent;
+        inserted = true;
+    }
+    return inserted;
+}
+
+void DcpProducer::BufferLog::release_UNLOCKED(size_t bytes) {
+    if (bytesSent >= bytes) {
+        bytesSent -= bytes;
+    } else {
+        bytesSent = 0;
+    }
+}
+
+bool DcpProducer::BufferLog::pauseIfFull() {
+    ReaderLockHolder rlh(logLock);
+    if (getState_UNLOCKED() == Full) {
+        producer.setPaused(true);
+        return true;
+    }
+    return false;
+}
+
+void DcpProducer::BufferLog::unpauseIfSpace() {
+    ReaderLockHolder rlh(logLock);
+    if (getState_UNLOCKED() != Full) {
+        producer.notifyPaused(true);
+    }
+}
+
+void DcpProducer::BufferLog::acknowledge(size_t bytes) {
+    WriterLockHolder wlh(logLock);
+    State state = getState_UNLOCKED();
+    if (state != Disabled) {
+        release_UNLOCKED(bytes);
+        ackedBytes += bytes;
+        if (state == Full) {
+            producer.notifyPaused(true);
+        }
+    }
 }
 
-void BufferLog::free(uint32_t bytes_to_free) {
-    if (bytes_sent >= bytes_to_free) {
-        bytes_sent -= bytes_to_free;
+void DcpProducer::BufferLog::addStats(ADD_STAT add_stat, const void *c) {
+    ReaderLockHolder rlh(logLock);
+    if (isEnabled_UNLOCKED()) {
+        producer.addStat("max_buffer_bytes", maxBytes, add_stat, c);
+        producer.addStat("unacked_bytes", bytesSent, add_stat, c);
+        producer.addStat("total_acked_bytes", ackedBytes, add_stat, c);
+        producer.addStat("flow_control", "enabled", add_stat, c);
     } else {
-        bytes_sent = 0;
+        producer.addStat("flow_control", "disabled", add_stat, c);
     }
 }
 
 DcpProducer::DcpProducer(EventuallyPersistentEngine &e, const void *cookie,
                          const std::string &name, bool isNotifier)
     : Producer(e, cookie, name), rejectResp(NULL),
-      notifyOnly(isNotifier), lastSendTime(ep_current_time()), log(NULL),
-      itemsSent(0), totalBytesSent(0), ackedBytes(0) {
+      notifyOnly(isNotifier), lastSendTime(ep_current_time()), log(*this),
+      vbReady(e.getConfiguration().getMaxVbuckets()),
+      itemsSent(0), totalBytesSent(0), roundRobinVbReady(0) {
     setSupportAck(true);
     setReserved(true);
     setPaused(true);
@@ -80,12 +148,6 @@ DcpProducer::DcpProducer(EventuallyPersistentEngine &e, const void *cookie,
     noopCtx.enabled = false;
 }
 
-DcpProducer::~DcpProducer() {
-    if (log) {
-        delete log;
-    }
-}
-
 ENGINE_ERROR_CODE DcpProducer::streamRequest(uint32_t flags,
                                              uint32_t opaque,
                                              uint16_t vbucket,
@@ -100,7 +162,6 @@ ENGINE_ERROR_CODE DcpProducer::streamRequest(uint32_t flags,
         return ENGINE_DISCONNECT;
     }
 
-    LockHolder lh(queueLock);
     RCPtr<VBucket> vb = engine_.getVBucket(vbucket);
     if (!vb) {
         LOG(EXTENSION_LOG_WARNING, "%s (vb %d) Stream request failed because "
@@ -140,17 +201,20 @@ ENGINE_ERROR_CODE DcpProducer::streamRequest(uint32_t flags,
 
     bool add_vb_conn_map = true;
     std::map<uint16_t, stream_t>::iterator itr;
-    if ((itr = streams.find(vbucket)) != streams.end()) {
-        if (itr->second->getState() != STREAM_DEAD) {
-            LOG(EXTENSION_LOG_WARNING, "%s (vb %d) Stream request failed"
-                " because a stream already exists for this vbucket",
-                logHeader(), vbucket);
-            return ENGINE_KEY_EEXISTS;
-        } else {
-            streams.erase(vbucket);
-            ready.remove(vbucket);
-            // Don't need to add an entry to vbucket-to-conns map
-            add_vb_conn_map = false;
+    {
+        WriterLockHolder wlh(streamsMutex);
+        if ((itr = streams.find(vbucket)) != streams.end()) {
+            if (itr->second->getState() != STREAM_DEAD) {
+                LOG(EXTENSION_LOG_WARNING, "%s (vb %d) Stream request failed"
+                    " because a stream already exists for this vbucket",
+                    logHeader(), vbucket);
+                return ENGINE_KEY_EEXISTS;
+            } else {
+                streams.erase(vbucket);
+                vbReady[vbucket].store(false);
+                // Don't need to add an entry to vbucket-to-conns map
+                add_vb_conn_map = false;
+            }
         }
     }
 
@@ -183,20 +247,21 @@ ENGINE_ERROR_CODE DcpProducer::streamRequest(uint32_t flags,
     }
 
     if (notifyOnly) {
-        streams[vbucket] = new NotifierStream(&engine_, this, getName(), flags,
+        WriterLockHolder wlh(streamsMutex);
+        streams[vbucket] = new NotifierStream(&engine_, *this, getName(), flags,
                                               opaque, vbucket, notifySeqno,
                                               end_seqno, vbucket_uuid,
                                               snap_start_seqno, snap_end_seqno);
     } else {
-        streams[vbucket] = new ActiveStream(&engine_, this, getName(), flags,
+        WriterLockHolder wlh(streamsMutex);
+        streams[vbucket] = new ActiveStream(&engine_, *this, getName(), flags,
                                             opaque, vbucket, start_seqno,
                                             end_seqno, vbucket_uuid,
                                             snap_start_seqno, snap_end_seqno);
         static_cast<ActiveStream*>(streams[vbucket].get())->setActive();
     }
+    vbReady[vbucket].store(true);
 
-    ready.push_back(vbucket);
-    lh.unlock();
     if (add_vb_conn_map) {
         connection_t conn(this);
         engine_.getDcpConnMap().addVBConnByVBId(conn, vbucket);
@@ -326,26 +391,13 @@ ENGINE_ERROR_CODE DcpProducer::step(struct dcp_message_producers* producers) {
 ENGINE_ERROR_CODE DcpProducer::bufferAcknowledgement(uint32_t opaque,
                                                      uint16_t vbucket,
                                                      uint32_t buffer_bytes) {
-    LockHolder lh(queueLock);
-    if (log) {
-        bool wasFull = log->isFull();
-
-        ackedBytes.fetch_add(buffer_bytes);
-        log->free(buffer_bytes);
-        lh.unlock();
-
-        if (wasFull) {
-            engine_.getDcpConnMap().notifyPausedConnection(this, true);
-        }
-    }
-
+    log.acknowledge(buffer_bytes);
     return ENGINE_SUCCESS;
 }
 
 ENGINE_ERROR_CODE DcpProducer::control(uint32_t opaque, const void* key,
                                        uint16_t nkey, const void* value,
                                        uint32_t nvalue) {
-    LockHolder lh(queueLock);
     const char* param = static_cast<const char*>(key);
     std::string keyStr(static_cast<const char*>(key), nkey);
     std::string valueStr(static_cast<const char*>(value), nvalue);
@@ -355,16 +407,7 @@ ENGINE_ERROR_CODE DcpProducer::control(uint32_t opaque, const void* key,
         if (parseUint32(valueStr.c_str(), &size)) {
             /* Size 0 implies the client (DCP consumer) does not support
                flow control */
-            if (!log && size) {
-                log = new BufferLog(size);
-            } else if (log && log->getBufferSize() != size) {
-                if (size) {
-                    log->setBufferSize(size);
-                } else {
-                    delete log;
-                    log = NULL;
-                }
-            }
+            log.setBufferSize(size);
             return ENGINE_SUCCESS;
         }
     } else if (strncmp(param, "stream_buffer_size", nkey) == 0) {
@@ -403,22 +446,23 @@ ENGINE_ERROR_CODE DcpProducer::handleResponse(
             reinterpret_cast<protocol_binary_response_dcp_stream_req*>(resp);
         uint32_t opaque = pkt->message.header.response.opaque;
 
-        LockHolder lh(queueLock);
         stream_t active_stream;
         std::map<uint16_t, stream_t>::iterator itr;
-        for (itr = streams.begin() ; itr != streams.end(); ++itr) {
-            active_stream = itr->second;
-            Stream *str = active_stream.get();
-            if (str && str->getType() == STREAM_ACTIVE) {
-                ActiveStream* as = static_cast<ActiveStream*>(str);
-                if (as && opaque == itr->second->getOpaque()) {
-                    break;
+        {
+            ReaderLockHolder rlh(streamsMutex);
+            for (itr = streams.begin() ; itr != streams.end(); ++itr) {
+                active_stream = itr->second;
+                Stream *str = active_stream.get();
+                if (str && str->getType() == STREAM_ACTIVE) {
+                    ActiveStream* as = static_cast<ActiveStream*>(str);
+                    if (as && opaque == itr->second->getOpaque()) {
+                        break;
+                    }
                 }
             }
         }
 
         if (itr != streams.end()) {
-            lh.unlock();
             ActiveStream *as = static_cast<ActiveStream*>(active_stream.get());
             if (opcode == PROTOCOL_BINARY_CMD_DCP_SET_VBUCKET_STATE) {
                 as->setVBucketStateAckRecieved();
@@ -452,55 +496,47 @@ ENGINE_ERROR_CODE DcpProducer::closeStream(uint32_t opaque, uint16_t vbucket) {
         return ENGINE_DISCONNECT;
     }
 
-    LockHolder lh(queueLock);
-    std::map<uint16_t, stream_t>::iterator itr;
-    if ((itr = streams.find(vbucket)) == streams.end()) {
+    stream_t stream = findStreamByVbid(vbucket);
+    ENGINE_ERROR_CODE ret;
+    if (!stream) {
         LOG(EXTENSION_LOG_WARNING, "%s (vb %d) Cannot close stream because no "
             "stream exists for this vbucket", logHeader(), vbucket);
         return ENGINE_KEY_ENOENT;
-    } else if (!itr->second->isActive()) {
+    } else if (!stream->isActive()) {
         LOG(EXTENSION_LOG_WARNING, "%s (vb %d) Cannot close stream because "
             "stream is already marked as dead", logHeader(), vbucket);
-        streams.erase(vbucket);
-        ready.remove(vbucket);
-        lh.unlock();
         connection_t conn(this);
         engine_.getDcpConnMap().removeVBConnByVBId(conn, vbucket);
-        return ENGINE_KEY_ENOENT;
+        ret = ENGINE_KEY_ENOENT;
+    } else {
+        stream->setDead(END_STREAM_CLOSED);
+        connection_t conn(this);
+        engine_.getDcpConnMap().removeVBConnByVBId(conn, vbucket);
+        ret = ENGINE_SUCCESS;
     }
 
-    stream_t stream = itr->second;
-    streams.erase(vbucket);
-    ready.remove(vbucket);
-    lh.unlock();
+    {
+        WriterLockHolder wlh(streamsMutex);
+        streams.erase(vbucket);
+        vbReady[vbucket].store(false);
+    }
 
-    stream->setDead(END_STREAM_CLOSED);
-    connection_t conn(this);
-    engine_.getDcpConnMap().removeVBConnByVBId(conn, vbucket);
-    return ENGINE_SUCCESS;
+    return ret;
 }
 
 void DcpProducer::addStats(ADD_STAT add_stat, const void *c) {
     Producer::addStats(add_stat, c);
 
-    LockHolder lh(queueLock);
-
     addStat("items_sent", getItemsSent(), add_stat, c);
-    addStat("items_remaining", getItemsRemaining_UNLOCKED(), add_stat, c);
+    addStat("items_remaining", getItemsRemaining(), add_stat, c);
     addStat("total_bytes_sent", getTotalBytes(), add_stat, c);
     addStat("last_sent_time", lastSendTime, add_stat, c);
     addStat("noop_enabled", noopCtx.enabled, add_stat, c);
     addStat("noop_wait", noopCtx.pendingRecv, add_stat, c);
 
-    if (log) {
-        addStat("max_buffer_bytes", log->getBufferSize(), add_stat, c);
-        addStat("unacked_bytes", log->getBytesSent(), add_stat, c);
-        addStat("total_acked_bytes", ackedBytes, add_stat, c);
-        addStat("flow_control", "enabled", add_stat, c);
-    } else {
-        addStat("flow_control", "disabled", add_stat, c);
-    }
+    log.addStats(add_stat, c);
 
+    ReaderLockHolder rlh(streamsMutex);
     std::map<uint16_t, stream_t>::iterator itr;
     for (itr = streams.begin(); itr != streams.end(); ++itr) {
         itr->second->addStats(add_stat, c);
@@ -509,21 +545,16 @@ void DcpProducer::addStats(ADD_STAT add_stat, const void *c) {
 
 void DcpProducer::addTakeoverStats(ADD_STAT add_stat, const void* c,
                                    uint16_t vbid) {
-    LockHolder lh(queueLock);
-    std::map<uint16_t, stream_t>::iterator itr = streams.find(vbid);
-    if (itr != streams.end()) {
-        Stream *s = itr->second.get();
-        if (s && s->getType() == STREAM_ACTIVE) {
-            ActiveStream* as = static_cast<ActiveStream*>(s);
-            if (as) {
-                as->addTakeoverStats(add_stat, c);
-            }
+    stream_t stream = findStreamByVbid(vbid);
+    if (stream && stream->getType() == STREAM_ACTIVE) {
+        ActiveStream* as = static_cast<ActiveStream*>(stream.get());
+        if (as) {
+            as->addTakeoverStats(add_stat, c);
         }
     }
 }
 
 void DcpProducer::aggregateQueueStats(ConnCounter* aggregator) {
-    LockHolder lh(queueLock);
     if (!aggregator) {
         LOG(EXTENSION_LOG_WARNING, "%s Pointer to the queue stats aggregator"
             " is NULL!!!", logHeader());
@@ -531,42 +562,37 @@ void DcpProducer::aggregateQueueStats(ConnCounter* aggregator) {
     }
     aggregator->conn_queueDrain += itemsSent;
     aggregator->conn_totalBytes += totalBytesSent;
-    aggregator->conn_queueRemaining += getItemsRemaining_UNLOCKED();
+    aggregator->conn_queueRemaining += getItemsRemaining();
     aggregator->conn_queueBackfillRemaining += totalBackfillBacklogs;
 }
 
 void DcpProducer::notifySeqnoAvailable(uint16_t vbucket, uint64_t seqno) {
-    LockHolder lh(queueLock);
-    std::map<uint16_t, stream_t>::iterator itr = streams.find(vbucket);
-    if (itr != streams.end() && itr->second->isActive()) {
-        stream_t stream = itr->second;
-        lh.unlock();
+    stream_t stream = findStreamByVbid(vbucket);
+    if (stream && stream->isActive()) {
         stream->notifySeqnoAvailable(seqno);
     }
 }
 
 void DcpProducer::vbucketStateChanged(uint16_t vbucket, vbucket_state_t state) {
-    LockHolder lh(queueLock);
-    std::map<uint16_t, stream_t>::iterator itr = streams.find(vbucket);
-    if (itr != streams.end()) {
-        stream_t stream = itr->second;
-        lh.unlock();
+    stream_t stream = findStreamByVbid(vbucket);
+    if (stream) {
         stream->setDead(END_STREAM_STATE);
     }
 }
 
 void DcpProducer::closeAllStreams() {
-    LockHolder lh(queueLock);
     std::list<uint16_t> vblist;
-    while (!streams.empty()) {
-        std::map<uint16_t, stream_t>::iterator itr = streams.begin();
-        uint16_t vbid = itr->first;
-        itr->second->setDead(END_STREAM_DISCONNECTED);
-        streams.erase(vbid);
-        ready.remove(vbid);
-        vblist.push_back(vbid);
+    {
+        WriterLockHolder wlh(streamsMutex);
+        while (!streams.empty()) {
+            std::map<uint16_t, stream_t>::iterator itr = streams.begin();
+            uint16_t vbid = itr->first;
+            itr->second->setDead(END_STREAM_DISCONNECTED);
+            streams.erase(vbid);
+            vbReady[vbid].store(false);
+            vblist.push_back(vbid);
+        }
     }
-    lh.unlock();
 
     connection_t conn(this);
     std::list<uint16_t>::iterator it = vblist.begin();
@@ -584,53 +610,67 @@ const char* DcpProducer::getType() const {
 }
 
 DcpResponse* DcpProducer::getNextItem() {
-    LockHolder lh(queueLock);
-
     setPaused(false);
-    while (!ready.empty()) {
-        if (log && log->isFull()) {
-            setPaused(true);
+    if (roundRobinVbReady >= vbReady.size()) {
+        roundRobinVbReady = 0;
+    }
+    for (; roundRobinVbReady < vbReady.size(); roundRobinVbReady++) {
+
+        if (log.pauseIfFull()) {
             return NULL;
         }
 
-        uint16_t vbucket = ready.front();
-        ready.pop_front();
+        bool expected = true;
+        if (vbReady[roundRobinVbReady].compare_exchange_strong(expected, false)) {
+            uint16_t vbucket = roundRobinVbReady;
+            DcpResponse *op = NULL;
+            std::map<uint16_t, stream_t>::iterator it;
+            stream_t stream;
+            {
+                ReaderLockHolder rlh(streamsMutex);
+                it = streams.find(vbucket);
+                if (it == streams.end()) {
+                    continue;
+                }
+                stream.reset(it->second);
+            }
 
-        if (streams.find(vbucket) == streams.end()) {
-            continue;
-        }
-        DcpResponse* op = streams[vbucket]->next();
-        if (!op) {
-            continue;
-        }
+            // Return the next operation
+            // When an op is returned it is assumed
+            // our bufferLog has been updated.
+            op = stream->next();
 
-        switch (op->getEvent()) {
-            case DCP_SNAPSHOT_MARKER:
-            case DCP_MUTATION:
-            case DCP_DELETION:
-            case DCP_EXPIRATION:
-            case DCP_STREAM_END:
-            case DCP_SET_VBUCKET:
-                break;
-            default:
-                LOG(EXTENSION_LOG_WARNING, "%s Producer is attempting to write"
-                    " an unexpected event %d", logHeader(), op->getEvent());
-                abort();
-        }
+            if (!op) {
+                continue;
+            }
 
-        if (log) {
-            log->insert(op);
-        }
-        ready.push_back(vbucket);
+            switch (op->getEvent()) {
+                case DCP_SNAPSHOT_MARKER:
+                case DCP_MUTATION:
+                case DCP_DELETION:
+                case DCP_EXPIRATION:
+                case DCP_STREAM_END:
+                case DCP_SET_VBUCKET:
+                    break;
+                default:
+                    LOG(EXTENSION_LOG_WARNING, "%s Producer is attempting to "
+                        "write an unexpected event %d",
+                        logHeader(), op->getEvent());
+                    abort();
+            }
 
-        if (op->getEvent() == DCP_MUTATION || op->getEvent() == DCP_DELETION ||
-            op->getEvent() == DCP_EXPIRATION) {
-            itemsSent++;
-        }
+            vbReady[vbucket].store(true);
 
-        totalBytesSent = totalBytesSent + op->getMessageSize();
+            if (op->getEvent() == DCP_MUTATION ||
+                op->getEvent() == DCP_DELETION ||
+                op->getEvent() == DCP_EXPIRATION) {
+               itemsSent++;
+            }
+
+            totalBytesSent.fetch_add(op->getMessageSize());
 
-        return op;
+            return op;
+        }
     }
 
     setPaused(true);
@@ -641,7 +681,7 @@ void DcpProducer::setDisconnect(bool disconnect) {
     ConnHandler::setDisconnect(disconnect);
 
     if (disconnect) {
-        LockHolder lh(queueLock);
+        ReaderLockHolder rlh(streamsMutex);
         std::map<uint16_t, stream_t>::iterator itr = streams.begin();
         for (; itr != streams.end(); ++itr) {
             itr->second->setDead(END_STREAM_DISCONNECTED);
@@ -650,20 +690,14 @@ void DcpProducer::setDisconnect(bool disconnect) {
 }
 
 void DcpProducer::notifyStreamReady(uint16_t vbucket, bool schedule) {
-    LockHolder lh(queueLock);
-
-    std::list<uint16_t>::iterator iter =
-        std::find(ready.begin(), ready.end(), vbucket);
-    if (iter != ready.end()) {
-        return;
+    bool expected = false;
+    if (vbReady[vbucket].compare_exchange_strong(expected, true)) {
+        log.unpauseIfSpace();
     }
+}
 
-    ready.push_back(vbucket);
-    lh.unlock();
-
-    if (!log || (log && !log->isFull())) {
-        engine_.getDcpConnMap().notifyPausedConnection(this, schedule);
-    }
+void DcpProducer::notifyPaused(bool schedule) {
+    engine_.getDcpConnMap().notifyPausedConnection(this, schedule);
 }
 
 ENGINE_ERROR_CODE DcpProducer::maybeSendNoop(struct dcp_message_producers* producers) {
@@ -701,7 +735,7 @@ void DcpProducer::setTimeForNoop() {
 }
 
 void DcpProducer::clearQueues() {
-    LockHolder lh(queueLock);
+    WriterLockHolder wlh(streamsMutex);
     std::map<uint16_t, stream_t>::iterator itr = streams.begin();
     for (; itr != streams.end(); ++itr) {
         itr->second->clear();
@@ -721,9 +755,9 @@ size_t DcpProducer::getItemsSent() {
     return itemsSent;
 }
 
-size_t DcpProducer::getItemsRemaining_UNLOCKED() {
+size_t DcpProducer::getItemsRemaining() {
     size_t remainingSize = 0;
-
+    ReaderLockHolder rlh(streamsMutex);
     std::map<uint16_t, stream_t>::iterator itr = streams.begin();
     for (; itr != streams.end(); ++itr) {
         Stream *s = (itr->second).get();
@@ -741,8 +775,18 @@ size_t DcpProducer::getTotalBytes() {
     return totalBytesSent;
 }
 
+stream_t DcpProducer::findStreamByVbid(uint16_t vbid) {
+    ReaderLockHolder rlh(streamsMutex);
+    stream_t stream;
+    std::map<uint16_t, stream_t>::iterator itr = streams.find(vbid);
+    if (itr != streams.end()) {
+        stream = itr->second;
+    }
+    return stream;
+}
+
 std::list<uint16_t> DcpProducer::getVBList() {
-    LockHolder lh(queueLock);
+    ReaderLockHolder rlh(streamsMutex);
     std::list<uint16_t> vblist;
     std::map<uint16_t, stream_t>::iterator itr = streams.begin();
     for (; itr != streams.end(); ++itr) {
@@ -758,3 +802,7 @@ bool DcpProducer::windowIsFull() {
 void DcpProducer::flush() {
     abort(); // Not Implemented
 }
+
+bool DcpProducer::bufferLogInsert(size_t bytes) {
+    return log.insert(bytes);
+}
index 8bff028..1468dfd 100644 (file)
 
 class DcpResponse;
 
-class BufferLog {
-public:
-    BufferLog(uint32_t bytes)
-        : max_bytes(bytes), bytes_sent(0) {}
-
-    ~BufferLog() {}
-
-    uint32_t getBufferSize() {
-        return max_bytes;
-    }
-
-    void setBufferSize(uint32_t maxBytes) {
-        max_bytes = maxBytes;
-    }
-
-    uint32_t getBytesSent() {
-        return bytes_sent;
-    }
-
-    bool isFull() {
-        return max_bytes <= bytes_sent;
-    }
-
-    void insert(DcpResponse* response);
-
-    void free(uint32_t bytes_to_free);
-
-private:
-    uint32_t max_bytes;
-    uint32_t bytes_sent;
-};
-
 class DcpProducer : public Producer {
 public:
 
     DcpProducer(EventuallyPersistentEngine &e, const void *cookie,
                 const std::string &n, bool notifyOnly);
 
-    ~DcpProducer();
-
     ENGINE_ERROR_CODE streamRequest(uint32_t flags, uint32_t opaque,
                                     uint16_t vbucket, uint64_t start_seqno,
                                     uint64_t end_seqno, uint64_t vbucket_uuid,
@@ -132,11 +98,94 @@ public:
 
     void notifyStreamReady(uint16_t vbucket, bool schedule);
 
+    void notifyPaused(bool schedule);
+
+    class BufferLog {
+    public:
+
+        /*
+            BufferLog has 3 states.
+            Disabled - Flow-control is not in-use.
+             This is indicated by setting the size to 0 (i.e. setBufferSize(0)).
+
+            SpaceAvailable - There is *some* space available. You can always
+             insert n-bytes even if there's n-1 bytes spare.
+
+            Full - inserts have taken the number of bytes available equal or
+             over the buffer size.
+        */
+        enum State {
+            Disabled,
+            Full,
+            SpaceAvailable
+        };
+
+        BufferLog(DcpProducer& p)
+            : producer(p), maxBytes(0), bytesSent(0), ackedBytes(0) {}
+
+        void setBufferSize(size_t maxBytes);
+
+        void addStats(ADD_STAT add_stat, const void *c);
+
+        /*
+            Return false if the log is full.
+
+            Returns true if the bytes fit or if the buffer log is disabled.
+              The tracked bytes is increased.
+        */
+        bool insert(size_t bytes);
+
+        /*
+            Acknowledge the bytes and unpause the producer if full.
+              The tracked bytes is decreased.
+        */
+        void acknowledge(size_t bytes);
+
+        /*
+            Pause the producer if full.
+        */
+        bool pauseIfFull();
+
+        /*
+            Unpause the producer if there's space (or disabled).
+        */
+        void unpauseIfSpace();
+
+    private:
+
+        bool isEnabled_UNLOCKED() {
+            return maxBytes != 0;
+        }
+
+        bool isFull_UNLOCKED() {
+            return bytesSent >= maxBytes;
+        }
+
+        void release_UNLOCKED(size_t bytes);
+
+        State getState_UNLOCKED();
+
+        RWLock logLock;
+        DcpProducer& producer;
+        size_t maxBytes;
+        size_t bytesSent;
+        size_t ackedBytes;
+    };
+
+    /*
+        Insert bytes into this producer's buffer log.
+
+        If the log is disabled or the insert was successful returns true.
+        Else return false.
+    */
+    bool bufferLogInsert(size_t bytes);
+
 private:
 
     DcpResponse* getNextItem();
 
-    size_t getItemsRemaining_UNLOCKED();
+    size_t getItemsRemaining();
+    stream_t findStreamByVbid(uint16_t vbid);
 
     ENGINE_ERROR_CODE maybeSendNoop(struct dcp_message_producers* producers);
 
@@ -152,13 +201,21 @@ private:
 
     bool notifyOnly;
     rel_time_t lastSendTime;
-    BufferLog* log;
-    std::list<uint16_t> ready;
+    BufferLog log;
+
+    // Guards all accesses to streams map. If only reading elements in streams
+    // (i.e. not adding / removing elements) then can acquire ReadLock, even
+    // if a non-const method is called on stream_t.
+    RWLock streamsMutex;
+
+    std::vector<AtomicValue<bool> > vbReady;
+
     std::map<uint16_t, stream_t> streams;
+
     AtomicValue<size_t> itemsSent;
     AtomicValue<size_t> totalBytesSent;
-    AtomicValue<size_t> ackedBytes;
 
+    size_t roundRobinVbReady;
     static const uint32_t defaultNoopInerval;
 };
 
index d06102c..16bb7ce 100644 (file)
@@ -264,7 +264,7 @@ void Stream::addStats(ADD_STAT add_stat, const void *c) {
     add_casted_stat(buffer, stateName(state_), add_stat, c);
 }
 
-ActiveStream::ActiveStream(EventuallyPersistentEngine* e, DcpProducer* p,
+ActiveStream::ActiveStream(EventuallyPersistentEngine* e, DcpProducer& p,
                            const std::string &n, uint32_t flags,
                            uint32_t opaque, uint16_t vb, uint64_t st_seqno,
                            uint64_t en_seqno, uint64_t vb_uuid,
@@ -285,13 +285,13 @@ ActiveStream::ActiveStream(EventuallyPersistentEngine* e, DcpProducer* p,
 
     if (start_seqno_ >= end_seqno_) {
         endStream(END_STREAM_OK);
-        itemsReady = true;
+        itemsReady.store(true);
     }
 
     type_ = STREAM_ACTIVE;
 
     LOG(EXTENSION_LOG_WARNING, "%s (vb %d) %sstream created with start seqno "
-        "%llu and end seqno %llu", producer->logHeader(), vb, type, st_seqno,
+        "%llu and end seqno %llu", producer.logHeader(), vb, type, st_seqno,
         en_seqno);
 }
 
@@ -322,7 +322,7 @@ DcpResponse* ActiveStream::next() {
             break;
         default:
             LOG(EXTENSION_LOG_WARNING, "%s (vb %d) Invalid state '%s'",
-                producer->logHeader(), vb_, stateName(state_));
+                producer.logHeader(), vb_, stateName(state_));
             abort();
     }
 
@@ -331,7 +331,7 @@ DcpResponse* ActiveStream::next() {
         return next();
     }
 
-    itemsReady = response ? true : false;
+    itemsReady.store(response ? true : false);
     return response;
 }
 
@@ -345,7 +345,7 @@ void ActiveStream::markDiskSnapshot(uint64_t startSeqno, uint64_t endSeqno) {
     firstMarkerSent = true;
 
     LOG(EXTENSION_LOG_WARNING, "%s (vb %d) Sending disk snapshot with start "
-        "seqno %llu and end seqno %llu", producer->logHeader(), vb_, startSeqno,
+        "seqno %llu and end seqno %llu", producer.logHeader(), vb_, startSeqno,
         endSeqno);
     pushToReadyQ(new SnapshotMarker(opaque_, vb_, startSeqno, endSeqno,
                                    MARKER_FLAG_DISK));
@@ -362,10 +362,10 @@ void ActiveStream::markDiskSnapshot(uint64_t startSeqno, uint64_t endSeqno) {
         curChkSeqno = result.first;
     }
 
-    if (!itemsReady) {
-        itemsReady = true;
-        lh.unlock();
-        producer->notifyStreamReady(vb_, false);
+    lh.unlock();
+    bool inverse = false;
+    if (itemsReady.compare_exchange_strong(inverse, true)) {
+        producer.notifyStreamReady(vb_, false);
     }
 }
 
@@ -374,11 +374,10 @@ void ActiveStream::backfillReceived(Item* itm) {
     if (state_ == STREAM_BACKFILLING) {
         pushToReadyQ(new MutationResponse(itm, opaque_));
         lastReadSeqno = itm->getBySeqno();
-
-        if (!itemsReady) {
-            itemsReady = true;
-            lh.unlock();
-            producer->notifyStreamReady(vb_, false);
+        lh.unlock();
+        bool inverse = false;
+        if (itemsReady.compare_exchange_strong(inverse, true)) {
+            producer.notifyStreamReady(vb_, false);
         }
     } else {
         delete itm;
@@ -386,30 +385,25 @@ void ActiveStream::backfillReceived(Item* itm) {
 }
 
 void ActiveStream::completeBackfill() {
-    LockHolder lh(streamMutex);
-
-    if (state_ == STREAM_BACKFILLING) {
-        isBackfillTaskRunning = false;
+    {
+        LockHolder lh(streamMutex);
         LOG(EXTENSION_LOG_WARNING, "%s (vb %d) Backfill complete, %d items read"
-            " from disk, last seqno read: %ld", producer->logHeader(), vb_,
+            " from disk, last seqno read: %ld", producer.logHeader(), vb_,
             itemsFromBackfill, lastReadSeqno);
+    }
 
-        if (!itemsReady) {
-            itemsReady = true;
-            lh.unlock();
-            producer->notifyStreamReady(vb_, false);
-        }
+    isBackfillTaskRunning.store(false);
+    bool inverse = false;
+    if (itemsReady.compare_exchange_strong(inverse, true)) {
+        producer.notifyStreamReady(vb_, false);
     }
 }
 
 void ActiveStream::snapshotMarkerAckReceived() {
-    LockHolder lh (streamMutex);
-    waitForSnapshot--;
-
-    if (!itemsReady && waitForSnapshot == 0) {
-        itemsReady = true;
-        lh.unlock();
-        producer->notifyStreamReady(vb_, true);
+    bool inverse = false;
+    if (--waitForSnapshot == 0 &&
+        itemsReady.compare_exchange_strong(inverse, true)) {
+        producer.notifyStreamReady(vb_, true);
     }
 }
 
@@ -423,21 +417,21 @@ void ActiveStream::setVBucketStateAckRecieved() {
             takeoverState = vbucket_state_active;
             transitionState(STREAM_TAKEOVER_SEND);
             LOG(EXTENSION_LOG_INFO, "%s (vb %d) Receive ack for set vbucket "
-                "state to pending message", producer->logHeader(), vb_);
+                "state to pending message", producer.logHeader(), vb_);
         } else {
             LOG(EXTENSION_LOG_INFO, "%s (vb %d) Receive ack for set vbucket "
-                "state to active message", producer->logHeader(), vb_);
+                "state to active message", producer.logHeader(), vb_);
             endStream(END_STREAM_OK);
         }
 
-        if (!itemsReady) {
-            itemsReady = true;
-            lh.unlock();
-            producer->notifyStreamReady(vb_, true);
+        lh.unlock();
+        bool inverse = false;
+        if (itemsReady.compare_exchange_strong(inverse, true)) {
+            producer.notifyStreamReady(vb_, true);
         }
     } else {
         LOG(EXTENSION_LOG_WARNING, "%s (vb %d) Unexpected ack for set vbucket "
-            "op on stream '%s' state '%s'", producer->logHeader(), vb_,
+            "op on stream '%s' state '%s'", producer.logHeader(), vb_,
             name_.c_str(), stateName(state_));
     }
 }
@@ -499,9 +493,11 @@ DcpResponse* ActiveStream::takeoverSendPhase() {
     if (waitForSnapshot != 0) {
         return NULL;
     }
-
-    DcpResponse* resp = new SetVBucketState(opaque_, vb_, takeoverState);
-    transitionState(STREAM_TAKEOVER_WAIT);
+    DcpResponse* resp = NULL;
+    if (producer.bufferLogInsert(SetVBucketState::baseMsgBytes)) {
+        resp = new SetVBucketState(opaque_, vb_, takeoverState);
+        transitionState(STREAM_TAKEOVER_WAIT);
+    }
     return resp;
 }
 
@@ -529,7 +525,7 @@ void ActiveStream::addStats(ADD_STAT add_stat, const void *c) {
     snprintf(buffer, bsize, "%s:stream_%d_ready_queue_memory", name_.c_str(), vb_);
     add_casted_stat(buffer, getReadyQueueMemory(), add_stat, c);
     snprintf(buffer, bsize, "%s:stream_%d_items_ready", name_.c_str(), vb_);
-    add_casted_stat(buffer, itemsReady ? "true" : "false", add_stat, c);
+    add_casted_stat(buffer, itemsReady.load() ? "true" : "false", add_stat, c);
 }
 
 void ActiveStream::addTakeoverStats(ADD_STAT add_stat, const void *cookie) {
@@ -576,19 +572,21 @@ void ActiveStream::addTakeoverStats(ADD_STAT add_stat, const void *cookie) {
 DcpResponse* ActiveStream::nextQueuedItem() {
     if (!readyQ.empty()) {
         DcpResponse* response = readyQ.front();
-        if (response->getEvent() == DCP_MUTATION ||
-            response->getEvent() == DCP_DELETION ||
-            response->getEvent() == DCP_EXPIRATION) {
-            lastSentSeqno = dynamic_cast<MutationResponse*>(response)->getBySeqno();
-
-            if (state_ == STREAM_BACKFILLING) {
-                itemsFromBackfill++;
-            } else {
-                itemsFromMemory++;
+        if (producer.bufferLogInsert(response->getMessageSize())) {
+            if (response->getEvent() == DCP_MUTATION ||
+                response->getEvent() == DCP_DELETION ||
+                response->getEvent() == DCP_EXPIRATION) {
+                lastSentSeqno = dynamic_cast<MutationResponse*>(response)->getBySeqno();
+
+                if (state_ == STREAM_BACKFILLING) {
+                    itemsFromBackfill++;
+                } else {
+                    itemsFromMemory++;
+                }
             }
+            popFromReadyQ();
+            return response;
         }
-        popFromReadyQ();
-        return response;
     }
     return NULL;
 }
@@ -675,24 +673,24 @@ void ActiveStream::snapshot(std::list<MutationResponse*>& items, bool mark) {
 }
 
 uint32_t ActiveStream::setDead(end_stream_status_t status) {
-    LockHolder lh(streamMutex);
-    endStream(status);
+    {
+        LockHolder lh(streamMutex);
+        endStream(status);
+    }
 
-    if (!itemsReady && status != END_STREAM_DISCONNECTED) {
-        itemsReady = true;
-        lh.unlock();
-        producer->notifyStreamReady(vb_, true);
+    bool inverse = false;
+    if (status != END_STREAM_DISCONNECTED &&
+        itemsReady.compare_exchange_strong(inverse, true)) {
+        producer.notifyStreamReady(vb_, true);
     }
     return 0;
 }
 
 void ActiveStream::notifySeqnoAvailable(uint64_t seqno) {
-    LockHolder lh(streamMutex);
     if (state_ != STREAM_DEAD) {
-        if (!itemsReady) {
-            itemsReady = true;
-            lh.unlock();
-            producer->notifyStreamReady(vb_, true);
+        bool inverse = false;
+        if (itemsReady.compare_exchange_strong(inverse, true)) {
+            producer.notifyStreamReady(vb_, true);
         }
     }
 }
@@ -705,7 +703,7 @@ void ActiveStream::endStream(end_stream_status_t reason) {
         transitionState(STREAM_DEAD);
         LOG(EXTENSION_LOG_WARNING, "%s (vb %d) Stream closing, %llu items sent"
             " from disk, %llu items sent from memory, %llu was last seqno sent"
-            " %s is the reason", producer->logHeader(), vb_, itemsFromBackfill,
+            " %s is the reason", producer.logHeader(), vb_, itemsFromBackfill,
             itemsFromMemory, lastSentSeqno, getEndStreamStatusStr(reason));
     }
 }
@@ -750,7 +748,7 @@ void ActiveStream::scheduleBackfill() {
             ExTask task = new DCPBackfill(engine, this, backfillStart, backfillEnd,
                                           Priority::TapBgFetcherPriority, 0, false);
             ExecutorPool::get()->schedule(task, AUXIO_TASK_IDX);
-            isBackfillTaskRunning = true;
+            isBackfillTaskRunning.store(true);
         } else {
             if (flags_ & DCP_ADD_STREAM_FLAG_DISKONLY) {
                 endStream(END_STREAM_OK);
@@ -759,7 +757,7 @@ void ActiveStream::scheduleBackfill() {
             } else {
                 transitionState(STREAM_IN_MEMORY);
             }
-            itemsReady = true;
+            itemsReady.store(true);
         }
     }
 }
@@ -783,7 +781,7 @@ const char* ActiveStream::getEndStreamStatusStr(end_stream_status_t status)
 
 void ActiveStream::transitionState(stream_state_t newState) {
     LOG(EXTENSION_LOG_DEBUG, "%s (vb %d) Transitioning from %s to %s",
-        producer->logHeader(), vb_, stateName(state_), stateName(newState));
+        producer.logHeader(), vb_, stateName(state_), stateName(newState));
 
     if (state_ == newState) {
         return;
@@ -809,7 +807,7 @@ void ActiveStream::transitionState(stream_state_t newState) {
             break;
         default:
             LOG(EXTENSION_LOG_WARNING, "%s (vb %d) Invalid Transition from %s "
-                "to %s", producer->logHeader(), vb_, stateName(state_),
+                "to %s", producer.logHeader(), vb_, stateName(state_),
                 stateName(newState));
             abort();
     }
@@ -852,10 +850,10 @@ size_t ActiveStream::getItemsRemaining() {
 
 const char* ActiveStream::logHeader()
 {
-    return producer->logHeader();
+    return producer.logHeader();
 }
 
-NotifierStream::NotifierStream(EventuallyPersistentEngine* e, DcpProducer* p,
+NotifierStream::NotifierStream(EventuallyPersistentEngine* e, DcpProducer& p,
                                const std::string &name, uint32_t flags,
                                uint32_t opaque, uint16_t vb, uint64_t st_seqno,
                                uint64_t en_seqno, uint64_t vb_uuid,
@@ -869,13 +867,13 @@ NotifierStream::NotifierStream(EventuallyPersistentEngine* e, DcpProducer* p,
     if (vbucket && static_cast<uint64_t>(vbucket->getHighSeqno()) > st_seqno) {
         pushToReadyQ(new StreamEndResponse(opaque_, END_STREAM_OK, vb_));
         transitionState(STREAM_DEAD);
-        itemsReady = true;
+        itemsReady.store(true);
     }
 
     type_ = STREAM_NOTIFIER;
 
     LOG(EXTENSION_LOG_WARNING, "%s (vb %d) stream created with start seqno "
-        "%llu and end seqno %llu", producer->logHeader(), vb, st_seqno,
+        "%llu and end seqno %llu", producer.logHeader(), vb, st_seqno,
         en_seqno);
 }
 
@@ -885,10 +883,10 @@ uint32_t NotifierStream::setDead(end_stream_status_t status) {
         transitionState(STREAM_DEAD);
         if (status != END_STREAM_DISCONNECTED) {
             pushToReadyQ(new StreamEndResponse(opaque_, status, vb_));
-            if (!itemsReady) {
-                itemsReady = true;
-                lh.unlock();
-                producer->notifyStreamReady(vb_, true);
+            lh.unlock();
+            bool inverse = false;
+            if (itemsReady.compare_exchange_strong(inverse, true)) {
+                producer.notifyStreamReady(vb_, true);
             }
         }
     }
@@ -900,10 +898,10 @@ void NotifierStream::notifySeqnoAvailable(uint64_t seqno) {
     if (state_ != STREAM_DEAD && start_seqno_ < seqno) {
         pushToReadyQ(new StreamEndResponse(opaque_, END_STREAM_OK, vb_));
         transitionState(STREAM_DEAD);
-        if (!itemsReady) {
-            itemsReady = true;
-            lh.unlock();
-            producer->notifyStreamReady(vb_, true);
+        lh.unlock();
+        bool inverse = false;
+        if (itemsReady.compare_exchange_strong(inverse, true)) {
+            producer.notifyStreamReady(vb_, true);
         }
     }
 }
@@ -912,19 +910,23 @@ DcpResponse* NotifierStream::next() {
     LockHolder lh(streamMutex);
 
     if (readyQ.empty()) {
-        itemsReady = false;
+        itemsReady.store(false);
         return NULL;
     }
 
     DcpResponse* response = readyQ.front();
-    popFromReadyQ();
+    if (producer.bufferLogInsert(response->getMessageSize())) {
+        popFromReadyQ();
+    } else {
+        response = NULL;
+    }
 
     return response;
 }
 
 void NotifierStream::transitionState(stream_state_t newState) {
     LOG(EXTENSION_LOG_DEBUG, "%s (vb %d) Transitioning from %s to %s",
-        producer->logHeader(), vb_, stateName(state_), stateName(newState));
+        producer.logHeader(), vb_, stateName(state_), stateName(newState));
 
     if (state_ == newState) {
         return;
@@ -936,7 +938,7 @@ void NotifierStream::transitionState(stream_state_t newState) {
             break;
         default:
             LOG(EXTENSION_LOG_WARNING, "%s (vb %d) Invalid Transition from %s "
-                "to %s", producer->logHeader(), vb_, stateName(state_),
+                "to %s", producer.logHeader(), vb_, stateName(state_),
                 stateName(newState));
             abort();
     }
@@ -958,7 +960,7 @@ PassiveStream::PassiveStream(EventuallyPersistentEngine* e, DcpConsumer* c,
     LockHolder lh(streamMutex);
     pushToReadyQ(new StreamRequest(vb, opaque, flags, st_seqno, en_seqno,
                                   vb_uuid, snap_start_seqno, snap_end_seqno));
-    itemsReady = true;
+    itemsReady.store(true);
     type_ = STREAM_PASSIVE;
 
     const char* type = (flags & DCP_ADD_STREAM_FLAG_TAKEOVER) ? "takeover" : "";
@@ -997,9 +999,9 @@ void PassiveStream::acceptStream(uint16_t status, uint32_t add_opaque) {
             transitionState(STREAM_DEAD);
         }
         pushToReadyQ(new AddStreamResponse(add_opaque, opaque_, status));
-        if (!itemsReady) {
-            itemsReady = true;
-            lh.unlock();
+        lh.unlock();
+        bool inverse = false;
+        if (itemsReady.compare_exchange_strong(inverse, true)) {
             consumer->notifyStreamReady(vb_);
         }
     }
@@ -1021,9 +1023,9 @@ void PassiveStream::reconnectStream(RCPtr<VBucket> &vb,
     pushToReadyQ(new StreamRequest(vb_, new_opaque, flags_, start_seqno,
                                   end_seqno_, vb_uuid_, snap_start_seqno_,
                                   snap_end_seqno_));
-    if (!itemsReady) {
-        itemsReady = true;
-        lh.unlock();
+    lh.unlock();
+    bool inverse = false;
+    if (itemsReady.compare_exchange_strong(inverse, true)) {
         consumer->notifyStreamReady(vb_);
     }
 }
@@ -1304,9 +1306,9 @@ void PassiveStream::processSetVBucketState(SetVBucketState* state) {
 
     LockHolder lh (streamMutex);
     pushToReadyQ(new SetVBucketStateResponse(opaque_, ENGINE_SUCCESS));
-    if (!itemsReady) {
-        itemsReady = true;
-        lh.unlock();
+    lh.unlock();
+    bool inverse = false;
+    if (itemsReady.compare_exchange_strong(inverse, true)) {
         consumer->notifyStreamReady(vb_);
     }
 }
@@ -1330,9 +1332,9 @@ void PassiveStream::handleSnapshotEnd(RCPtr<VBucket>& vb, uint64_t byseqno) {
         if (cur_snapshot_ack) {
             LockHolder lh(streamMutex);
             pushToReadyQ(new SnapshotMarkerResponse(opaque_, ENGINE_SUCCESS));
-            if (!itemsReady) {
-                itemsReady = true;
-                lh.unlock();
+            lh.unlock();
+            bool inverse = false;
+            if (itemsReady.compare_exchange_strong(inverse, true)) {
                 consumer->notifyStreamReady(vb_);
             }
             cur_snapshot_ack = false;
@@ -1352,7 +1354,7 @@ void PassiveStream::addStats(ADD_STAT add_stat, const void *c) {
     snprintf(buf, bsize, "%s:stream_%d_buffer_bytes", name_.c_str(), vb_);
     add_casted_stat(buf, buffer.bytes, add_stat, c);
     snprintf(buf, bsize, "%s:stream_%d_items_ready", name_.c_str(), vb_);
-    add_casted_stat(buf, itemsReady ? "true" : "false", add_stat, c);
+    add_casted_stat(buf, itemsReady.load() ? "true" : "false", add_stat, c);
     snprintf(buf, bsize, "%s:stream_%d_last_received_seqno", name_.c_str(), vb_);
     add_casted_stat(buf, last_seqno, add_stat, c);
     snprintf(buf, bsize, "%s:stream_%d_ready_queue_memory", name_.c_str(), vb_);
@@ -1373,7 +1375,7 @@ DcpResponse* PassiveStream::next() {
     LockHolder lh(streamMutex);
 
     if (readyQ.empty()) {
-        itemsReady = false;
+        itemsReady.store(false);
         return NULL;
     }
 
index 424d892..031228f 100644 (file)
@@ -141,7 +141,7 @@ protected:
     stream_state_t state_;
     stream_type_t type_;
 
-    bool itemsReady;
+    AtomicValue<bool> itemsReady;
     Mutex streamMutex;
     std::queue<DcpResponse*> readyQ;
 
@@ -154,7 +154,7 @@ private:
 
 class ActiveStream : public Stream {
 public:
-    ActiveStream(EventuallyPersistentEngine* e, DcpProducer* p,
+    ActiveStream(EventuallyPersistentEngine* e, DcpProducer& p,
                  const std::string &name, uint32_t flags, uint32_t opaque,
                  uint16_t vb, uint64_t st_seqno, uint64_t en_seqno,
                  uint64_t vb_uuid, uint64_t snap_start_seqno,
@@ -245,16 +245,18 @@ private:
     //! Whether ot not this is the first snapshot marker sent
     bool firstMarkerSent;
 
-    int waitForSnapshot;
+    AtomicValue<int> waitForSnapshot;
 
     EventuallyPersistentEngine* engine;
-    DcpProducer* producer;
-    bool isBackfillTaskRunning;
+
+    DcpProducer& producer;
+
+    AtomicValue<bool> isBackfillTaskRunning;
 };
 
 class NotifierStream : public Stream {
 public:
-    NotifierStream(EventuallyPersistentEngine* e, DcpProducer* producer,
+    NotifierStream(EventuallyPersistentEngine* e, DcpProducer& producer,
                    const std::string &name, uint32_t flags, uint32_t opaque,
                    uint16_t vb, uint64_t start_seqno, uint64_t end_seqno,
                    uint64_t vb_uuid, uint64_t snap_start_seqno,
@@ -276,7 +278,7 @@ private:
 
     void transitionState(stream_state_t newState);
 
-    DcpProducer* producer;
+    DcpProducer& producer;
 };
 
 class PassiveStream : public Stream {
@@ -349,7 +351,7 @@ private:
     } buffer;
 };
 
-typedef SingleThreadedRCPtr<Stream> stream_t;
+typedef RCPtr<Stream> stream_t;
 typedef RCPtr<PassiveStream> passive_stream_t;
 
 #endif  // SRC_DCP_STREAM_H_
index cec2180..3841998 100644 (file)
@@ -284,7 +284,7 @@ public:
     virtual const char *getType() const = 0;
 
     template <typename T>
-    void addStat(const char *nm, const T &val, ADD_STAT add_stat, const void *c) {
+    void addStat(const char *nm, const T &val, ADD_STAT add_stat, const void *c) const {
         std::stringstream tap;
         tap << name << ":" << nm;
         std::stringstream value;
@@ -293,7 +293,7 @@ public:
         add_casted_stat(n.data(), value.str().data(), add_stat, c);
     }
 
-    void addStat(const char *nm, bool val, ADD_STAT add_stat, const void *c) {
+    void addStat(const char *nm, bool val, ADD_STAT add_stat, const void *c) const {
         addStat(nm, val ? "true" : "false", add_stat, c);
     }