Merge branch 'watson'
[ep-engine.git] / src / dcp / producer.cc
index 2b3cf4f..7159dd3 100644 (file)
  */
 
 #include <vector>
+#include <memcached/server_api.h>
 
 #include "dcp/producer.h"
 
 #include "backfill.h"
-#include "compress.h"
 #include "common.h"
 #include "ep_engine.h"
 #include "failover-table.h"
 #include "dcp/backfill-manager.h"
+#include "dcp/dcpconnmap.h"
 #include "dcp/response.h"
 #include "dcp/stream.h"
 
-const uint32_t DcpProducer::defaultNoopInerval = 20;
+const std::chrono::seconds DcpProducer::defaultDcpNoopTxInterval(20);
 
 DcpProducer::BufferLog::State DcpProducer::BufferLog::getState_UNLOCKED() {
     if (isEnabled_UNLOCKED()) {
@@ -81,7 +82,16 @@ bool DcpProducer::BufferLog::pauseIfFull() {
 
 void DcpProducer::BufferLog::unpauseIfSpaceAvailable() {
     ReaderLockHolder rlh(logLock);
-    if (getState_UNLOCKED() != Full) {
+    if (getState_UNLOCKED() == Full) {
+        LOG(EXTENSION_LOG_NOTICE,
+            "%s Unable to notify paused connection "
+            "because DcpProducer::BufferLog is full; ackedBytes:%lx, "
+            "bytesSent:%lx, maxBytes:%lx",
+            producer.logHeader(),
+            ackedBytes,
+            bytesSent,
+            maxBytes);
+    } else {
         producer.notifyPaused(true);
     }
 }
@@ -93,6 +103,14 @@ void DcpProducer::BufferLog::acknowledge(size_t bytes) {
         release_UNLOCKED(bytes);
         ackedBytes += bytes;
         if (state == Full) {
+            LOG(EXTENSION_LOG_NOTICE,
+                "%s Notifying paused connection now that "
+                "DcpProducer::Bufferlog is no longer full; ackedBytes:%lx, "
+                "bytesSent:%lx, maxBytes:%lx",
+                producer.logHeader(),
+                ackedBytes,
+                bytesSent,
+                maxBytes);
             producer.notifyPaused(true);
         }
     }
@@ -110,15 +128,25 @@ void DcpProducer::BufferLog::addStats(ADD_STAT add_stat, const void *c) {
     }
 }
 
-DcpProducer::DcpProducer(EventuallyPersistentEngine &e, const void *cookie,
-                         const std::string &name, bool isNotifier)
-    : Producer(e, cookie, name), rejectResp(NULL),
-      notifyOnly(isNotifier), lastSendTime(ep_current_time()), log(*this),
-      itemsSent(0), totalBytesSent(0) {
+DcpProducer::DcpProducer(EventuallyPersistentEngine& e,
+                         const void* cookie,
+                         const std::string& name,
+                         bool isNotifier,
+                         bool startTask,
+                         MutationType mutType)
+    : Producer(e, cookie, name),
+      rejectResp(NULL),
+      notifyOnly(isNotifier),
+      lastSendTime(ep_current_time()),
+      log(*this),
+      itemsSent(0),
+      totalBytesSent(0),
+      mutationType(mutType) {
     setSupportAck(true);
     setReserved(true);
     setPaused(true);
 
+    logger.setId(e.getServerApi()->cookie->get_log_info(cookie).first);
     if (notifyOnly) {
         setLogHeader("DCP (Notifier) " + getName() + " -");
     } else {
@@ -144,11 +172,13 @@ DcpProducer::DcpProducer(EventuallyPersistentEngine &e, const void *cookie,
     // noop interval to 20 seconds by default, but in post 3.0 releases we set
     // it to be higher by default. Starting in 3.0.1 the DCP consumer sets the
     // noop interval of the producer when connecting so in an all 3.0.1+ cluster
-    // this value will be overriden. In 3.0 however we do not set the noop
+    // this value will be overridden. In 3.0 however we do not set the noop
     // interval so setting this value will make sure we don't disconnect on
     // accident due to the producer and the consumer having a different noop
     // interval.
-    noopCtx.noopInterval = defaultNoopInerval;
+    noopCtx.dcpNoopTxInterval = defaultDcpNoopTxInterval;
+    noopCtx.dcpIdleTimeout = std::chrono::seconds(
+            engine_.getConfiguration().getDcpIdleTimeout());
     noopCtx.pendingRecv = false;
     noopCtx.enabled = false;
 
@@ -165,17 +195,21 @@ DcpProducer::DcpProducer(EventuallyPersistentEngine &e, const void *cookie,
         supportsCursorDropping = true;
     }
 
-    backfillMgr.reset(new BackfillManager(&engine_));
+    backfillMgr.reset(new BackfillManager(engine_));
 
-    checkpointCreatorTask = new ActiveStreamCheckpointProcessorTask(e);
-    ExecutorPool::get()->schedule(checkpointCreatorTask, AUXIO_TASK_IDX);
+    if (startTask) {
+        createCheckpointProcessorTask();
+        scheduleCheckpointProcessorTask();
+    }
 }
 
 DcpProducer::~DcpProducer() {
     backfillMgr.reset();
     delete rejectResp;
 
-    ExecutorPool::get()->cancel(checkpointCreatorTask->getId());
+    if (checkpointCreatorTask) {
+        ExecutorPool::get()->cancel(checkpointCreatorTask->getId());
+    }
 }
 
 ENGINE_ERROR_CODE DcpProducer::streamRequest(uint32_t flags,
@@ -188,17 +222,26 @@ ENGINE_ERROR_CODE DcpProducer::streamRequest(uint32_t flags,
                                              uint64_t snap_end_seqno,
                                              uint64_t *rollback_seqno,
                                              dcp_add_failover_log callback) {
+
+    lastReceiveTime = ep_current_time();
     if (doDisconnect()) {
         return ENGINE_DISCONNECT;
     }
 
-    RCPtr<VBucket> vb = engine_.getVBucket(vbucket);
+    VBucketPtr vb = engine_.getVBucket(vbucket);
     if (!vb) {
         LOG(EXTENSION_LOG_WARNING, "%s (vb %d) Stream request failed because "
             "this vbucket doesn't exist", logHeader(), vbucket);
         return ENGINE_NOT_MY_VBUCKET;
     }
 
+    if ((flags & DCP_ADD_STREAM_ACTIVE_VB_ONLY) &&
+        (vb->getState() != vbucket_state_active)) {
+        LOG(EXTENSION_LOG_NOTICE, "%s (vb %d) Stream request failed because "
+            "the vbucket is in %s state, only active vbuckets were requested",
+            logHeader(), vbucket, vb->toString(vb->getState()));
+        return ENGINE_NOT_MY_VBUCKET;
+    }
     if (vb->checkpointManager.getOpenCheckpointId() == 0) {
         LOG(EXTENSION_LOG_WARNING, "%s (vb %d) Stream request failed because "
             "this vbucket is in backfill state", logHeader(), vbucket);
@@ -224,17 +267,20 @@ ENGINE_ERROR_CODE DcpProducer::streamRequest(uint32_t flags,
     }
 
     bool add_vb_conn_map = true;
-    std::map<uint16_t, stream_t>::iterator itr;
     {
-        WriterLockHolder wlh(streamsMutex);
-        if ((itr = streams.find(vbucket)) != streams.end()) {
-            if (itr->second->getState() != STREAM_DEAD) {
+        // Need to synchronise the search and conditional erase,
+        // therefore use external locking here.
+        std::lock_guard<StreamsMap> guard(streams);
+        auto it = streams.find(vbucket, guard);
+        if (it.second) {
+            auto& stream = it.first;
+            if (stream->isActive()) {
                 LOG(EXTENSION_LOG_WARNING, "%s (vb %d) Stream request failed"
                     " because a stream already exists for this vbucket",
                     logHeader(), vbucket);
                 return ENGINE_KEY_EEXISTS;
             } else {
-                streams.erase(vbucket);
+                streams.erase(vbucket, guard);
 
                 // Don't need to add an entry to vbucket-to-conns map
                 add_vb_conn_map = false;
@@ -251,16 +297,31 @@ ENGINE_ERROR_CODE DcpProducer::streamRequest(uint32_t flags,
         start_seqno = static_cast<uint64_t>(vb->getHighSeqno());
     }
 
-    if (vb->failovers->needsRollback(start_seqno, vb->getHighSeqno(),
-                                     vbucket_uuid, snap_start_seqno,
-                                     snap_end_seqno, vb->getPurgeSeqno(),
-                                     rollback_seqno)) {
-        LOG(EXTENSION_LOG_WARNING, "%s (vb %d) Stream request failed "
-            "because a rollback to seqno %" PRIu64 " is required "
-            "(start seqno %" PRIu64 ", vb_uuid %" PRIu64 ", snapStartSeqno %" PRIu64
-            ", snapEndSeqno %" PRIu64 ")",
-            logHeader(), vbucket, *rollback_seqno, start_seqno, vbucket_uuid,
-            snap_start_seqno, snap_end_seqno);
+    std::pair<bool, std::string> need_rollback =
+            vb->failovers->needsRollback(start_seqno,
+                                         vb->getHighSeqno(),
+                                         vbucket_uuid,
+                                         snap_start_seqno,
+                                         snap_end_seqno,
+                                         vb->getPurgeSeqno(),
+                                         rollback_seqno);
+
+    if (need_rollback.first) {
+        LOG(EXTENSION_LOG_WARNING,
+            "%s (vb %d) Stream request requires rollback to seqno:%" PRIu64
+            " because %s. Client requested"
+            " seqnos:{%" PRIu64 ",%" PRIu64 "}"
+            " snapshot:{%" PRIu64 ",%" PRIu64 "}"
+            " uuid:%" PRIu64,
+            logHeader(),
+            vbucket,
+            *rollback_seqno,
+            need_rollback.second.c_str(),
+            start_seqno,
+            end_seqno,
+            snap_start_seqno,
+            snap_end_seqno,
+            vbucket_uuid);
         return ENGINE_ROLLBACK;
     }
 
@@ -276,7 +337,7 @@ ENGINE_ERROR_CODE DcpProducer::streamRequest(uint32_t flags,
     }
 
     if (flags & DCP_ADD_STREAM_FLAG_DISKONLY) {
-        end_seqno = engine_.getEpStore()->getLastPersistedSeqno(vbucket);
+        end_seqno = engine_.getKVBucket()->getLastPersistedSeqno(vbucket);
     }
 
     if (!notifyOnly && start_seqno > end_seqno) {
@@ -308,11 +369,12 @@ ENGINE_ERROR_CODE DcpProducer::streamRequest(uint32_t flags,
                                opaque, vbucket, notifySeqno,
                                end_seqno, vbucket_uuid,
                                snap_start_seqno, snap_end_seqno);
-   } else {
+    } else {
         s = new ActiveStream(&engine_, this, getName(), flags,
                              opaque, vbucket, start_seqno,
                              end_seqno, vbucket_uuid,
-                             snap_start_seqno, snap_end_seqno);
+                             snap_start_seqno, snap_end_seqno,
+                             (mutationType == MutationType::KeyOnly));
     }
 
     {
@@ -323,9 +385,12 @@ ENGINE_ERROR_CODE DcpProducer::streamRequest(uint32_t flags,
             return ENGINE_NOT_MY_VBUCKET;
         }
 
-        WriterLockHolder wlh(streamsMutex);
-        s->setActive();
-        streams[vbucket] = s;
+        if (!notifyOnly) {
+            // MB-19428: Only activate the stream if we are adding it to the
+            // streams map.
+            static_cast<ActiveStream*>(s.get())->setActive();
+        }
+        streams.insert(std::make_pair(vbucket, s));
     }
 
     notifyStreamReady(vbucket);
@@ -341,11 +406,12 @@ ENGINE_ERROR_CODE DcpProducer::streamRequest(uint32_t flags,
 ENGINE_ERROR_CODE DcpProducer::getFailoverLog(uint32_t opaque, uint16_t vbucket,
                                               dcp_add_failover_log callback) {
     (void) opaque;
+    lastReceiveTime = ep_current_time();
     if (doDisconnect()) {
         return ENGINE_DISCONNECT;
     }
 
-    RCPtr<VBucket> vb = engine_.getVBucket(vbucket);
+    VBucketPtr vb = engine_.getVBucket(vbucket);
     if (!vb) {
         LOG(EXTENSION_LOG_WARNING, "%s (vb %d) Get Failover Log failed "
             "because this vbucket doesn't exist", logHeader(), vbucket);
@@ -363,6 +429,10 @@ ENGINE_ERROR_CODE DcpProducer::step(struct dcp_message_producers* producers) {
     }
 
     ENGINE_ERROR_CODE ret;
+    if ((ret = maybeDisconnect()) != ENGINE_FAILED) {
+          return ret;
+    }
+
     if ((ret = maybeSendNoop(producers)) != ENGINE_FAILED) {
         return ret;
     }
@@ -378,25 +448,20 @@ ENGINE_ERROR_CODE DcpProducer::step(struct dcp_message_producers* producers) {
         }
     }
 
-    Item* itmCpy = NULL;
-    if (resp->getEvent() == DCP_MUTATION) {
+    Item* itmCpy = nullptr;
+    auto* mutationResponse = dynamic_cast<MutationProducerResponse*>(resp);
+    if (mutationResponse != nullptr) {
         try {
-            itmCpy = static_cast<MutationResponse*>(resp)->getItemCopy();
+            itmCpy = mutationResponse->getItemCopy();
         } catch (const std::bad_alloc&) {
             rejectResp = resp;
-            LOG(EXTENSION_LOG_WARNING, "%s (vb %d) ENOMEM while trying to copy "
-                "item with seqno %" PRIu64 "before streaming it", logHeader(),
-                static_cast<MutationResponse*>(resp)->getVBucket(),
-                static_cast<MutationResponse*>(resp)->getBySeqno());
+            LOG(EXTENSION_LOG_WARNING,
+                "%s (vb %d) ENOMEM while trying to copy "
+                "item with seqno %" PRIu64 "before streaming it",
+                logHeader(),
+                mutationResponse->getVBucket(),
+                *mutationResponse->getBySeqno());
             return ENGINE_ENOMEM;
-        } catch (const std::logic_error&) {
-            rejectResp = resp;
-            LOG(EXTENSION_LOG_WARNING, "%s (vb %d) illegal mutation payload "
-                "type while copying an item with seqno %" PRIu64 "before "
-                "streaming it", logHeader(),
-                static_cast<MutationResponse*>(resp)->getVBucket(),
-                static_cast<MutationResponse*>(resp)->getBySeqno());
-            return ENGINE_ENOTSUP;
         }
 
         if (enableValueCompression) {
@@ -419,62 +484,59 @@ ENGINE_ERROR_CODE DcpProducer::step(struct dcp_message_producers* producers) {
                 log.acknowledge(sizeBefore - sizeAfter);
             }
         }
-
     }
 
     EventuallyPersistentEngine *epe = ObjectRegistry::onSwitchThread(NULL,
                                                                      true);
     switch (resp->getEvent()) {
-        case DCP_STREAM_END:
+        case DcpResponse::Event::StreamEnd:
         {
             StreamEndResponse *se = static_cast<StreamEndResponse*>(resp);
             ret = producers->stream_end(getCookie(), se->getOpaque(),
                                         se->getVbucket(), se->getFlags());
             break;
         }
-        case DCP_MUTATION:
+        case DcpResponse::Event::Mutation:
         {
-            MutationResponse *m = dynamic_cast<MutationResponse*> (resp);
-            if (m->getExtMetaData()) {
-                std::pair<const char*, uint16_t> meta = m->getExtMetaData()->getExtMeta();
-                ret = producers->mutation(getCookie(), m->getOpaque(), itmCpy,
-                                          m->getVBucket(), m->getBySeqno(),
-                                          m->getRevSeqno(), 0,
-                                          meta.first, meta.second,
-                                          m->getItem()->getNRUValue());
-            } else {
-                ret = producers->mutation(getCookie(), m->getOpaque(), itmCpy,
-                                          m->getVBucket(), m->getBySeqno(),
-                                          m->getRevSeqno(), 0,
-                                          NULL, 0,
-                                          m->getItem()->getNRUValue());
+            if (itmCpy == nullptr) {
+                throw std::logic_error(
+                    "DcpProducer::step(Mutation): itmCpy must be != nullptr");
             }
+            std::pair<const char*, uint16_t> meta{nullptr, 0};
+            if (mutationResponse->getExtMetaData()) {
+                meta = mutationResponse->getExtMetaData()->getExtMeta();
+            }
+            ret = producers->mutation(getCookie(),
+                                      mutationResponse->getOpaque(),
+                                      itmCpy,
+                                      mutationResponse->getVBucket(),
+                                      *mutationResponse->getBySeqno(),
+                                      mutationResponse->getRevSeqno(),
+                                      0 /* lock time */,
+                                      meta.first, meta.second,
+                                      mutationResponse->getItem()->getNRUValue());
             break;
         }
-        case DCP_DELETION:
+        case DcpResponse::Event::Deletion:
         {
-            MutationResponse *m = static_cast<MutationResponse*>(resp);
-            if (m->getExtMetaData()) {
-                std::pair<const char*, uint16_t> meta = m->getExtMetaData()->getExtMeta();
-                ret = producers->deletion(getCookie(), m->getOpaque(),
-                                          m->getItem()->getKey().c_str(),
-                                          m->getItem()->getNKey(),
-                                          m->getItem()->getCas(),
-                                          m->getVBucket(), m->getBySeqno(),
-                                          m->getRevSeqno(),
-                                          meta.first, meta.second);
-            } else {
-                ret = producers->deletion(getCookie(), m->getOpaque(),
-                                          m->getItem()->getKey().c_str(),
-                                          m->getItem()->getNKey(),
-                                          m->getItem()->getCas(),
-                                          m->getVBucket(), m->getBySeqno(),
-                                          m->getRevSeqno(),
-                                          NULL, 0);
+            if (itmCpy == nullptr) {
+                throw std::logic_error(
+                    "DcpProducer::step(Deletion): itmCpy must be != nullptr");
+            }
+            std::pair<const char*, uint16_t> meta{nullptr, 0};
+            if (mutationResponse->getExtMetaData()) {
+                meta = mutationResponse->getExtMetaData()->getExtMeta();
             }
+            ret = producers->deletion(getCookie(),
+                                      mutationResponse->getOpaque(),
+                                      itmCpy,
+                                      mutationResponse->getVBucket(),
+                                      *mutationResponse->getBySeqno(),
+                                      mutationResponse->getRevSeqno(),
+                                      meta.first, meta.second);
             break;
         }
-        case DCP_SNAPSHOT_MARKER:
+        case DcpResponse::Event::SnapshotMarker:
         {
             SnapshotMarker *s = static_cast<SnapshotMarker*>(resp);
             ret = producers->marker(getCookie(), s->getOpaque(),
@@ -484,26 +546,38 @@ ENGINE_ERROR_CODE DcpProducer::step(struct dcp_message_producers* producers) {
                                     s->getFlags());
             break;
         }
-        case DCP_SET_VBUCKET:
+        case DcpResponse::Event::SetVbucket:
         {
             SetVBucketState *s = static_cast<SetVBucketState*>(resp);
             ret = producers->set_vbucket_state(getCookie(), s->getOpaque(),
                                                s->getVBucket(), s->getState());
             break;
         }
+        case DcpResponse::Event::SystemEvent: {
+            SystemEventProducerMessage* s =
+                    static_cast<SystemEventProducerMessage*>(resp);
+            ret = producers->system_event(
+                    getCookie(),
+                    s->getOpaque(),
+                    s->getVBucket(),
+                    uint32_t(s->getSystemEvent()),
+                    *s->getBySeqno(),
+                    {reinterpret_cast<const uint8_t*>(s->getKey().data()),
+                     s->getKey().size()},
+                    s->getEventData());
+            break;
+        }
         default:
         {
-            LOG(EXTENSION_LOG_WARNING, "%s Unexpected dcp event (%d), "
-                "disconnecting", logHeader(), resp->getEvent());
+            LOG(EXTENSION_LOG_WARNING, "%s Unexpected dcp event (%s), "
+                "disconnecting", logHeader(),
+                resp->to_string());
             ret = ENGINE_DISCONNECT;
             break;
         }
     }
 
     ObjectRegistry::onSwitchThread(epe);
-    if (resp->getEvent() == DCP_MUTATION && ret != ENGINE_SUCCESS) {
-        delete itmCpy;
-    }
 
     if (ret == ENGINE_E2BIG) {
         rejectResp = resp;
@@ -518,6 +592,7 @@ ENGINE_ERROR_CODE DcpProducer::step(struct dcp_message_producers* producers) {
 ENGINE_ERROR_CODE DcpProducer::bufferAcknowledgement(uint32_t opaque,
                                                      uint16_t vbucket,
                                                      uint32_t buffer_bytes) {
+    lastReceiveTime = ep_current_time();
     log.acknowledge(buffer_bytes);
     return ENGINE_SUCCESS;
 }
@@ -525,6 +600,7 @@ ENGINE_ERROR_CODE DcpProducer::bufferAcknowledgement(uint32_t opaque,
 ENGINE_ERROR_CODE DcpProducer::control(uint32_t opaque, const void* key,
                                        uint16_t nkey, const void* value,
                                        uint32_t nvalue) {
+    lastReceiveTime = ep_current_time();
     const char* param = static_cast<const char*>(key);
     std::string keyStr(static_cast<const char*>(key), nkey);
     std::string valueStr(static_cast<const char*>(value), nvalue);
@@ -570,8 +646,28 @@ ENGINE_ERROR_CODE DcpProducer::control(uint32_t opaque, const void* key,
         }
         return ENGINE_SUCCESS;
     } else if (strncmp(param, "set_noop_interval", nkey) == 0) {
-        if (parseUint32(valueStr.c_str(), &noopCtx.noopInterval)) {
-            return ENGINE_SUCCESS;
+        uint32_t noopInterval;
+        if (parseUint32(valueStr.c_str(), &noopInterval)) {
+            /*
+             * We need to ensure that we only set the noop interval to a value
+             * that is a multiple of the connection manager interval. The reason
+             * is that if there is no DCP traffic we snooze for the connection
+             * manager interval before sending the noop.
+             */
+            if (noopInterval % engine_.getConfiguration().
+                    getConnectionManagerInterval() == 0) {
+                noopCtx.dcpNoopTxInterval = std::chrono::seconds(noopInterval);
+                return ENGINE_SUCCESS;
+            } else {
+                LOG(EXTENSION_LOG_WARNING, "%s The ctrl parameter "
+                    "set_noop_interval is being set to %" PRIu32 " seconds."
+                    "This is not a multiple of the connectionManagerInterval "
+                    "of %" PRIu64 " seconds, and so is not supported.",
+                    logHeader(), noopInterval,
+                    uint64_t(engine_.getConfiguration().
+                             getConnectionManagerInterval()));
+                return ENGINE_EINVAL;
+            }
         }
     } else if(strncmp(param, "set_priority", nkey) == 0) {
         if (valueStr == "high") {
@@ -595,10 +691,10 @@ ENGINE_ERROR_CODE DcpProducer::control(uint32_t opaque, const void* key,
     return ENGINE_EINVAL;
 }
 
-ENGINE_ERROR_CODE DcpProducer::handleResponse(
-                                        protocol_binary_response_header *resp) {
+bool DcpProducer::handleResponse(protocol_binary_response_header* resp) {
+    lastReceiveTime = ep_current_time();
     if (doDisconnect()) {
-        return ENGINE_DISCONNECT;
+        return false;
     }
 
     uint8_t opcode = resp->response.opcode;
@@ -608,24 +704,22 @@ ENGINE_ERROR_CODE DcpProducer::handleResponse(
             reinterpret_cast<protocol_binary_response_dcp_stream_req*>(resp);
         uint32_t opaque = pkt->message.header.response.opaque;
 
-        stream_t active_stream;
-        std::map<uint16_t, stream_t>::iterator itr;
-        {
-            ReaderLockHolder rlh(streamsMutex);
-            for (itr = streams.begin() ; itr != streams.end(); ++itr) {
-                active_stream = itr->second;
-                Stream *str = active_stream.get();
-                if (str && str->getType() == STREAM_ACTIVE) {
-                    ActiveStream* as = static_cast<ActiveStream*>(str);
-                    if (as && opaque == itr->second->getOpaque()) {
-                        break;
-                    }
+
+        // Search for an active stream with the same opaque as the response.
+        auto itr = streams.find_if(
+            [opaque](const StreamsMap::value_type& s) {
+                const auto& stream = s.second;
+                if (stream && stream->isTypeActive()) {
+                    ActiveStream* as = static_cast<ActiveStream*>(stream.get());
+                    return (as && opaque == stream->getOpaque());
+                } else {
+                    return false;
                 }
             }
-        }
+        );
 
-        if (itr != streams.end()) {
-            ActiveStream *as = static_cast<ActiveStream*>(active_stream.get());
+        if (itr.second) {
+            ActiveStream *as = static_cast<ActiveStream*>(itr.first.get());
             if (opcode == PROTOCOL_BINARY_CMD_DCP_SET_VBUCKET_STATE) {
                 as->setVBucketStateAckRecieved();
             } else if (opcode == PROTOCOL_BINARY_CMD_DCP_SNAPSHOT_MARKER) {
@@ -633,53 +727,53 @@ ENGINE_ERROR_CODE DcpProducer::handleResponse(
             }
         }
 
-        return ENGINE_SUCCESS;
+        return true;
     } else if (opcode == PROTOCOL_BINARY_CMD_DCP_MUTATION ||
         opcode == PROTOCOL_BINARY_CMD_DCP_DELETION ||
         opcode == PROTOCOL_BINARY_CMD_DCP_EXPIRATION ||
         opcode == PROTOCOL_BINARY_CMD_DCP_STREAM_END) {
         // TODO: When nacking is implemented we need to handle these responses
-        return ENGINE_SUCCESS;
+        return true;
     } else if (opcode == PROTOCOL_BINARY_CMD_DCP_NOOP) {
         if (noopCtx.opaque == resp->response.opaque) {
             noopCtx.pendingRecv = false;
-            return ENGINE_SUCCESS;
+            return true;
         }
     }
 
     LOG(EXTENSION_LOG_WARNING, "%s Trying to handle an unknown response %d, "
         "disconnecting", logHeader(), opcode);
 
-    return ENGINE_DISCONNECT;
+    return false;
 }
 
 ENGINE_ERROR_CODE DcpProducer::closeStream(uint32_t opaque, uint16_t vbucket) {
+    lastReceiveTime = ep_current_time();
     if (doDisconnect()) {
         return ENGINE_DISCONNECT;
     }
 
-    stream_t stream = findStreamByVbid(vbucket);
+    auto it = streams.erase(vbucket);
+
     ENGINE_ERROR_CODE ret;
-    if (!stream) {
+    if (!it.second) {
         LOG(EXTENSION_LOG_WARNING, "%s (vb %d) Cannot close stream because no "
             "stream exists for this vbucket", logHeader(), vbucket);
         return ENGINE_KEY_ENOENT;
-    } else if (!stream->isActive()) {
-        LOG(EXTENSION_LOG_WARNING, "%s (vb %d) Cannot close stream because "
-            "stream is already marked as dead", logHeader(), vbucket);
-        connection_t conn(this);
-        engine_.getDcpConnMap().removeVBConnByVBId(conn, vbucket);
-        ret = ENGINE_KEY_ENOENT;
     } else {
-        stream->setDead(END_STREAM_CLOSED);
-        connection_t conn(this);
-        engine_.getDcpConnMap().removeVBConnByVBId(conn, vbucket);
-        ret = ENGINE_SUCCESS;
-    }
-
-    {
-        WriterLockHolder wlh(streamsMutex);
-        streams.erase(vbucket);
+        auto& stream = it.first;
+        if (!stream->isActive()) {
+            LOG(EXTENSION_LOG_WARNING, "%s (vb %d) Cannot close stream because "
+                "stream is already marked as dead", logHeader(), vbucket);
+            connection_t conn(this);
+            engine_.getDcpConnMap().removeVBConnByVBId(conn, vbucket);
+            ret = ENGINE_KEY_ENOENT;
+        } else {
+            stream->setDead(END_STREAM_CLOSED);
+            connection_t conn(this);
+            engine_.getDcpConnMap().removeVBConnByVBId(conn, vbucket);
+            ret = ENGINE_SUCCESS;
+        }
     }
 
     return ret;
@@ -689,17 +783,23 @@ void DcpProducer::notifyBackfillManager() {
     backfillMgr->wakeUpTask();
 }
 
-bool DcpProducer::recordBackfillManagerBytesRead(uint32_t bytes) {
-    return backfillMgr->bytesRead(bytes);
+bool DcpProducer::recordBackfillManagerBytesRead(size_t bytes, bool force) {
+    if (force) {
+        backfillMgr->bytesForceRead(bytes);
+        return true;
+    }
+    return backfillMgr->bytesCheckAndRead(bytes);
 }
 
-void DcpProducer::recordBackfillManagerBytesSent(uint32_t bytes) {
+void DcpProducer::recordBackfillManagerBytesSent(size_t bytes) {
     backfillMgr->bytesSent(bytes);
 }
 
-void DcpProducer::scheduleBackfillManager(stream_t s,
-                                          uint64_t start, uint64_t end) {
-    backfillMgr->schedule(s, start, end);
+void DcpProducer::scheduleBackfillManager(VBucket& vb,
+                                          const active_stream_t& s,
+                                          uint64_t start,
+                                          uint64_t end) {
+    backfillMgr->schedule(vb, s, start, end);
 }
 
 void DcpProducer::addStats(ADD_STAT add_stat, const void *c) {
@@ -708,8 +808,8 @@ void DcpProducer::addStats(ADD_STAT add_stat, const void *c) {
     addStat("items_sent", getItemsSent(), add_stat, c);
     addStat("items_remaining", getItemsRemaining(), add_stat, c);
     addStat("total_bytes_sent", getTotalBytes(), add_stat, c);
-    addStat("last_sent_time", lastSendTime, add_stat,
-            c);
+    addStat("last_sent_time", lastSendTime, add_stat, c);
+    addStat("last_receive_time", lastReceiveTime, add_stat, c);
     addStat("noop_enabled", noopCtx.enabled, add_stat, c);
     addStat("noop_wait", noopCtx.pendingRecv, add_stat, c);
     addStat("priority", priority.c_str(), add_stat, c);
@@ -730,26 +830,41 @@ void DcpProducer::addStats(ADD_STAT add_stat, const void *c) {
 
     log.addStats(add_stat, c);
 
-    ReaderLockHolder rlh(streamsMutex);
     addStat("num_streams", streams.size(), add_stat, c);
-    std::map<uint16_t, stream_t>::iterator itr;
-    for (itr = streams.begin(); itr != streams.end(); ++itr) {
-        itr->second->addStats(add_stat, c);
+
+    // Make a copy of all valid streams (under lock), and then call addStats
+    // for each one. (Done in two stages to minimise how long we have the
+    // streams map locked for).
+    std::vector<StreamsMap::mapped_type> valid_streams;
+
+    streams.for_each(
+        [&valid_streams](const StreamsMap::value_type& element) {
+            valid_streams.push_back(element.second);
+        }
+    );
+    for (const auto& stream : valid_streams) {
+        stream->addStats(add_stat, c);
     }
 }
 
 void DcpProducer::addTakeoverStats(ADD_STAT add_stat, const void* c,
-                                   uint16_t vbid) {
-
-    stream_t stream = findStreamByVbid(vbid);
-    if (stream && stream->getType() == STREAM_ACTIVE) {
-        ActiveStream* as = static_cast<ActiveStream*>(stream.get());
-        if (as) {
-            if (as->getState() == STREAM_DEAD) {
-                return;
-            }
-            as->addTakeoverStats(add_stat, c);
+                                   const VBucket& vb) {
+
+    auto stream = findStream(vb.getId());
+    if (stream) {
+        if (stream->isTypeActive()) {
+            ActiveStream* as = static_cast<ActiveStream*>(stream.get());
+            as->addTakeoverStats(add_stat, c, vb);
+        } else {
+            LOG(EXTENSION_LOG_WARNING, "%s (vb:%" PRIu16 ") "
+                "DcpProducer::addTakeoverStats Stream type is %s and not the "
+                "expected Active",
+                logHeader(), vb.getId(), to_string(stream->getType()).c_str());
         }
+    } else {
+        LOG(EXTENSION_LOG_NOTICE, "%s (vb:%" PRIu16 ") "
+            "DcpProducer::addTakeoverStats Unable to find stream",
+            logHeader(), vb.getId());
     }
 }
 
@@ -761,14 +876,14 @@ void DcpProducer::aggregateQueueStats(ConnCounter& aggregator) {
 }
 
 void DcpProducer::notifySeqnoAvailable(uint16_t vbucket, uint64_t seqno) {
-    stream_t stream = findStreamByVbid(vbucket);
+    auto stream = findStream(vbucket);
     if (stream && stream->isActive()) {
         stream->notifySeqnoAvailable(seqno);
     }
 }
 
 void DcpProducer::vbucketStateChanged(uint16_t vbucket, vbucket_state_t state) {
-    stream_t stream = findStreamByVbid(vbucket);
+    auto stream = findStream(vbucket);
     if (stream) {
         LOG(EXTENSION_LOG_INFO, "%s (vb %" PRIu16 ") State changed to "
             "%s, closing active stream!",
@@ -780,22 +895,12 @@ void DcpProducer::vbucketStateChanged(uint16_t vbucket, vbucket_state_t state) {
 bool DcpProducer::handleSlowStream(uint16_t vbid,
                                    const std::string &name) {
     if (supportsCursorDropping) {
-        stream_t stream = findStreamByVbid(vbid);
+        auto stream = findStream(vbid);
         if (stream) {
             if (stream->getName().compare(name) == 0) {
                 ActiveStream* as = static_cast<ActiveStream*>(stream.get());
-                if (as) {
-                    LOG(EXTENSION_LOG_NOTICE, "%s (vb %" PRIu16 ")  Producer "
-                        "is handling slow stream;"
-                        " state:%s lastReadSeqno:%" PRIu64
-                        " lastSentSeqno:%" PRIu64,
-                        logHeader(), vbid,
-                        Stream::stateName(as->getState()),
-                        as->getLastReadSeqno(),
-                        as->getLastSentSeqno());
-                    as->handleSlowStream();
-                    return true;
-                }
+                as->handleSlowStream();
+                return true;
             }
         }
     }
@@ -803,16 +908,21 @@ bool DcpProducer::handleSlowStream(uint16_t vbid,
 }
 
 void DcpProducer::closeAllStreams() {
+    lastReceiveTime = ep_current_time();
     std::vector<uint16_t> vbvector;
     {
-        WriterLockHolder wlh(streamsMutex);
-        while (!streams.empty()) {
-            std::map<uint16_t, stream_t>::iterator itr = streams.begin();
-            uint16_t vbid = itr->first;
-            itr->second->setDead(END_STREAM_DISCONNECTED);
-            streams.erase(vbid);
-            vbvector.push_back(vbid);
-        }
+        // Need to synchronise the disconnect and clear, therefore use
+        // external locking here.
+        std::lock_guard<StreamsMap> guard(streams);
+
+        streams.for_each(
+            [&vbvector](StreamsMap::value_type& iter) {
+                vbvector.push_back(iter.first);
+                iter.second->setDead(END_STREAM_DISCONNECTED);
+            },
+            guard);
+
+        streams.clear(guard);
     }
     connection_t conn(this);
     for (const auto vbid: vbvector) {
@@ -853,12 +963,10 @@ DcpResponse* DcpProducer::getNextItem() {
             DcpResponse* op = NULL;
             stream_t stream;
             {
-                ReaderLockHolder rlh(streamsMutex);
-                std::map<uint16_t, stream_t>::iterator it = streams.find(vbucket);
-                if (it == streams.end()) {
+                stream = findStream(vbucket);
+                if (!stream) {
                     continue;
                 }
-                stream.reset(it->second);
             }
 
             op = stream->next();
@@ -869,25 +977,28 @@ DcpResponse* DcpProducer::getNextItem() {
             }
 
             switch (op->getEvent()) {
-                case DCP_SNAPSHOT_MARKER:
-                case DCP_MUTATION:
-                case DCP_DELETION:
-                case DCP_EXPIRATION:
-                case DCP_STREAM_END:
-                case DCP_SET_VBUCKET:
+                case DcpResponse::Event::SnapshotMarker:
+                case DcpResponse::Event::Mutation:
+                case DcpResponse::Event::Deletion:
+                case DcpResponse::Event::Expiration:
+                case DcpResponse::Event::StreamEnd:
+                case DcpResponse::Event::SetVbucket:
+                case DcpResponse::Event::SystemEvent:
                     break;
                 default:
                     throw std::logic_error(
                             std::string("DcpProducer::getNextItem: "
                             "Producer (") + logHeader() + ") is attempting to "
                             "write an unexpected event:" +
-                            std::to_string(op->getEvent()));
+                            op->to_string());
             }
 
             ready.pushUnique(vbucket);
 
-            if (op->getEvent() == DCP_MUTATION || op->getEvent() == DCP_DELETION ||
-                op->getEvent() == DCP_EXPIRATION) {
+            if (op->getEvent() == DcpResponse::Event::Mutation ||
+                op->getEvent() == DcpResponse::Event::Deletion ||
+                op->getEvent() == DcpResponse::Event::Expiration ||
+                op->getEvent() == DcpResponse::Event::SystemEvent) {
                 itemsSent++;
             }
 
@@ -911,11 +1022,11 @@ void DcpProducer::setDisconnect(bool disconnect) {
     ConnHandler::setDisconnect(disconnect);
 
     if (disconnect) {
-        ReaderLockHolder rlh(streamsMutex);
-        std::map<uint16_t, stream_t>::iterator itr = streams.begin();
-        for (; itr != streams.end(); ++itr) {
-            itr->second->setDead(END_STREAM_DISCONNECTED);
-        }
+        streams.for_each(
+            [](StreamsMap::value_type& iter){
+                iter.second->setDead(END_STREAM_DISCONNECTED);
+            }
+        );
     }
 }
 
@@ -929,34 +1040,47 @@ void DcpProducer::notifyPaused(bool schedule) {
     engine_.getDcpConnMap().notifyPausedConnection(this, schedule);
 }
 
-ENGINE_ERROR_CODE DcpProducer::maybeSendNoop(struct dcp_message_producers* producers) {
-    if (!noopCtx.enabled) {
-        // Returning ENGINE_FAILED means ignore and continue without sending a noop
+ENGINE_ERROR_CODE DcpProducer::maybeDisconnect() {
+    std::chrono::seconds elapsedTime(ep_current_time() - lastReceiveTime);
+    if (noopCtx.enabled && elapsedTime > noopCtx.dcpIdleTimeout) {
+        LOG(EXTENSION_LOG_NOTICE, "%s Disconnecting because the connection"
+            " appears to be dead", logHeader());
+            return ENGINE_DISCONNECT;
+        }
+        // Returning ENGINE_FAILED means ignore and continue
+        // without disconnecting
         return ENGINE_FAILED;
-    }
-    size_t sinceTime = ep_current_time() - noopCtx.sendTime;
-    if (sinceTime <= noopCtx.noopInterval) {
-        // The time interval has not passed so ignore and continue without sending
+}
+
+ENGINE_ERROR_CODE DcpProducer::maybeSendNoop(
+        struct dcp_message_producers* producers) {
+    if (!noopCtx.enabled) {
+        // Returning ENGINE_FAILED means ignore and continue
+        // without sending a noop
         return ENGINE_FAILED;
     }
-    // The time interval has passed.  First check to see if waiting for a noop reply
-    if (noopCtx.pendingRecv) {
-        LOG(EXTENSION_LOG_NOTICE, "%s Disconnected because the connection"
-            " appears to be dead", logHeader());
-        return ENGINE_DISCONNECT;
-    }
-    // Try to send a noop to the consumer
-    EventuallyPersistentEngine *epe = ObjectRegistry::onSwitchThread(NULL, true);
-    ENGINE_ERROR_CODE ret = producers->noop(getCookie(), ++noopCtx.opaque);
-    ObjectRegistry::onSwitchThread(epe);
-
-    if (ret == ENGINE_SUCCESS) {
-        ret = ENGINE_WANT_MORE;
-        noopCtx.pendingRecv = true;
-        noopCtx.sendTime = ep_current_time();
-        lastSendTime = noopCtx.sendTime;
+    std::chrono::seconds elapsedTime(ep_current_time() - noopCtx.sendTime);
+
+    // Check to see if we are waiting for a noop reply.
+    // If not, try to send a noop to the consumer if the interval has passed.
+    if (!noopCtx.pendingRecv && elapsedTime >= noopCtx.dcpNoopTxInterval) {
+        EventuallyPersistentEngine *epe = ObjectRegistry::
+                onSwitchThread(NULL, true);
+        ENGINE_ERROR_CODE ret = producers->noop(getCookie(), ++noopCtx.opaque);
+        ObjectRegistry::onSwitchThread(epe);
+
+        if (ret == ENGINE_SUCCESS) {
+            ret = ENGINE_WANT_MORE;
+            noopCtx.pendingRecv = true;
+            noopCtx.sendTime = ep_current_time();
+            lastSendTime = noopCtx.sendTime;
+        }
+      return ret;
     }
-    return ret;
+    // We have already sent a noop and are awaiting a receive or
+    // the time interval has not passed.  In either case continue
+    // without sending a noop.
+    return ENGINE_FAILED;
 }
 
 bool DcpProducer::isTimeForNoop() {
@@ -969,10 +1093,11 @@ void DcpProducer::setTimeForNoop() {
 }
 
 void DcpProducer::clearQueues() {
-    WriterLockHolder wlh(streamsMutex);
-    for (const auto element: streams) {
-        element.second->clear();
-    }
+    streams.for_each(
+        [](StreamsMap::value_type& iter) {
+            iter.second->clear();
+        }
+    );
 }
 
 size_t DcpProducer::getBackfillQueueSize() {
@@ -985,16 +1110,14 @@ size_t DcpProducer::getItemsSent() {
 
 size_t DcpProducer::getItemsRemaining() {
     size_t remainingSize = 0;
-    ReaderLockHolder rlh(streamsMutex);
-    std::map<uint16_t, stream_t>::iterator itr = streams.begin();
-    for (; itr != streams.end(); ++itr) {
-        Stream *s = (itr->second).get();
-
-        if (s->getType() == STREAM_ACTIVE) {
-            ActiveStream *as = static_cast<ActiveStream *>(s);
-            remainingSize += as->getItemsRemaining();
+    streams.for_each(
+        [&remainingSize](const StreamsMap::value_type& iter) {
+            if (iter.second->isTypeActive()) {
+                ActiveStream *as = static_cast<ActiveStream *>(iter.second.get());
+                remainingSize += as->getItemsRemaining();
+            }
         }
-    }
+    );
 
     return remainingSize;
 }
@@ -1003,22 +1126,12 @@ size_t DcpProducer::getTotalBytes() {
     return totalBytesSent;
 }
 
-stream_t DcpProducer::findStreamByVbid(uint16_t vbid) {
-    ReaderLockHolder rlh(streamsMutex);
-    stream_t stream;
-    std::map<uint16_t, stream_t>::iterator itr = streams.find(vbid);
-    if (itr != streams.end()) {
-        stream = itr->second;
-    }
-    return stream;
-}
-
 std::vector<uint16_t> DcpProducer::getVBVector() {
-    ReaderLockHolder rlh(streamsMutex);
     std::vector<uint16_t> vbvector;
-    for (const auto element: streams) {
-        vbvector.push_back(element.first);
-    }
+    streams.for_each(
+        [&vbvector](StreamsMap::value_type& iter) {
+        vbvector.push_back(iter.first);
+    });
     return vbvector;
 }
 
@@ -1026,12 +1139,37 @@ bool DcpProducer::bufferLogInsert(size_t bytes) {
     return log.insert(bytes);
 }
 
-void DcpProducer::scheduleCheckpointProcessorTask(stream_t s) {
+void DcpProducer::createCheckpointProcessorTask() {
+    checkpointCreatorTask = new ActiveStreamCheckpointProcessorTask(engine_);
+}
+
+void DcpProducer::scheduleCheckpointProcessorTask() {
+    ExecutorPool::get()->schedule(checkpointCreatorTask);
+}
+
+void DcpProducer::scheduleCheckpointProcessorTask(const stream_t& s) {
+    if (!checkpointCreatorTask) {
+        throw std::logic_error(
+                "DcpProducer::scheduleCheckpointProcessorTask task is null");
+    }
     static_cast<ActiveStreamCheckpointProcessorTask*>(checkpointCreatorTask.get())
         ->schedule(s);
 }
 
 void DcpProducer::clearCheckpointProcessorTaskQueues() {
+    if (!checkpointCreatorTask) {
+        throw std::logic_error(
+                "DcpProducer::clearCheckpointProcessorTaskQueues task is null");
+    }
     static_cast<ActiveStreamCheckpointProcessorTask*>(checkpointCreatorTask.get())
         ->clearQueues();
 }
+
+SingleThreadedRCPtr<Stream> DcpProducer::findStream(uint16_t vbid) {
+    auto it = streams.find(vbid);
+    if (it.second) {
+        return it.first;
+    } else {
+        return SingleThreadedRCPtr<Stream>();
+    }
+}