Merge branch 'watson'
[ep-engine.git] / src / dcp / stream.cc
index 24b930b..c7156bc 100644 (file)
 #include "dcp/stream.h"
 #include "replicationthrottle.h"
 
-static const char* snapshotTypeToString(snapshot_type_t type) {
-    static const char * const snapshotTypes[] = { "none", "disk", "memory" };
-    if (type < none || type > memory) {
-        throw std::invalid_argument("snapshotTypeToString: type (which is " +
-                                    std::to_string(type) +
-                                    ") is not a valid snapshot_type_t");
+#include <memory>
+
+
+// Convert a Stream::Snapshot enum value to the lower-case string used in
+// logs/stats ("none"/"disk"/"memory").  All legal enumerators are handled
+// in the switch, so falling through means the value was out of range
+// (e.g. a bad cast) - report that as a logic_error.
+const char* to_string(Stream::Snapshot type) {
+    switch (type) {
+    case Stream::Snapshot::None:
+        return "none";
+    case Stream::Snapshot::Disk:
+        return "disk";
+    case Stream::Snapshot::Memory:
+        return "memory";
+    }
+    throw std::logic_error("to_string(Stream::Snapshot): called with invalid "
+            "Snapshot type:" + std::to_string(int(type)));
+}
+
+// Convert a Stream::Type enum value to a printable name
+// ("Active"/"Notifier"/"Passive"); throws std::logic_error if the value is
+// not a valid enumerator.
+const std::string to_string(Stream::Type type) {
+    switch (type) {
+    case Stream::Type::Active:
+        return "Active";
+    case Stream::Type::Notifier:
+        return "Notifier";
+    case Stream::Type::Passive:
+        return "Passive";
     }
-    return snapshotTypes[type];
+    throw std::logic_error("to_string(Stream::Type): called with invalid "
+            "type:" + std::to_string(int(type)));
 }
 
 const uint64_t Stream::dcpMaxSeqno = std::numeric_limits<uint64_t>::max();
@@ -46,12 +65,19 @@ const uint64_t Stream::dcpMaxSeqno = std::numeric_limits<uint64_t>::max();
+// Base Stream constructor.  Records the stream's identity (name, opaque,
+// vbucket), its seqno window and snapshot range, and initialises the
+// bookkeeping members.  Every stream starts in StreamState::Pending.
+// The new Type parameter lets the concrete subclass declare itself as
+// Active/Notifier/Passive at construction time (previously subclasses
+// assigned type_ after the fact).
 Stream::Stream(const std::string &name, uint32_t flags, uint32_t opaque,
                uint16_t vb, uint64_t start_seqno, uint64_t end_seqno,
                uint64_t vb_uuid, uint64_t snap_start_seqno,
-               uint64_t snap_end_seqno)
-    : name_(name), flags_(flags), opaque_(opaque), vb_(vb),
-      start_seqno_(start_seqno), end_seqno_(end_seqno), vb_uuid_(vb_uuid),
+               uint64_t snap_end_seqno, Type type)
+    : name_(name),
+      flags_(flags),
+      opaque_(opaque),
+      vb_(vb),
+      start_seqno_(start_seqno),
+      end_seqno_(end_seqno),
+      vb_uuid_(vb_uuid),
       snap_start_seqno_(snap_start_seqno),
       snap_end_seqno_(snap_end_seqno),
-      state_(STREAM_PENDING), itemsReady(false),
+      state_(StreamState::Pending),
+      type_(type),
+      itemsReady(false),
       readyQ_non_meta_items(0),
       readyQueueMemory(0) {
 }
@@ -62,6 +88,55 @@ Stream::~Stream() {
     clear_UNLOCKED();
 }
 
+// Map a StreamState to the lower-case name emitted in stats output
+// (e.g. "takeover-send").  Throws std::invalid_argument for a value that
+// is not a valid StreamState enumerator.
+const std::string Stream::to_string(Stream::StreamState st) {
+    switch(st) {
+    case StreamState::Pending:
+        return "pending";
+    case StreamState::Backfilling:
+        return "backfilling";
+    case StreamState::InMemory:
+        return "in-memory";
+    case StreamState::TakeoverSend:
+        return "takeover-send";
+    case StreamState::TakeoverWait:
+        return "takeover-wait";
+    case StreamState::Reading:
+        return "reading";
+    case StreamState::Dead:
+        return "dead";
+    }
+    throw std::invalid_argument(
+        "Stream::to_string(StreamState): " + std::to_string(int(st)));
+}
+
+// True if this stream was constructed with Type::Active.
+bool Stream::isTypeActive() const {
+    return type_ == Type::Active;
+}
+
+// "Active" here means "not yet dead" - any live state counts, not just
+// Type::Active (see isTypeActive for that).
+bool Stream::isActive() const {
+    return state_.load() != StreamState::Dead;
+}
+
+// True while the stream is in the Backfilling state.
+bool Stream::isBackfilling() const {
+    return state_.load() == StreamState::Backfilling;
+}
+
+// True while the stream is in the InMemory state.
+bool Stream::isInMemory() const {
+    return state_.load() == StreamState::InMemory;
+}
+
+// True while the stream is still in the initial Pending state.
+bool Stream::isPending() const {
+    return state_.load() == StreamState::Pending;
+}
+
+// True while the stream is in the TakeoverSend state.
+bool Stream::isTakeoverSend() const {
+    return state_.load() == StreamState::TakeoverSend;
+}
+
+// True while the stream is in the TakeoverWait state.
+bool Stream::isTakeoverWait() const {
+    return state_.load() == StreamState::TakeoverWait;
+}
+
 void Stream::clear_UNLOCKED() {
     while (!readyQ.empty()) {
         DcpResponse* resp = readyQ.front();
@@ -112,19 +187,6 @@ uint64_t Stream::getReadyQueueMemory() {
     return readyQueueMemory.load(std::memory_order_relaxed);
 }
 
-const char* Stream::stateName(stream_state_t st) {
-    static const char * const stateNames[] = {
-        "pending", "backfilling", "in-memory", "takeover-send", "takeover-wait",
-        "reading", "dead"
-    };
-    if (st < STREAM_PENDING || st > STREAM_DEAD) {
-        throw std::invalid_argument("Stream::stateName: st (which is " +
-                                        std::to_string(st) +
-                                        ") is not a valid stream_state_t");
-    }
-    return stateNames[st];
-}
-
 void Stream::addStats(ADD_STAT add_stat, const void *c) {
     try {
         const int bsize = 1024;
@@ -152,37 +214,58 @@ void Stream::addStats(ADD_STAT add_stat, const void *c) {
         add_casted_stat(buffer, snap_end_seqno_, add_stat, c);
         checked_snprintf(buffer, bsize, "%s:stream_%d_state", name_.c_str(),
                          vb_);
-        add_casted_stat(buffer, stateName(state_), add_stat, c);
+        add_casted_stat(buffer, to_string(state_.load()), add_stat, c);
     } catch (std::exception& error) {
         LOG(EXTENSION_LOG_WARNING,
             "Stream::addStats: Failed to build stats: %s", error.what());
     }
 }
 
-ActiveStream::ActiveStream(EventuallyPersistentEngine* e, dcp_producer_t p,
-                           const std::string &n, uint32_t flags,
-                           uint32_t opaque, uint16_t vb, uint64_t st_seqno,
-                           uint64_t en_seqno, uint64_t vb_uuid,
-                           uint64_t snap_start_seqno, uint64_t snap_end_seqno)
-    :  Stream(n, flags, opaque, vb, st_seqno, en_seqno, vb_uuid,
-              snap_start_seqno, snap_end_seqno),
-       isBackfillTaskRunning(false), pendingBackfill(false),
-       lastReadSeqnoUnSnapshotted(st_seqno), lastReadSeqno(st_seqno),
-       lastSentSeqno(st_seqno), curChkSeqno(st_seqno),
-       takeoverState(vbucket_state_pending), backfillRemaining(0),
-       itemsFromMemoryPhase(0), firstMarkerSent(false), waitForSnapshot(0),
-       engine(e), producer(p),
-       payloadType((flags & DCP_ADD_STREAM_FLAG_NO_VALUE) ? KEY_ONLY :
-                                                            KEY_VALUE),
-       lastSentSnapEndSeqno(0), chkptItemsExtractionInProgress(false) {
-
+ActiveStream::ActiveStream(EventuallyPersistentEngine* e,
+                           dcp_producer_t p,
+                           const std::string& n,
+                           uint32_t flags,
+                           uint32_t opaque,
+                           uint16_t vb,
+                           uint64_t st_seqno,
+                           uint64_t en_seqno,
+                           uint64_t vb_uuid,
+                           uint64_t snap_start_seqno,
+                           uint64_t snap_end_seqno,
+                           bool isKeyOnly)
+    : Stream(n,
+             flags,
+             opaque,
+             vb,
+             st_seqno,
+             en_seqno,
+             vb_uuid,
+             snap_start_seqno,
+             snap_end_seqno,
+             Type::Active),
+      isBackfillTaskRunning(false),
+      pendingBackfill(false),
+      lastReadSeqno(st_seqno),
+      backfillRemaining(0),
+      lastReadSeqnoUnSnapshotted(st_seqno),
+      lastSentSeqno(st_seqno),
+      curChkSeqno(st_seqno),
+      takeoverState(vbucket_state_pending),
+      itemsFromMemoryPhase(0),
+      firstMarkerSent(false),
+      waitForSnapshot(0),
+      engine(e),
+      producer(p),
+      lastSentSnapEndSeqno(0),
+      chkptItemsExtractionInProgress(false),
+      keyOnly(isKeyOnly) {
     const char* type = "";
     if (flags_ & DCP_ADD_STREAM_FLAG_TAKEOVER) {
         type = "takeover ";
         end_seqno_ = dcpMaxSeqno;
     }
 
-    RCPtr<VBucket> vbucket = engine->getVBucket(vb);
+    VBucketPtr vbucket = engine->getVBucket(vb);
     if (vbucket) {
         ReaderLockHolder rlh(vbucket->getStateLock());
         if (vbucket->getState() == vbucket_state_replica) {
@@ -201,8 +284,6 @@ ActiveStream::ActiveStream(EventuallyPersistentEngine* e, dcp_producer_t p,
     backfillItems.disk = 0;
     backfillItems.sent = 0;
 
-    type_ = STREAM_ACTIVE;
-
     bufferedBackfill.bytes = 0;
     bufferedBackfill.items = 0;
 
@@ -218,139 +299,150 @@ ActiveStream::ActiveStream(EventuallyPersistentEngine* e, dcp_producer_t p,
         itemsReady.store(true);
         // lock is released on leaving the scope
     }
+
+    // Finally obtain a copy of the current separator
+    currentSeparator = vbucket->getManifest().lock().getSeparator();
 }
 
+// Force the state machine to Dead on destruction so any cleanup tied to
+// the Dead transition runs before the stream goes away.
 ActiveStream::~ActiveStream() {
-    transitionState(STREAM_DEAD);
+    transitionState(StreamState::Dead);
 }
 
+// Public entry point: acquire streamMutex, then delegate to the
+// lock-holding overload which performs the actual state dispatch.
 DcpResponse* ActiveStream::next() {
-    LockHolder lh(streamMutex);
-
-    stream_state_t initState = state_;
+    std::lock_guard<std::mutex> lh(streamMutex);
+    return next(lh);
+}
 
+// Core dispatch: produce the next DcpResponse according to the current
+// stream state.  The lock_guard reference parameter documents - and
+// enforces at the call site - that streamMutex is already held.
+// StreamState::Reading is a passive-side state and is a hard error here.
+DcpResponse* ActiveStream::next(std::lock_guard<std::mutex>& lh) {
     DcpResponse* response = NULL;
 
-    bool validTransition = false;
-    switch (initState) {
-        case STREAM_PENDING:
-            validTransition = true;
+    switch (state_.load()) {
+        case StreamState::Pending:
             break;
-        case STREAM_BACKFILLING:
-            validTransition = true;
-            response = backfillPhase();
+        case StreamState::Backfilling:
+            response = backfillPhase(lh);
             break;
-        case STREAM_IN_MEMORY:
-            validTransition = true;
+        case StreamState::InMemory:
             response = inMemoryPhase();
             break;
-        case STREAM_TAKEOVER_SEND:
-            validTransition = true;
+        case StreamState::TakeoverSend:
             response = takeoverSendPhase();
             break;
-        case STREAM_TAKEOVER_WAIT:
-            validTransition = true;
+        case StreamState::TakeoverWait:
             response = takeoverWaitPhase();
             break;
-        case STREAM_READING:
+        case StreamState::Reading:
             // Not valid for an active stream.
+            throw std::logic_error("ActiveStream::next: Invalid state "
+                    "StreamReading for stream " +
+                    std::string(producer->logHeader()) + " vb " +
+                    std::to_string(vb_));
             break;
-        case STREAM_DEAD:
-            validTransition = true;
+        case StreamState::Dead:
             response = deadPhase();
             break;
     }
 
-    if (!validTransition) {
-        throw std::invalid_argument("ActiveStream::transitionState:"
-                " invalid state " + std::to_string(state_) + " for stream " +
-                producer->logHeader() + " vb " + std::to_string(vb_));
-    }
-
-    stream_state_t newState = state_;
-
-    if (newState != STREAM_DEAD && newState != state_ && !response) {
-        lh.unlock();
-        return next();
-    }
-
+    // itemsReady mirrors whether this call produced a response; the
+    // producer uses it to decide whether the stream needs re-polling.
     itemsReady.store(response ? true : false);
     return response;
 }
 
+// Queue a disk SnapshotMarker (MARKER_FLAG_DISK) covering
+// [startSeqno, endSeqno].  For replica vbuckets the end may be extended
+// to merge the open (in-memory) checkpoint into the marker.  Unless the
+// stream is disk-only, the checkpoint cursor is re-registered so memory
+// snapshots can follow.  Only valid while backfilling; otherwise logs a
+// warning and returns.
 void ActiveStream::markDiskSnapshot(uint64_t startSeqno, uint64_t endSeqno) {
-    LockHolder lh(streamMutex);
-    uint64_t chkCursorSeqno = endSeqno;
+    {
+        LockHolder lh(streamMutex);
+        uint64_t chkCursorSeqno = endSeqno;
 
-    if (state_ != STREAM_BACKFILLING) {
-        return;
-    }
+        if (!isBackfilling()) {
+            producer->getLogger().log(EXTENSION_LOG_WARNING,
+                                      "(vb %" PRIu16 ") ActiveStream::"
+                                      "markDiskSnapshot: Unexpected state_:%s",
+                                      vb_, to_string(state_.load()).c_str());
+            return;
+        }
 
-    startSeqno = std::min(snap_start_seqno_, startSeqno);
-    firstMarkerSent = true;
+        startSeqno = std::min(snap_start_seqno_, startSeqno);
+        firstMarkerSent = true;
 
-    RCPtr<VBucket> vb = engine->getVBucket(vb_);
-    // An atomic read of vbucket state without acquiring the
-    // reader lock for state should suffice here.
-    if (vb && vb->getState() == vbucket_state_replica) {
-        if (end_seqno_ > endSeqno) {
-            /* We possibly have items in the open checkpoint
-               (incomplete snapshot) */
-            snapshot_info_t info = vb->checkpointManager.getSnapshotInfo();
-            producer->getLogger().log(EXTENSION_LOG_NOTICE,
-                "(vb %" PRIu16 ") Merging backfill and memory snapshot for a "
-                "replica vbucket, backfill start seqno %" PRIu64 ", "
-                "backfill end seqno %" PRIu64 ", "
-                "snapshot end seqno after merge %" PRIu64,
-                vb_, startSeqno, endSeqno, info.range.end);
-            endSeqno = info.range.end;
+        VBucketPtr vb = engine->getVBucket(vb_);
+        if (!vb) {
+            producer->getLogger().log(EXTENSION_LOG_WARNING,"(vb %" PRIu16 ") "
+                                      "ActiveStream::markDiskSnapshot, vbucket "
+                                      "does not exist", vb_);
+            return;
+        }
+        // An atomic read of vbucket state without acquiring the
+        // reader lock for state should suffice here.
+        if (vb->getState() == vbucket_state_replica) {
+            if (end_seqno_ > endSeqno) {
+                /* We possibly have items in the open checkpoint
+                   (incomplete snapshot) */
+                snapshot_info_t info = vb->checkpointManager.getSnapshotInfo();
+                producer->getLogger().log(EXTENSION_LOG_NOTICE,
+                    "(vb %" PRIu16 ") Merging backfill and memory snapshot for a "
+                    "replica vbucket, backfill start seqno %" PRIu64 ", "
+                    "backfill end seqno %" PRIu64 ", "
+                    "snapshot end seqno after merge %" PRIu64,
+                    vb_, startSeqno, endSeqno, info.range.end);
+                endSeqno = info.range.end;
+            }
         }
-    }
-
-    producer->getLogger().log(EXTENSION_LOG_NOTICE,
-        "(vb %" PRIu16 ") Sending disk snapshot with start seqno %" PRIu64
-        " and end seqno %" PRIu64, vb_, startSeqno, endSeqno);
-    pushToReadyQ(new SnapshotMarker(opaque_, vb_, startSeqno, endSeqno,
-                                    MARKER_FLAG_DISK));
-    lastSentSnapEndSeqno.store(endSeqno, std::memory_order_relaxed);
 
-    if (!vb) {
-        endStream(END_STREAM_STATE);
-    } else if (!(flags_ & DCP_ADD_STREAM_FLAG_DISKONLY)) {
-        // Only re-register the cursor if we still need to get memory snapshots
-        CursorRegResult result =
-            vb->checkpointManager.registerCursorBySeqno(
-                                                name_, chkCursorSeqno,
-                                                MustSendCheckpointEnd::NO);
-        curChkSeqno = result.first;
+        producer->getLogger().log(EXTENSION_LOG_NOTICE,
+            "(vb %" PRIu16 ") Sending disk snapshot with start seqno %" PRIu64
+            " and end seqno %" PRIu64, vb_, startSeqno, endSeqno);
+        pushToReadyQ(new SnapshotMarker(opaque_, vb_, startSeqno, endSeqno,
+                                        MARKER_FLAG_DISK));
+        lastSentSnapEndSeqno.store(endSeqno, std::memory_order_relaxed);
+
+        if (!(flags_ & DCP_ADD_STREAM_FLAG_DISKONLY)) {
+            // Only re-register the cursor if we still need to get memory
+            // snapshots
+            try {
+                CursorRegResult result =
+                        vb->checkpointManager.registerCursorBySeqno(
+                                name_, chkCursorSeqno,
+                                MustSendCheckpointEnd::NO);
+
+                curChkSeqno = result.first;
+            } catch(std::exception& error) {
+                producer->getLogger().log(EXTENSION_LOG_WARNING,
+                        "(vb %" PRIu16 ") Failed to register cursor: %s",
+                        vb_, error.what());
+                endStream(END_STREAM_STATE);
+            }
+        }
     }
-
-    lh.unlock();
+    // The scoped block above has released streamMutex; notify the producer
+    // outside the lock.
     bool inverse = false;
     if (itemsReady.compare_exchange_strong(inverse, true)) {
         producer->notifyStreamReady(vb_);
     }
 }
 
-bool ActiveStream::backfillReceived(Item* itm, backfill_source_t backfill_source) {
-    if (nullptr == itm) {
+bool ActiveStream::backfillReceived(std::unique_ptr<Item> itm,
+                                    backfill_source_t backfill_source,
+                                    bool force) {
+    if (!itm) {
         return false;
     }
 
     if (itm->shouldReplicate()) {
-        LockHolder lh(streamMutex);
-        if (state_ == STREAM_BACKFILLING) {
-            if (!producer->recordBackfillManagerBytesRead(itm->size())) {
-                delete itm;
+        std::unique_lock<std::mutex> lh(streamMutex);
+        if (isBackfilling()) {
+            queued_item qi(std::move(itm));
+            std::unique_ptr<DcpResponse> resp(makeResponseFromItem(qi));
+            if (!producer->recordBackfillManagerBytesRead(
+                        resp->getApproximateSize(), force)) {
+                // Deleting resp may also delete itm (which is owned by resp)
+                resp.reset();
                 return false;
             }
 
-            bufferedBackfill.bytes.fetch_add(itm->size());
+            bufferedBackfill.bytes.fetch_add(resp->getApproximateSize());
             bufferedBackfill.items++;
+            lastReadSeqno.store(uint64_t(*resp->getBySeqno()));
 
-            pushToReadyQ(new MutationResponse(itm, opaque_, nullptr));
+            pushToReadyQ(resp.release());
 
-            lastReadSeqno.store(itm->getBySeqno());
             lh.unlock();
             bool inverse = false;
             if (itemsReady.compare_exchange_strong(inverse, true)) {
@@ -362,11 +454,7 @@ bool ActiveStream::backfillReceived(Item* itm, backfill_source_t backfill_source
             } else {
                 backfillItems.disk++;
             }
-        } else {
-            delete itm;
         }
-    } else {
-        delete itm;
     }
 
     return true;
@@ -375,33 +463,20 @@ bool ActiveStream::backfillReceived(Item* itm, backfill_source_t backfill_source
 void ActiveStream::completeBackfill() {
     {
         LockHolder lh(streamMutex);
-        if (state_ == STREAM_BACKFILLING) {
+        if (isBackfilling()) {
             producer->getLogger().log(EXTENSION_LOG_NOTICE,
                     "(vb %" PRIu16 ") Backfill complete, %" PRIu64 " items "
                     "read from disk, %" PRIu64 " from memory, last seqno read: "
-                    "%" PRIu64 "\n", vb_, uint64_t(backfillItems.disk.load()),
+                    "%" PRIu64 ", pendingBackfill : %s",
+                    vb_, uint64_t(backfillItems.disk.load()),
                     uint64_t(backfillItems.memory.load()),
-                    lastReadSeqno.load());
-
-            isBackfillTaskRunning = false;
-            if (pendingBackfill) {
-                scheduleBackfill_UNLOCKED(true);
-                pendingBackfill = false;
-            }
-
-            bool expected = false;
-            if (itemsReady.compare_exchange_strong(expected, true)) {
-                producer->notifyStreamReady(vb_);
-            }
-
-            /**
-              * MB-22451: It is important that we return here because
-              * scheduleBackfill_UNLOCKED(true) can set
-              * isBackfillTaskRunning to true.  Therefore if we don't return we
-              * will set isBackfillTaskRunning prematurely back to false, (see
-              * below).
-              */
-            return;
+                    lastReadSeqno.load(),
+                    pendingBackfill ? "True" : "False");
+        } else {
+            producer->getLogger().log(EXTENSION_LOG_WARNING,
+                    "(vb %" PRIu16 ") ActiveStream::completeBackfill: "
+                    "Unexpected state_:%s",
+                    vb_, to_string(state_.load()).c_str());
         }
     }
 
@@ -422,20 +497,20 @@ void ActiveStream::snapshotMarkerAckReceived() {
 }
 
 void ActiveStream::setVBucketStateAckRecieved() {
-    LockHolder lh(streamMutex);
-    if (state_ == STREAM_TAKEOVER_WAIT) {
+    std::unique_lock<std::mutex> lh(streamMutex);
+    if (isTakeoverWait()) {
         if (takeoverState == vbucket_state_pending) {
             producer->getLogger().log(EXTENSION_LOG_INFO,
                 "(vb %" PRIu16 ") Receive ack for set vbucket state to pending "
                 "message", vb_);
 
             takeoverState = vbucket_state_active;
-            transitionState(STREAM_TAKEOVER_SEND);
+            transitionState(StreamState::TakeoverSend);
             lh.unlock();
 
-            engine->getEpStore()->setVBucketState(vb_, vbucket_state_dead,
-                                                  false, false);
-            RCPtr<VBucket> vbucket = engine->getVBucket(vb_);
+            engine->getKVBucket()->setVBucketState(vb_, vbucket_state_dead,
+                                                   false, false);
+            VBucketPtr vbucket = engine->getVBucket(vb_);
             producer->getLogger().log(EXTENSION_LOG_NOTICE,
                 "(vb %" PRIu16 ") Vbucket marked as dead, last sent seqno: %"
                 PRIu64 ", high seqno: %" PRIu64,
@@ -455,40 +530,50 @@ void ActiveStream::setVBucketStateAckRecieved() {
     } else {
         producer->getLogger().log(EXTENSION_LOG_WARNING,
             "(vb %" PRIu16 ") Unexpected ack for set vbucket op on stream '%s' "
-            "state '%s'", vb_, name_.c_str(), stateName(state_));
+            "state '%s'", vb_, name_.c_str(), to_string(state_.load()).c_str());
     }
 
 }
 
-DcpResponse* ActiveStream::backfillPhase() {
+DcpResponse* ActiveStream::backfillPhase(std::lock_guard<std::mutex>& lh) {
     DcpResponse* resp = nextQueuedItem();
 
-    if (resp && (resp->getEvent() == DCP_MUTATION ||
-         resp->getEvent() == DCP_DELETION ||
-         resp->getEvent() == DCP_EXPIRATION)) {
-        MutationResponse* m = static_cast<MutationResponse*>(resp);
-        producer->recordBackfillManagerBytesSent(m->getItem()->size());
-        bufferedBackfill.bytes.fetch_sub(m->getItem()->size());
+    if (resp) {
+        producer->recordBackfillManagerBytesSent(resp->getApproximateSize());
+        bufferedBackfill.bytes.fetch_sub(resp->getApproximateSize());
         bufferedBackfill.items--;
-        if (backfillRemaining.load(std::memory_order_relaxed) > 0) {
-            backfillRemaining.fetch_sub(1, std::memory_order_relaxed);
+
+        // Only DcpResponse objects representing items from "disk" have a size
+        // so only update backfillRemaining when non-zero
+        if (resp->getApproximateSize()) {
+            if (backfillRemaining.load(std::memory_order_relaxed) > 0) {
+                backfillRemaining.fetch_sub(1, std::memory_order_relaxed);
+            }
         }
     }
 
     if (!isBackfillTaskRunning && readyQ.empty()) {
+        // Given readyQ.empty() is True resp will be NULL
         backfillRemaining.store(0, std::memory_order_relaxed);
-        if (lastReadSeqno.load() >= end_seqno_) {
-            endStream(END_STREAM_OK);
-        } else if (flags_ & DCP_ADD_STREAM_FLAG_TAKEOVER) {
-            transitionState(STREAM_TAKEOVER_SEND);
-        } else if (flags_ & DCP_ADD_STREAM_FLAG_DISKONLY) {
-            endStream(END_STREAM_OK);
+        // The previous backfill has completed.  Check to see if another
+        // backfill needs to be scheduled.
+        if (pendingBackfill) {
+            scheduleBackfill_UNLOCKED(true);
+            pendingBackfill = false;
         } else {
-            transitionState(STREAM_IN_MEMORY);
-        }
+            if (lastReadSeqno.load() >= end_seqno_) {
+                endStream(END_STREAM_OK);
+            } else if (flags_ & DCP_ADD_STREAM_FLAG_TAKEOVER) {
+                transitionState(StreamState::TakeoverSend);
+            } else if (flags_ & DCP_ADD_STREAM_FLAG_DISKONLY) {
+                endStream(END_STREAM_OK);
+            } else {
+                transitionState(StreamState::InMemory);
+            }
 
-        if (!resp) {
-            resp = nextQueuedItem();
+            if (!resp) {
+                resp = nextQueuedItem();
+            }
         }
     }
 
@@ -500,20 +585,21 @@ DcpResponse* ActiveStream::inMemoryPhase() {
         endStream(END_STREAM_OK);
     } else if (readyQ.empty()) {
         if (pendingBackfill) {
-            transitionState(STREAM_BACKFILLING);
+            // Moving the state from InMemory to Backfilling will result in a
+            // backfill being scheduled
+            transitionState(StreamState::Backfilling);
             pendingBackfill = false;
             return NULL;
         } else if (nextCheckpointItem()) {
             return NULL;
         }
     }
-
     return nextQueuedItem();
 }
 
 DcpResponse* ActiveStream::takeoverSendPhase() {
 
-    RCPtr<VBucket> vb = engine->getVBucket(vb_);
+    VBucketPtr vb = engine->getVBucket(vb_);
     if (vb && takeoverStart != 0 &&
         !vb->isTakeoverBackedUp() &&
         (ep_current_time() - takeoverStart) > takeoverSendMaxTime) {
@@ -540,7 +626,7 @@ DcpResponse* ActiveStream::takeoverSendPhase() {
     DcpResponse* resp = NULL;
     if (producer->bufferLogInsert(SetVBucketState::baseMsgBytes)) {
         resp = new SetVBucketState(opaque_, vb_, takeoverState);
-        transitionState(STREAM_TAKEOVER_WAIT);
+        transitionState(StreamState::TakeoverWait);
     }
     return resp;
 }
@@ -612,7 +698,7 @@ void ActiveStream::addStats(ADD_STAT add_stat, const void *c) {
                          name_.c_str(), vb_);
         add_casted_stat(buffer, bufferedBackfill.items, add_stat, c);
 
-        if ((state_ == STREAM_TAKEOVER_SEND) && takeoverStart != 0) {
+        if (isTakeoverSend() && takeoverStart != 0) {
             checked_snprintf(buffer, bsize, "%s:stream_%d_takeover_since",
                              name_.c_str(), vb_);
             add_casted_stat(buffer, ep_current_time() - takeoverStart, add_stat,
@@ -624,20 +710,31 @@ void ActiveStream::addStats(ADD_STAT add_stat, const void *c) {
     }
 }
 
-void ActiveStream::addTakeoverStats(ADD_STAT add_stat, const void *cookie) {
+void ActiveStream::addTakeoverStats(ADD_STAT add_stat, const void *cookie,
+                                    const VBucket& vb) {
     LockHolder lh(streamMutex);
 
-    RCPtr<VBucket> vb = engine->getVBucket(vb_);
     add_casted_stat("name", name_, add_stat, cookie);
-    if (!vb || state_ == STREAM_DEAD) {
-        add_casted_stat("status", "completed", add_stat, cookie);
+    if (!isActive()) {
+        /**
+         *  There is not a legitimate case where the stream is already dead.
+         *  ns-server cleans-up old streams towards the end of a vbucket move.
+         *
+         *  But just in case it does happen: log the event and return status of
+         *  does_not_exist to ensure rebalance does not hang.
+         */
+        producer->getLogger().log(EXTENSION_LOG_WARNING,
+                                  "(vb %" PRIu16 ") "
+                                  "ActiveStream::addTakeoverStats: Stream has "
+                                  "status StreamDead", vb_);
+        add_casted_stat("status", "does_not_exist", add_stat, cookie);
         add_casted_stat("estimate", 0, add_stat, cookie);
         add_casted_stat("backfillRemaining", 0, add_stat, cookie);
         return;
     }
 
     size_t total = backfillRemaining.load(std::memory_order_relaxed);
-    if (state_ == STREAM_BACKFILLING) {
+    if (isBackfilling()) {
         add_casted_stat("status", "backfilling", add_stat, cookie);
     } else {
         add_casted_stat("status", "in-memory", add_stat, cookie);
@@ -646,15 +743,13 @@ void ActiveStream::addTakeoverStats(ADD_STAT add_stat, const void *cookie) {
                     backfillRemaining.load(std::memory_order_relaxed),
                     add_stat, cookie);
 
-    item_eviction_policy_t iep = engine->getEpStore()->getItemEvictionPolicy();
-    size_t vb_items = vb->getNumItems(iep);
+    size_t vb_items = vb.getNumItems();
     size_t chk_items = vb_items > 0 ?
-                vb->checkpointManager.getNumItemsForCursor(name_) : 0;
+                vb.checkpointManager.getNumItemsForCursor(name_) : 0;
 
     size_t del_items = 0;
     try {
-        del_items = engine->getEpStore()->getRWUnderlying(vb_)->
-                                                        getNumPersistedDeletes(vb_);
+        del_items = engine->getKVBucket()->getNumPersistedDeletes(vb_);
     } catch (std::runtime_error& e) {
         producer->getLogger().log(EXTENSION_LOG_WARNING,
             "ActiveStream:addTakeoverStats: exception while getting num persisted "
@@ -679,18 +774,19 @@ DcpResponse* ActiveStream::nextQueuedItem() {
     if (!readyQ.empty()) {
         DcpResponse* response = readyQ.front();
         if (producer->bufferLogInsert(response->getMessageSize())) {
-            if (response->getEvent() == DCP_MUTATION ||
-                    response->getEvent() == DCP_DELETION ||
-                    response->getEvent() == DCP_EXPIRATION) {
-                lastSentSeqno.store(
-                        dynamic_cast<MutationResponse*>(response)->getBySeqno());
+            auto seqno = response->getBySeqno();
+            if (seqno) {
+                lastSentSeqno.store(*seqno);
 
-                if (state_ == STREAM_BACKFILLING) {
+                if (isBackfilling()) {
                     backfillItems.sent++;
                 } else {
                     itemsFromMemoryPhase++;
                 }
             }
+
+            // See if the currentSeparator needs changing
+            maybeChangeSeparator(response);
             popFromReadyQ();
             return response;
         }
@@ -699,7 +795,7 @@ DcpResponse* ActiveStream::nextQueuedItem() {
 }
 
 bool ActiveStream::nextCheckpointItem() {
-    RCPtr<VBucket> vbucket = engine->getVBucket(vb_);
+    VBucketPtr vbucket = engine->getVBucket(vb_);
     if (vbucket && vbucket->checkpointManager.getNumItemsForCursor(name_) > 0) {
         // schedule this stream to build the next checkpoint
         producer->scheduleCheckpointProcessorTask(this);
@@ -750,7 +846,7 @@ void ActiveStreamCheckpointProcessorTask::wakeup() {
     ExecutorPool::get()->wake(getId());
 }
 
-void ActiveStreamCheckpointProcessorTask::schedule(stream_t stream) {
+void ActiveStreamCheckpointProcessorTask::schedule(const stream_t& stream) {
     pushUnique(stream);
 
     bool expected = false;
@@ -768,7 +864,7 @@ void ActiveStreamCheckpointProcessorTask::clearQueues() {
 }
 
 void ActiveStream::nextCheckpointItemTask() {
-    RCPtr<VBucket> vbucket = engine->getVBucket(vb_);
+    VBucketPtr vbucket = engine->getVBucket(vb_);
     if (vbucket) {
         std::vector<queued_item> items;
         getOutstandingItems(vbucket, items);
@@ -782,14 +878,29 @@ void ActiveStream::nextCheckpointItemTask() {
     }
 }
 
+// Drain all outstanding items for this stream's cursor from the vbucket's
+// checkpoint manager into 'items', timing the fetch into the
+// dcpCursorsGetItemsHisto stat.  Sets chkptItemsExtractionInProgress as a
+// guard for the duration of processing, and wakes the checkpoint remover
+// if more than one checkpoint exists.
-void ActiveStream::getOutstandingItems(RCPtr<VBucket> &vb,
+void ActiveStream::getOutstandingItems(VBucketPtr &vb,
                                        std::vector<queued_item> &items) {
     // Commencing item processing - set guard flag.
     chkptItemsExtractionInProgress.store(true);
 
+    hrtime_t _begin_ = gethrtime();
     vb->checkpointManager.getAllItemsForCursor(name_, items);
+    engine->getEpStats().dcpCursorsGetItemsHisto.add(
+                                            (gethrtime() - _begin_) / 1000);
+
     if (vb->checkpointManager.getNumCheckpoints() > 1) {
-        engine->getEpStore()->wakeUpCheckpointRemover();
+        engine->getKVBucket()->wakeUpCheckpointRemover();
+    }
+}
+
+// Wrap a queued_item in the appropriate DcpResponse: system events become
+// SystemEventProducerMessages, everything else becomes a
+// MutationProducerResponse whose key is built with the stream's current
+// collections separator.
+std::unique_ptr<DcpResponse> ActiveStream::makeResponseFromItem(
+        queued_item& item) {
+    if (item->getOperation() != queue_op::system_event) {
+        auto cKey = Collections::DocKey::make(item->getKey(), currentSeparator);
+        return std::make_unique<MutationProducerResponse>(
+                item, opaque_, isKeyOnly(), cKey.getCollectionLen());
+    } else {
+        return SystemEventProducerMessage::make(opaque_, item);
+    }
 }
 
@@ -800,18 +911,15 @@ void ActiveStream::processItems(std::vector<queued_item>& items) {
             mark = true;
         }
 
-        std::deque<MutationResponse*> mutations;
+        std::deque<DcpResponse*> mutations;
         std::vector<queued_item>::iterator itr = items.begin();
         for (; itr != items.end(); ++itr) {
             queued_item& qi = *itr;
 
-            if (qi->shouldReplicate()) {
+            if (SystemEventReplicate::process(*qi) == ProcessStatus::Continue) {
                 curChkSeqno = qi->getBySeqno();
                 lastReadSeqnoUnSnapshotted = qi->getBySeqno();
-
-                mutations.push_back(new MutationResponse(qi, opaque_, nullptr,
-                            isSendMutationKeyOnlyEnabled() ? KEY_ONLY :
-                                                             KEY_VALUE));
+                mutations.push_back(makeResponseFromItem(qi).release());
             } else if (qi->getOperation() == queue_op::checkpoint_start) {
                 /* if there are already other mutations, then they belong to the
                    previous checkpoint and hence we must create a snapshot and
@@ -841,23 +949,22 @@ void ActiveStream::processItems(std::vector<queued_item>& items) {
     producer->notifyStreamReady(vb_);
 }
 
-void ActiveStream::snapshot(std::deque<MutationResponse*>& items, bool mark) {
+void ActiveStream::snapshot(std::deque<DcpResponse*>& items, bool mark) {
     if (items.empty()) {
         return;
     }
 
     LockHolder lh(streamMutex);
 
-    if ((state_ == STREAM_DEAD) || (state_ == STREAM_BACKFILLING)) {
+    if (!isActive() || isBackfilling()) {
         // If stream was closed forcefully by the time the checkpoint items
         // retriever task completed, or if we decided to switch the stream to
         // backfill state from in-memory state, none of the acquired mutations
         // should be added on the stream's readyQ. We must drop items in case
         // we switch state from in-memory to backfill because we schedule
         // backfill from lastReadSeqno + 1
-        std::deque<MutationResponse *>::iterator itr = items.begin();
-        for (; itr != items.end(); ++itr) {
-            delete *itr;
+        for (auto& item : items) {
+            delete item;
         }
         items.clear();
         return;
@@ -868,14 +975,26 @@ void ActiveStream::snapshot(std::deque<MutationResponse*>& items, bool mark) {
 
     if (isCurrentSnapshotCompleted()) {
         uint32_t flags = MARKER_FLAG_MEMORY;
-        uint64_t snapStart = items.front()->getBySeqno();
-        uint64_t snapEnd = items.back()->getBySeqno();
+
+        // Get OptionalSeqnos which for the items list types should have values
+        auto seqnoStart = items.front()->getBySeqno();
+        auto seqnoEnd = items.back()->getBySeqno();
+        if (!seqnoStart || !seqnoEnd) {
+            throw std::logic_error(
+                    "ActiveStream::snapshot incorrect DcpEvent, missing a "
+                    "seqno " +
+                    std::string(items.front()->to_string()) + " " +
+                    std::string(items.back()->to_string()));
+        }
+
+        uint64_t snapStart = *seqnoStart;
+        uint64_t snapEnd = *seqnoEnd;
 
         if (mark) {
             flags |= MARKER_FLAG_CHK;
         }
 
-        if (state_ == STREAM_TAKEOVER_SEND) {
+        if (isTakeoverSend()) {
             waitForSnapshot++;
             flags |= MARKER_FLAG_ACK;
         }
@@ -889,9 +1008,8 @@ void ActiveStream::snapshot(std::deque<MutationResponse*>& items, bool mark) {
         lastSentSnapEndSeqno.store(snapEnd, std::memory_order_relaxed);
     }
 
-    std::deque<MutationResponse*>::iterator itemItr;
-    for (itemItr = items.begin(); itemItr != items.end(); itemItr++) {
-        pushToReadyQ(*itemItr);
+    for (const auto& item : items) {
+        pushToReadyQ(item);
     }
 }
 
@@ -910,7 +1028,7 @@ uint32_t ActiveStream::setDead(end_stream_status_t status) {
 }
 
 void ActiveStream::notifySeqnoAvailable(uint64_t seqno) {
-    if (state_ != STREAM_DEAD) {
+    if (isActive()) {
         bool inverse = false;
         if (itemsReady.compare_exchange_strong(inverse, true)) {
             producer->notifyStreamReady(vb_);
@@ -919,9 +1037,9 @@ void ActiveStream::notifySeqnoAvailable(uint64_t seqno) {
 }
 
 void ActiveStream::endStream(end_stream_status_t reason) {
-    if (state_ != STREAM_DEAD) {
+    if (isActive()) {
         pendingBackfill = false;
-        if (state_ == STREAM_BACKFILLING) {
+        if (isBackfilling()) {
             // If Stream were in Backfilling state, clear out the
             // backfilled items to clear up the backfill buffer.
             clear_UNLOCKED();
@@ -929,7 +1047,7 @@ void ActiveStream::endStream(end_stream_status_t reason) {
             bufferedBackfill.bytes = 0;
             bufferedBackfill.items = 0;
         }
-        transitionState(STREAM_DEAD);
+        transitionState(StreamState::Dead);
         if (reason != END_STREAM_DISCONNECTED) {
             pushToReadyQ(new StreamEndResponse(opaque_, reason, vb_));
         }
@@ -956,8 +1074,15 @@ void ActiveStream::scheduleBackfill_UNLOCKED(bool reschedule) {
         return;
     }
 
-    RCPtr<VBucket> vbucket = engine->getVBucket(vb_);
+    VBucketPtr vbucket = engine->getVBucket(vb_);
     if (!vbucket) {
+        producer->getLogger().log(EXTENSION_LOG_WARNING,
+                                  "(vb %" PRIu16 ") Failed to schedule "
+                                  "backfill as unable to get vbucket; "
+                                  "lastReadSeqno : %" PRIu64 ", "
+                                  "reschedule : %s",
+                                  vb_, lastReadSeqno.load(),
+                                  reschedule ? "True" : "False");
         return;
     }
 
@@ -981,19 +1106,23 @@ void ActiveStream::scheduleBackfill_UNLOCKED(bool reschedule) {
                DCP_ADD_STREAM_FLAG_DISKONLY (the else part), end_seqno_ is
               set to last persisted seqno before calling
                scheduleBackfill_UNLOCKED() */
-            backfillEnd = engine->getEpStore()->getLastPersistedSeqno(vb_);
+            backfillEnd = engine->getKVBucket()->getLastPersistedSeqno(vb_);
         } else {
             backfillEnd = end_seqno_;
         }
         tryBackfill = true;
     } else {
-        CursorRegResult result =
-            vbucket->checkpointManager.registerCursorBySeqno(
-                                                name_,
-                                                lastReadSeqno.load(),
-                                                MustSendCheckpointEnd::NO);
-        curChkSeqno = result.first;
-        tryBackfill = result.second;
+        try {
+            std::tie(curChkSeqno, tryBackfill) =
+                    vbucket->checkpointManager.registerCursorBySeqno(
+                            name_, lastReadSeqno.load(),
+                            MustSendCheckpointEnd::NO);
+        } catch(std::exception& error) {
+            producer->getLogger().log(EXTENSION_LOG_WARNING,
+                                      "(vb %" PRIu16 ") Failed to register "
+                                      "cursor: %s", vb_, error.what());
+            endStream(END_STREAM_STATE);
+        }
 
         if (lastReadSeqno.load() > curChkSeqno) {
             throw std::logic_error("ActiveStream::scheduleBackfill_UNLOCKED: "
@@ -1027,28 +1156,72 @@ void ActiveStream::scheduleBackfill_UNLOCKED(bool reschedule) {
                                   "from %" PRIu64 " to %" PRIu64 ", reschedule "
                                   "flag : %s", vb_, backfillStart, backfillEnd,
                                   reschedule ? "True" : "False");
-        producer->scheduleBackfillManager(this, backfillStart, backfillEnd);
+        producer->scheduleBackfillManager(
+                *vbucket, this, backfillStart, backfillEnd);
         isBackfillTaskRunning.store(true);
     } else {
+        if (reschedule) {
+            // Infrequent code path, see comment below.
+            producer->getLogger().log(EXTENSION_LOG_NOTICE,
+                                      "(vb %" PRIu16 ") Did not schedule "
+                                      "backfill with reschedule : True, "
+                                      "tryBackfill : True; "
+                                      "backfillStart : %" PRIu64 ", "
+                                      "backfillEnd : %" PRIu64 ", "
+                                      "flags_ : %" PRIu32 ", "
+                                      "start_seqno_ : %" PRIu64 ", "
+                                      "end_seqno_ : %" PRIu64 ", "
+                                      "lastReadSeqno : %" PRIu64 ", "
+                                      "lastSentSeqno : %" PRIu64 ", "
+                                      "curChkSeqno : %" PRIu64 ", "
+                                      "itemsReady : %s",
+                                      vb_, backfillStart, backfillEnd, flags_,
+                                      start_seqno_, end_seqno_,
+                                      lastReadSeqno.load(),
+                                      lastSentSeqno.load(), curChkSeqno.load(),
+                                      itemsReady ? "True" : "False");
+
+            /* Cursor was dropped, but we will not do backfill.
+             * This may happen in a corner case where, the memory usage is high
+             * due to other vbuckets and persistence cursor moves ahead of
+             * replication cursor to new checkpoint open but does not persist
+             * items yet.
+             *
+             * Because we dropped the cursor but did not do a backfill (and
+             * therefore did not re-register a cursor in markDiskSnapshot) we
+             * must re-register the cursor here.
+             */
+            try {
+                CursorRegResult result =
+                            vbucket->checkpointManager.registerCursorBySeqno(
+                            name_, lastReadSeqno.load(),
+                            MustSendCheckpointEnd::NO);
+
+                    curChkSeqno = result.first;
+            } catch (std::exception& error) {
+                producer->getLogger().log(EXTENSION_LOG_WARNING,
+                                          "(vb %" PRIu16 ") Failed to register "
+                                          "cursor: %s", vb_, error.what());
+                endStream(END_STREAM_STATE);
+            }
+        }
         if (flags_ & DCP_ADD_STREAM_FLAG_DISKONLY) {
             endStream(END_STREAM_OK);
         } else if (flags_ & DCP_ADD_STREAM_FLAG_TAKEOVER) {
-            transitionState(STREAM_TAKEOVER_SEND);
+            transitionState(StreamState::TakeoverSend);
         } else {
-            transitionState(STREAM_IN_MEMORY);
+            transitionState(StreamState::InMemory);
         }
         if (reschedule) {
-            /* Cursor was dropped, but we will not do backfill.
-               This may happen in a corner case where, the memory
-               usage is high due to other vbuckets and persistence cursor moves
-               ahead of replication cursor to new checkpoint open but does not
-               persist items yet.
-               Note: (1) We must not notify when we schedule backfill for the
-                         first time because the stream is not yet in producer
-                         conn list of streams 
-                     (2) It is not absolutely necessary to notify immediately
-                         as conn manager or an incoming items will cause a
-                         notification eventually, but wouldn't hurt to do so */
+            /*
+             * It is not absolutely necessary to notify immediately as conn
+             * manager or an incoming item will cause a notification eventually,
+             * but wouldn't hurt to do so.
+             *
+             * Note: must not notify when we schedule a backfill for the first
+             * time (i.e. when reschedule is false) because the stream is not
+             * yet in producer conn list of streams.
+             */
             bool inverse = false;
             if (itemsReady.compare_exchange_strong(inverse, true)) {
                 producer->notifyStreamReady(vb_);
@@ -1060,33 +1233,35 @@ void ActiveStream::scheduleBackfill_UNLOCKED(bool reschedule) {
 void ActiveStream::handleSlowStream()
 {
     LockHolder lh(streamMutex);
+    producer->getLogger().log(EXTENSION_LOG_NOTICE,
+                              "(vb %" PRIu16 ") Handling slow stream; "
+                              "state_ : %s, "
+                              "lastReadSeqno : %" PRIu64 ", "
+                              "lastSentSeqno : %" PRIu64 ", "
+                              "isBackfillTaskRunning : %s",
+                              vb_, to_string(state_.load()).c_str(),
+                              lastReadSeqno.load(),
+                              lastSentSeqno.load(),
+                              isBackfillTaskRunning.load() ? "True" : "False");
     switch (state_.load()) {
-        case STREAM_BACKFILLING:
-            if (isBackfillTaskRunning.load()) {
-                /* Drop the existing cursor and set pending backfill */
-                dropCheckpointCursor_UNLOCKED();
-                pendingBackfill = true;
-            } else {
-                scheduleBackfill_UNLOCKED(true);
-            }
-            break;
-        case STREAM_IN_MEMORY:
+        case StreamState::Backfilling:
+        case StreamState::InMemory:
             /* Drop the existing cursor and set pending backfill */
             dropCheckpointCursor_UNLOCKED();
             pendingBackfill = true;
             break;
-        case STREAM_TAKEOVER_SEND:
+        case StreamState::TakeoverSend:
             /* To be handled later if needed */
-        case STREAM_TAKEOVER_WAIT:
+        case StreamState::TakeoverWait:
             /* To be handled later if needed */
-        case STREAM_DEAD:
+        case StreamState::Dead:
             /* To be handled later if needed */
             break;
-        case STREAM_PENDING:
-        case STREAM_READING:
+        case StreamState::Pending:
+        case StreamState::Reading:
             throw std::logic_error("ActiveStream::handleSlowStream: "
                                    "called with state " +
-                                   std::to_string(state_.load()) + " . " +
+                                   to_string(state_.load()) + " "
                                    "for stream " + producer->logHeader() +
                                    "; vb " + std::to_string(vb_));
     }
@@ -1105,16 +1280,19 @@ const char* ActiveStream::getEndStreamStatusStr(end_stream_status_t status)
         return "The stream closed early because the conn was disconnected";
     case END_STREAM_SLOW:
         return "The stream was closed early because it was too slow";
+    case END_STREAM_BACKFILL_FAIL:
+        return "The stream closed early due to backfill failure";
     }
     std::string msg("Status unknown: " + std::to_string(status) +
                     "; this should not have happened!");
     return msg.c_str();
 }
 
-void ActiveStream::transitionState(stream_state_t newState) {
+void ActiveStream::transitionState(StreamState newState) {
     producer->getLogger().log(EXTENSION_LOG_DEBUG,
                               "(vb %d) Transitioning from %s to %s",
-                              vb_, stateName(state_), stateName(newState));
+                              vb_, to_string(state_.load()).c_str(),
+                              to_string(newState).c_str());
 
     if (state_ == newState) {
         return;
@@ -1122,38 +1300,42 @@ void ActiveStream::transitionState(stream_state_t newState) {
 
     bool validTransition = false;
     switch (state_.load()) {
-        case STREAM_PENDING:
-            if (newState == STREAM_BACKFILLING || newState == STREAM_DEAD) {
+        case StreamState::Pending:
+            if (newState == StreamState::Backfilling ||
+                    newState == StreamState::Dead) {
                 validTransition = true;
             }
             break;
-        case STREAM_BACKFILLING:
-            if(newState == STREAM_IN_MEMORY ||
-               newState == STREAM_TAKEOVER_SEND ||
-               newState == STREAM_DEAD) {
+        case StreamState::Backfilling:
+            if(newState == StreamState::InMemory ||
+               newState == StreamState::TakeoverSend ||
+               newState == StreamState::Dead) {
                 validTransition = true;
             }
             break;
-        case STREAM_IN_MEMORY:
-            if (newState == STREAM_BACKFILLING || newState == STREAM_DEAD) {
+        case StreamState::InMemory:
+            if (newState == StreamState::Backfilling ||
+                    newState == StreamState::Dead) {
                 validTransition = true;
             }
             break;
-        case STREAM_TAKEOVER_SEND:
-            if (newState == STREAM_TAKEOVER_WAIT || newState == STREAM_DEAD) {
+        case StreamState::TakeoverSend:
+            if (newState == StreamState::TakeoverWait ||
+                    newState == StreamState::Dead) {
                 validTransition = true;
             }
             break;
-        case STREAM_TAKEOVER_WAIT:
-            if (newState == STREAM_TAKEOVER_SEND || newState == STREAM_DEAD) {
+        case StreamState::TakeoverWait:
+            if (newState == StreamState::TakeoverSend ||
+                    newState == StreamState::Dead) {
                 validTransition = true;
             }
             break;
-        case STREAM_READING:
+        case StreamState::Reading:
             // Active stream should never be in READING state.
             validTransition = false;
             break;
-        case STREAM_DEAD:
+        case StreamState::Dead:
             // Once DEAD, no other transitions should occur.
             validTransition = false;
             break;
@@ -1161,23 +1343,23 @@ void ActiveStream::transitionState(stream_state_t newState) {
 
     if (!validTransition) {
         throw std::invalid_argument("ActiveStream::transitionState:"
-                " newState (which is " + std::to_string(newState) +
+                " newState (which is " + to_string(newState) +
                 ") is not valid for current state (which is " +
-                std::to_string(state_) + ")");
+                to_string(state_.load()) + ")");
     }
 
-    stream_state_t oldState = state_.load();
+    StreamState oldState = state_.load();
     state_ = newState;
 
     switch (newState) {
-        case STREAM_BACKFILLING:
-            if (STREAM_PENDING == oldState) {
+        case StreamState::Backfilling:
+            if (StreamState::Pending == oldState) {
                 scheduleBackfill_UNLOCKED(false /* reschedule */);
-            } else if (STREAM_IN_MEMORY == oldState) {
+            } else if (StreamState::InMemory == oldState) {
                 scheduleBackfill_UNLOCKED(true /* reschedule */);
             }
             break;
-        case STREAM_IN_MEMORY:
+        case StreamState::InMemory:
             // Check if the producer has sent up till the last requested
             // sequence number already, if not - move checkpoint items into
             // the ready queue.
@@ -1192,31 +1374,32 @@ void ActiveStream::transitionState(stream_state_t newState) {
                 nextCheckpointItem();
             }
             break;
-        case STREAM_TAKEOVER_SEND:
+        case StreamState::TakeoverSend:
             takeoverStart = ep_current_time();
             nextCheckpointItem();
             break;
-        case STREAM_DEAD:
+        case StreamState::Dead:
             {
-                RCPtr<VBucket> vb = engine->getVBucket(vb_);
+                VBucketPtr vb = engine->getVBucket(vb_);
                 if (vb) {
                     vb->checkpointManager.removeCursor(name_);
                 }
                 break;
             }
-        case STREAM_TAKEOVER_WAIT:
-        case STREAM_PENDING:
+        case StreamState::TakeoverWait:
+        case StreamState::Pending:
             break;
-        case STREAM_READING:
+        case StreamState::Reading:
             throw std::logic_error("ActiveStream::transitionState:"
-                    " newState can't be " + std::to_string(newState) + "!");
+                    " newState can't be " + to_string(newState) +
+                    "!");
     }
 }
 
 size_t ActiveStream::getItemsRemaining() {
-    RCPtr<VBucket> vbucket = engine->getVBucket(vb_);
+    VBucketPtr vbucket = engine->getVBucket(vb_);
 
-    if (!vbucket || state_ == STREAM_DEAD) {
+    if (!vbucket || !isActive()) {
         return 0;
     }
 
@@ -1240,14 +1423,9 @@ const Logger& ActiveStream::getLogger() const
     return producer->getLogger();
 }
 
-bool ActiveStream::isSendMutationKeyOnlyEnabled() const
-{
-    return (KEY_ONLY == payloadType);
-}
-
 bool ActiveStream::isCurrentSnapshotCompleted() const
 {
-    RCPtr<VBucket> vbucket = engine->getVBucket(vb_);
+    VBucketPtr vbucket = engine->getVBucket(vb_);
     // An atomic read of vbucket state without acquiring the
     // reader lock for state should suffice here.
     if (vbucket && vbucket->getState() == vbucket_state_replica) {
@@ -1261,7 +1439,7 @@ bool ActiveStream::isCurrentSnapshotCompleted() const
 
 void ActiveStream::dropCheckpointCursor_UNLOCKED()
 {
-    RCPtr<VBucket> vbucket = engine->getVBucket(vb_);
+    VBucketPtr vbucket = engine->getVBucket(vb_);
     if (!vbucket) {
         endStream(END_STREAM_STATE);
         bool inverse = false;
@@ -1273,6 +1451,16 @@ void ActiveStream::dropCheckpointCursor_UNLOCKED()
     vbucket->checkpointManager.removeCursor(name_);
 }
 
+/**
+ * Inspect an outgoing response and, if it is a CollectionsSeparatorChanged
+ * system event, adopt the event's key as the stream's current collection
+ * separator so subsequent keys are encoded with the new separator.
+ */
+void ActiveStream::maybeChangeSeparator(DcpResponse* response) {
+    if (response->getEvent() == DcpResponse::Event::SystemEvent) {
+        auto se = static_cast<SystemEventProducerMessage*>(response);
+        if (se->getSystemEvent() == SystemEvent::CollectionsSeparatorChanged) {
+            currentSeparator =
+                    std::string(se->getKey().data(), se->getKey().size());
+        }
+    }
+}
+
 NotifierStream::NotifierStream(EventuallyPersistentEngine* e, dcp_producer_t p,
                                const std::string &name, uint32_t flags,
                                uint32_t opaque, uint16_t vb, uint64_t st_seqno,
@@ -1280,27 +1468,25 @@ NotifierStream::NotifierStream(EventuallyPersistentEngine* e, dcp_producer_t p,
                                uint64_t snap_start_seqno,
                                uint64_t snap_end_seqno)
     : Stream(name, flags, opaque, vb, st_seqno, en_seqno, vb_uuid,
-             snap_start_seqno, snap_end_seqno),
+             snap_start_seqno, snap_end_seqno, Type::Notifier),
       producer(p) {
     LockHolder lh(streamMutex);
-    RCPtr<VBucket> vbucket = e->getVBucket(vb_);
+    VBucketPtr vbucket = e->getVBucket(vb_);
     if (vbucket && static_cast<uint64_t>(vbucket->getHighSeqno()) > st_seqno) {
         pushToReadyQ(new StreamEndResponse(opaque_, END_STREAM_OK, vb_));
-        transitionState(STREAM_DEAD);
+        transitionState(StreamState::Dead);
         itemsReady.store(true);
     }
 
-    type_ = STREAM_NOTIFIER;
-
     producer->getLogger().log(EXTENSION_LOG_NOTICE,
         "(vb %d) stream created with start seqno %" PRIu64 " and end seqno %"
         PRIu64, vb, st_seqno, en_seqno);
 }
 
 uint32_t NotifierStream::setDead(end_stream_status_t status) {
-    LockHolder lh(streamMutex);
-    if (state_ != STREAM_DEAD) {
-        transitionState(STREAM_DEAD);
+    std::unique_lock<std::mutex> lh(streamMutex);
+    if (isActive()) {
+        transitionState(StreamState::Dead);
         if (status != END_STREAM_DISCONNECTED) {
             pushToReadyQ(new StreamEndResponse(opaque_, status, vb_));
             lh.unlock();
@@ -1314,10 +1500,10 @@ uint32_t NotifierStream::setDead(end_stream_status_t status) {
 }
 
 void NotifierStream::notifySeqnoAvailable(uint64_t seqno) {
-    LockHolder lh(streamMutex);
-    if (state_ != STREAM_DEAD && start_seqno_ < seqno) {
+    std::unique_lock<std::mutex> lh(streamMutex);
+    if (isActive() && start_seqno_ < seqno) {
         pushToReadyQ(new StreamEndResponse(opaque_, END_STREAM_OK, vb_));
-        transitionState(STREAM_DEAD);
+        transitionState(StreamState::Dead);
         lh.unlock();
         bool inverse = false;
         if (itemsReady.compare_exchange_strong(inverse, true)) {
@@ -1344,10 +1530,10 @@ DcpResponse* NotifierStream::next() {
     return response;
 }
 
-void NotifierStream::transitionState(stream_state_t newState) {
+void NotifierStream::transitionState(StreamState newState) {
     producer->getLogger().log(EXTENSION_LOG_DEBUG,
         "(vb %d) Transitioning from %s to %s", vb_,
-        stateName(state_), stateName(newState));
+        to_string(state_.load()).c_str(), to_string(newState).c_str());
 
     if (state_ == newState) {
         return;
@@ -1355,27 +1541,27 @@ void NotifierStream::transitionState(stream_state_t newState) {
 
     bool validTransition = false;
     switch (state_.load()) {
-        case STREAM_PENDING:
-            if (newState == STREAM_DEAD) {
+        case StreamState::Pending:
+            if (newState == StreamState::Dead) {
                 validTransition = true;
             }
             break;
 
-        case STREAM_BACKFILLING:
-        case STREAM_IN_MEMORY:
-        case STREAM_TAKEOVER_SEND:
-        case STREAM_TAKEOVER_WAIT:
-        case STREAM_READING:
-        case STREAM_DEAD:
+        case StreamState::Backfilling:
+        case StreamState::InMemory:
+        case StreamState::TakeoverSend:
+        case StreamState::TakeoverWait:
+        case StreamState::Reading:
+        case StreamState::Dead:
             // No other state transitions are valid for a notifier stream.
             break;
     }
 
     if (!validTransition) {
         throw std::invalid_argument("NotifierStream::transitionState:"
-                " newState (which is " + std::to_string(newState) +
+                " newState (which is " + to_string(newState) +
                 ") is not valid for current state (which is " +
-                std::to_string(state_) + ")");
+                to_string(state_.load()) + ")");
     }
     state_ = newState;
 }
@@ -1387,28 +1573,17 @@ PassiveStream::PassiveStream(EventuallyPersistentEngine* e, dcp_consumer_t c,
                              uint64_t snap_start_seqno, uint64_t snap_end_seqno,
                              uint64_t vb_high_seqno)
     : Stream(name, flags, opaque, vb, st_seqno, en_seqno, vb_uuid,
-             snap_start_seqno, snap_end_seqno),
+             snap_start_seqno, snap_end_seqno, Type::Passive),
       engine(e), consumer(c), last_seqno(vb_high_seqno), cur_snapshot_start(0),
-      cur_snapshot_end(0), cur_snapshot_type(none), cur_snapshot_ack(false) {
+      cur_snapshot_end(0), cur_snapshot_type(Snapshot::None), cur_snapshot_ack(false) {
     LockHolder lh(streamMutex);
-    pushToReadyQ(new StreamRequest(vb, opaque, flags, st_seqno, en_seqno,
-                                  vb_uuid, snap_start_seqno, snap_end_seqno));
+    streamRequest_UNLOCKED(vb_uuid);
     itemsReady.store(true);
-    type_ = STREAM_PASSIVE;
-
-    const char* type = (flags & DCP_ADD_STREAM_FLAG_TAKEOVER) ? "takeover" : "";
-    consumer->getLogger().log(EXTENSION_LOG_NOTICE,
-        "(vb %" PRId16 ") Attempting to add %s stream"
-        " with start seqno %" PRIu64 ", end seqno %" PRIu64 ","
-        " vbucket uuid %" PRIu64 ", snap start seqno %" PRIu64 ","
-        " snap end seqno %" PRIu64 ", and vb_high_seqno %" PRIu64 "",
-        vb, type, st_seqno, en_seqno, vb_uuid,
-        snap_start_seqno, snap_end_seqno, vb_high_seqno);
 }
 
 PassiveStream::~PassiveStream() {
     uint32_t unackedBytes = clearBuffer_UNLOCKED();
-    if (transitionState(STREAM_DEAD)) {
+    if (transitionState(StreamState::Dead)) {
         // Destructed a "live" stream, log it.
         consumer->getLogger().log(EXTENSION_LOG_NOTICE,
             "(vb %" PRId16 ") Destructing stream."
@@ -1417,16 +1592,57 @@ PassiveStream::~PassiveStream() {
     }
 }
 
+/**
+ * Queue a StreamRequest message for this stream (taking streamMutex) and
+ * notify the consumer that this vbucket has a message ready.
+ */
+void PassiveStream::streamRequest(uint64_t vb_uuid) {
+    {
+        std::unique_lock<std::mutex> lh(streamMutex);
+        streamRequest_UNLOCKED(vb_uuid);
+    }
+
+    // Notify outside the lock; the CAS ensures a single notification as
+    // itemsReady transitions false -> true.
+    bool expected = false;
+    if (itemsReady.compare_exchange_strong(expected, true)) {
+        consumer->notifyStreamReady(vb_);
+    }
+}
+
+/**
+ * Push a StreamRequest message onto the readyQ and log the attempt.
+ * Caller must hold streamMutex (hence the _UNLOCKED suffix).
+ */
+void PassiveStream::streamRequest_UNLOCKED(uint64_t vb_uuid) {
+    pushToReadyQ(new StreamRequest(vb_,
+                                   opaque_,
+                                   flags_,
+                                   start_seqno_,
+                                   end_seqno_,
+                                   vb_uuid,
+                                   snap_start_seqno_,
+                                   snap_end_seqno_));
+
+    // Log the full set of stream parameters so a failed/odd add-stream can
+    // be diagnosed from the consumer's log alone.
+    const char* type = (flags_ & DCP_ADD_STREAM_FLAG_TAKEOVER)
+        ? "takeover stream" : "stream";
+    consumer->getLogger().log(
+            EXTENSION_LOG_NOTICE,
+            "(vb %" PRId16 ") Attempting to add %s: opaque_:%" PRIu32 ", "
+            "start_seqno_:%" PRIu64 ", end_seqno_:%" PRIu64 ", "
+            "vb_uuid:%" PRIu64 ", snap_start_seqno_:%" PRIu64 ", "
+            "snap_end_seqno_:%" PRIu64 ", last_seqno:%" PRIu64,
+            vb_,
+            type,
+            opaque_,
+            start_seqno_,
+            end_seqno_,
+            vb_uuid,
+            snap_start_seqno_,
+            snap_end_seqno_,
+            last_seqno.load());
+}
+
 uint32_t PassiveStream::setDead(end_stream_status_t status) {
     /* Hold buffer lock so that we clear out all items before we set the stream
        to dead state. We do not want to add any new message to the buffer or
        process any items in the buffer once we set the stream state to dead. */
-    LockHolder lh(buffer.bufMutex);
+    std::unique_lock<std::mutex> lg(buffer.bufMutex);
     uint32_t unackedBytes = clearBuffer_UNLOCKED();
     bool killed = false;
 
     LockHolder slh(streamMutex);
-    if (transitionState(STREAM_DEAD)) {
+    if (transitionState(StreamState::Dead)) {
         killed = true;
     }
 
@@ -1444,12 +1660,12 @@ uint32_t PassiveStream::setDead(end_stream_status_t status) {
 }
 
 void PassiveStream::acceptStream(uint16_t status, uint32_t add_opaque) {
-    LockHolder lh(streamMutex);
-    if (state_ == STREAM_PENDING) {
+    std::unique_lock<std::mutex> lh(streamMutex);
+    if (isPending()) {
         if (status == ENGINE_SUCCESS) {
-            transitionState(STREAM_READING);
+            transitionState(StreamState::Reading);
         } else {
-            transitionState(STREAM_DEAD);
+            transitionState(StreamState::Dead);
         }
         pushToReadyQ(new AddStreamResponse(add_opaque, opaque_, status));
         lh.unlock();
@@ -1460,7 +1676,7 @@ void PassiveStream::acceptStream(uint16_t status, uint32_t add_opaque) {
     }
 }
 
-void PassiveStream::reconnectStream(RCPtr<VBucket> &vb,
+void PassiveStream::reconnectStream(VBucketPtr &vb,
                                     uint32_t new_opaque,
                                     uint64_t start_seqno) {
     vb_uuid_ = vb->failovers->getLatestEntry().vb_uuid;
@@ -1480,188 +1696,180 @@ void PassiveStream::reconnectStream(RCPtr<VBucket> &vb,
         ", snap start seqno %" PRIu64 ", and snap end seqno %" PRIu64,
         vb_, new_opaque, start_seqno, end_seqno_,
         snap_start_seqno_, snap_end_seqno_);
-
-    LockHolder lh(streamMutex);
-    last_seqno.store(start_seqno);
-    pushToReadyQ(new StreamRequest(vb_, new_opaque, flags_, start_seqno,
-                                  end_seqno_, vb_uuid_, snap_start_seqno_,
-                                  snap_end_seqno_));
-    lh.unlock();
+    {
+        LockHolder lh(streamMutex);
+        last_seqno.store(start_seqno);
+        pushToReadyQ(new StreamRequest(vb_, new_opaque, flags_, start_seqno,
+                                      end_seqno_, vb_uuid_, snap_start_seqno_,
+                                      snap_end_seqno_));
+    }
     bool inverse = false;
     if (itemsReady.compare_exchange_strong(inverse, true)) {
         consumer->notifyStreamReady(vb_);
     }
 }
 
-ENGINE_ERROR_CODE PassiveStream::messageReceived(DcpResponse* resp) {
-    if(nullptr == resp) {
+ENGINE_ERROR_CODE PassiveStream::messageReceived(std::unique_ptr<DcpResponse> dcpResponse) {
+    if (!dcpResponse) {
         return ENGINE_EINVAL;
     }
 
-    LockHolder lh(buffer.bufMutex);
-
-    if (state_ == STREAM_DEAD) {
-        delete resp;
+    if (!isActive()) {
         return ENGINE_KEY_ENOENT;
     }
 
-    switch (resp->getEvent()) {
-        case DCP_MUTATION:
-        case DCP_DELETION:
-        case DCP_EXPIRATION:
-        {
-            MutationResponse* m = static_cast<MutationResponse*>(resp);
-            uint64_t bySeqno = m->getBySeqno();
-            if (bySeqno <= last_seqno.load()) {
-                consumer->getLogger().log(EXTENSION_LOG_WARNING,
-                    "(vb %d) Erroneous (out of sequence) mutation received, "
-                    "with opaque: %" PRIu32 ", its seqno (%" PRIu64 ") is not "
-                    "greater than last received seqno (%" PRIu64 "); "
-                    "Dropping mutation!",
-                    vb_, opaque_, bySeqno, last_seqno.load());
-                delete m;
-                return ENGINE_ERANGE;
-            }
-            last_seqno.store(bySeqno);
-            break;
-        }
-        case DCP_SNAPSHOT_MARKER:
-        {
-            SnapshotMarker* s = static_cast<SnapshotMarker*>(resp);
-            uint64_t snapStart = s->getStartSeqno();
-            uint64_t snapEnd = s->getEndSeqno();
-            if (snapStart < last_seqno.load() && snapEnd <= last_seqno.load()) {
-                consumer->getLogger().log(EXTENSION_LOG_WARNING,
-                    "(vb %d) Erroneous snapshot marker received, with "
-                    "opaque: %" PRIu32 ", its start "
-                    "(%" PRIu64 "), and end (%" PRIu64 ") are less than last "
-                    "received seqno (%" PRIu64 "); Dropping marker!",
-                    vb_, opaque_, snapStart, snapEnd, last_seqno.load());
-                delete s;
-                return ENGINE_ERANGE;
-            }
-            break;
-        }
-        case DCP_SET_VBUCKET:
-        case DCP_STREAM_END:
-        {
-            /* No validations necessary */
-            break;
+    auto seqno = dcpResponse->getBySeqno();
+    if (seqno) {
+        if (uint64_t(*seqno) <= last_seqno.load()) {
+            consumer->getLogger().log(EXTENSION_LOG_WARNING,
+                "(vb %d) Erroneous (out of sequence) message (%s) received, "
+                "with opaque: %" PRIu32 ", its seqno (%" PRIu64 ") is not "
+                "greater than last received seqno (%" PRIu64 "); "
+                "Dropping mutation!",
+                vb_, dcpResponse->to_string(), opaque_, *seqno, last_seqno.load());
+            return ENGINE_ERANGE;
         }
-        default:
-        {
+        last_seqno.store(*seqno);
+    } else if(dcpResponse->getEvent() == DcpResponse::Event::SnapshotMarker) {
+        auto s = static_cast<SnapshotMarker*>(dcpResponse.get());
+        uint64_t snapStart = s->getStartSeqno();
+        uint64_t snapEnd = s->getEndSeqno();
+        if (snapStart < last_seqno.load() && snapEnd <= last_seqno.load()) {
             consumer->getLogger().log(EXTENSION_LOG_WARNING,
-                "(vb %d) Unknown DCP op received: %d; Disconnecting connection..",
-                vb_, resp->getEvent());
-            return ENGINE_DISCONNECT;
+                "(vb %d) Erroneous snapshot marker received, with "
+                "opaque: %" PRIu32 ", its start "
+                "(%" PRIu64 "), and end (%" PRIu64 ") are less than last "
+                "received seqno (%" PRIu64 "); Dropping marker!",
+                vb_, opaque_, snapStart, snapEnd, last_seqno.load());
+            return ENGINE_ERANGE;
         }
     }
 
-    if (engine->getReplicationThrottle().shouldProcess() && !buffer.items) {
+    if (engine->getReplicationThrottle().shouldProcess() && buffer.empty()) {
         /* Process the response here itself rather than buffering it */
         ENGINE_ERROR_CODE ret = ENGINE_SUCCESS;
-        switch (resp->getEvent()) {
-            case DCP_MUTATION:
-                ret = processMutation(static_cast<MutationResponse*>(resp));
+        switch (dcpResponse->getEvent()) {
+            case DcpResponse::Event::Mutation:
+                ret = processMutation(static_cast<MutationResponse*>(dcpResponse.get()));
                 break;
-            case DCP_DELETION:
-            case DCP_EXPIRATION:
-                ret = processDeletion(static_cast<MutationResponse*>(resp));
+            case DcpResponse::Event::Deletion:
+            case DcpResponse::Event::Expiration:
+                ret = processDeletion(static_cast<MutationResponse*>(dcpResponse.get()));
                 break;
-            case DCP_SNAPSHOT_MARKER:
-                processMarker(static_cast<SnapshotMarker*>(resp));
+            case DcpResponse::Event::SnapshotMarker:
+                processMarker(static_cast<SnapshotMarker*>(dcpResponse.get()));
                 break;
-            case DCP_SET_VBUCKET:
-                processSetVBucketState(static_cast<SetVBucketState*>(resp));
+            case DcpResponse::Event::SetVbucket:
+                processSetVBucketState(static_cast<SetVBucketState*>(dcpResponse.get()));
                 break;
-            case DCP_STREAM_END:
+            case DcpResponse::Event::StreamEnd:
                 {
                     LockHolder lh(streamMutex);
-                    transitionState(STREAM_DEAD);
+                    transitionState(StreamState::Dead);
                 }
                 break;
+            case DcpResponse::Event::SystemEvent: {
+                    ret = processSystemEvent(*static_cast<SystemEventMessage*>(
+                            dcpResponse.get()));
+                    break;
+                }
             default:
-                // Above switch should've returned DISCONNECT, throw an exception
-                throw std::logic_error("PassiveStream::messageReceived: (vb " +
-                                       std::to_string(vb_) +
-                                       ") received unknown message type " +
-                                       std::to_string(resp->getEvent()));
+                consumer->getLogger().log(
+                        EXTENSION_LOG_WARNING,
+                        "(vb %d) Unknown event:%d, opaque:%" PRIu32,
+                        vb_,
+                        int(dcpResponse->getEvent()),
+                        opaque_);
+                return ENGINE_DISCONNECT;
         }
         if (ret != ENGINE_TMPFAIL && ret != ENGINE_ENOMEM) {
-            delete resp;
             return ret;
         }
     }
 
-    buffer.messages.push(resp);
-    buffer.items++;
-    buffer.bytes += resp->getMessageSize();
-
+    // Only buffer if the stream is not dead
+    if (isActive()) {
+        buffer.push(std::move(dcpResponse));
+    }
     return ENGINE_TMPFAIL;
 }
 
 process_items_error_t PassiveStream::processBufferedMessages(uint32_t& processed_bytes,
                                                              size_t batchSize) {
-    LockHolder lh(buffer.bufMutex);
+    std::unique_lock<std::mutex> lh(buffer.bufMutex);
     uint32_t count = 0;
     uint32_t message_bytes = 0;
     uint32_t total_bytes_processed = 0;
     bool failed = false;
-    if (buffer.messages.empty()) {
-        return all_processed;
-    }
-
     while (count < batchSize && !buffer.messages.empty()) {
         ENGINE_ERROR_CODE ret = ENGINE_SUCCESS;
         /* If the stream is in dead state we should not process any remaining
            items in the buffer, we should rather clear them */
-        if (state_ == STREAM_DEAD) {
+        if (!isActive()) {
             total_bytes_processed += clearBuffer_UNLOCKED();
             processed_bytes = total_bytes_processed;
             return all_processed;
         }
 
-        DcpResponse *response = buffer.messages.front();
+        std::unique_ptr<DcpResponse> response = buffer.pop_front(lh);
+
+        // Release bufMutex whilst we attempt to process the message
+        // a lock inversion exists with connManager if we hold this.
+        lh.unlock();
+
         message_bytes = response->getMessageSize();
 
         switch (response->getEvent()) {
-            case DCP_MUTATION:
-                ret = processMutation(static_cast<MutationResponse*>(response));
+            case DcpResponse::Event::Mutation:
+                ret = processMutation(static_cast<MutationResponse*>(response.get()));
                 break;
-            case DCP_DELETION:
-            case DCP_EXPIRATION:
-                ret = processDeletion(static_cast<MutationResponse*>(response));
+            case DcpResponse::Event::Deletion:
+            case DcpResponse::Event::Expiration:
+                ret = processDeletion(static_cast<MutationResponse*>(response.get()));
                 break;
-            case DCP_SNAPSHOT_MARKER:
-                processMarker(static_cast<SnapshotMarker*>(response));
+            case DcpResponse::Event::SnapshotMarker:
+                processMarker(static_cast<SnapshotMarker*>(response.get()));
                 break;
-            case DCP_SET_VBUCKET:
-                processSetVBucketState(static_cast<SetVBucketState*>(response));
+            case DcpResponse::Event::SetVbucket:
+                processSetVBucketState(static_cast<SetVBucketState*>(response.get()));
                 break;
-            case DCP_STREAM_END:
+            case DcpResponse::Event::StreamEnd:
                 {
                     LockHolder lh(streamMutex);
-                    transitionState(STREAM_DEAD);
+                    transitionState(StreamState::Dead);
                 }
                 break;
+            case DcpResponse::Event::SystemEvent: {
+                    ret = processSystemEvent(
+                            *static_cast<SystemEventMessage*>(response.get()));
+                    break;
+                }
             default:
                 consumer->getLogger().log(EXTENSION_LOG_WARNING,
                                           "PassiveStream::processBufferedMessages:"
-                                          "(vb %" PRIu16 ") PassiveStream failing "
-                                          "unknown message type %d",
-                                          vb_, response->getEvent());
-                failed = true;
+                                          "(vb %" PRIu16 ") PassiveStream ignoring "
+                                          "unknown message type %s",
+                                          vb_,
+                                          response->to_string());
+                continue;
         }
 
         if (ret == ENGINE_TMPFAIL || ret == ENGINE_ENOMEM) {
             failed = true;
+        }
+
+        // Re-acquire bufMutex so that
+        // 1) we can update the buffer
+        // 2) safely re-check the while conditional statement
+        lh.lock();
+
+        // If we failed and the stream is not dead, stash the DcpResponse at the
+        // front of the queue and break the loop.
+        if (failed && isActive()) {
+            buffer.push_front(std::move(response), lh);
             break;
         }
 
-        delete response;
-        buffer.messages.pop();
-        buffer.items--;
-        buffer.bytes -= message_bytes;
         count++;
         if (ret != ENGINE_ERANGE) {
             total_bytes_processed += message_bytes;
@@ -1678,20 +1886,20 @@ process_items_error_t PassiveStream::processBufferedMessages(uint32_t& processed
 }
 
 ENGINE_ERROR_CODE PassiveStream::processMutation(MutationResponse* mutation) {
-    RCPtr<VBucket> vb = engine->getVBucket(vb_);
+    VBucketPtr vb = engine->getVBucket(vb_);
     if (!vb) {
         return ENGINE_NOT_MY_VBUCKET;
     }
 
-    if (mutation->getBySeqno() < cur_snapshot_start.load() ||
-        mutation->getBySeqno() > cur_snapshot_end.load()) {
+    if (uint64_t(*mutation->getBySeqno()) < cur_snapshot_start.load() ||
+        uint64_t(*mutation->getBySeqno()) > cur_snapshot_end.load()) {
         consumer->getLogger().log(EXTENSION_LOG_WARNING,
             "(vb %d) Erroneous mutation [sequence "
             "number does not fall in the expected snapshot range : "
             "{snapshot_start (%" PRIu64 ") <= seq_no (%" PRIu64 ") <= "
             "snapshot_end (%" PRIu64 ")]; Dropping the mutation!",
             vb_, cur_snapshot_start.load(),
-            mutation->getBySeqno(), cur_snapshot_end.load());
+            *mutation->getBySeqno(), cur_snapshot_end.load());
         return ENGINE_ERANGE;
     }
 
@@ -1711,48 +1919,46 @@ ENGINE_ERROR_CODE PassiveStream::processMutation(MutationResponse* mutation) {
 
     ENGINE_ERROR_CODE ret;
     if (vb->isBackfillPhase()) {
-        ret = engine->getEpStore()->addTAPBackfillItem(*mutation->getItem(),
-                                                    INITIAL_NRU_VALUE,
-                                                    false,
-                                                    mutation->getExtMetaData());
+        ret = engine->getKVBucket()->addBackfillItem(
+                *mutation->getItem(),
+                GenerateBySeqno::No,
+                mutation->getExtMetaData());
     } else {
-        ret = engine->getEpStore()->setWithMeta(*mutation->getItem(), 0, NULL,
-                                                consumer->getCookie(), true,
-                                                true, INITIAL_NRU_VALUE,
-                                                GenerateBySeqno::No,
-                                                GenerateCas::No,
-                                                mutation->getExtMetaData(),
-                                                true);
+        ret = engine->getKVBucket()->setWithMeta(*mutation->getItem(), 0,
+                                                 NULL,
+                                                 consumer->getCookie(),
+                                                 true, true,
+                                                 GenerateBySeqno::No,
+                                                 GenerateCas::No,
+                                                 mutation->getExtMetaData(),
+                                                 true);
     }
 
-    // We should probably handle these error codes in a better way, but since
-    // the producer side doesn't do anything with them anyways let's just log
-    // them for now until we come up with a better solution.
     if (ret != ENGINE_SUCCESS) {
         consumer->getLogger().log(EXTENSION_LOG_WARNING,
             "Got an error code %d while trying to process mutation", ret);
     } else {
-        handleSnapshotEnd(vb, mutation->getBySeqno());
+        handleSnapshotEnd(vb, *mutation->getBySeqno());
     }
 
     return ret;
 }
 
 ENGINE_ERROR_CODE PassiveStream::processDeletion(MutationResponse* deletion) {
-    RCPtr<VBucket> vb = engine->getVBucket(vb_);
+    VBucketPtr vb = engine->getVBucket(vb_);
     if (!vb) {
         return ENGINE_NOT_MY_VBUCKET;
     }
 
-    if (deletion->getBySeqno() < cur_snapshot_start.load() ||
-        deletion->getBySeqno() > cur_snapshot_end.load()) {
+    if (uint64_t(*deletion->getBySeqno()) < cur_snapshot_start.load() ||
+        uint64_t(*deletion->getBySeqno()) > cur_snapshot_end.load()) {
         consumer->getLogger().log(EXTENSION_LOG_WARNING,
             "(vb %d) Erroneous deletion [sequence "
             "number does not fall in the expected snapshot range : "
             "{snapshot_start (%" PRIu64 ") <= seq_no (%" PRIu64 ") <= "
             "snapshot_end (%" PRIu64 ")]; Dropping the deletion!",
             vb_, cur_snapshot_start.load(),
-            deletion->getBySeqno(), cur_snapshot_end.load());
+            *deletion->getBySeqno(), cur_snapshot_end.load());
         return ENGINE_ERANGE;
     }
 
@@ -1765,42 +1971,129 @@ ENGINE_ERROR_CODE PassiveStream::processDeletion(MutationResponse* deletion) {
         LOG(EXTENSION_LOG_WARNING,
             "%s Invalid CAS (0x%" PRIx64 ") received for deletion {vb:%" PRIu16
             ", seqno:%" PRId64 "}. Regenerating new CAS",
-            consumer->logHeader(), meta.cas, vb_, deletion->getBySeqno());
+            consumer->logHeader(), meta.cas, vb_, *deletion->getBySeqno());
         meta.cas = Item::nextCas();
     }
 
-    ret = engine->getEpStore()->deleteWithMeta(deletion->getItem()->getKey(),
-                                               &delCas, NULL, deletion->getVBucket(),
-                                               consumer->getCookie(), true,
-                                               &meta, vb->isBackfillPhase(),
-                                               GenerateBySeqno::No,
-                                               GenerateCas::No,
-                                               deletion->getBySeqno(),
-                                               deletion->getExtMetaData(),
-                                               true);
+    ret = engine->getKVBucket()->deleteWithMeta(deletion->getItem()->getKey(),
+                                                delCas,
+                                                nullptr,
+                                                deletion->getVBucket(),
+                                                consumer->getCookie(),
+                                                true,
+                                                meta,
+                                                vb->isBackfillPhase(),
+                                                GenerateBySeqno::No,
+                                                GenerateCas::No,
+                                                *deletion->getBySeqno(),
+                                                deletion->getExtMetaData(),
+                                                true);
     if (ret == ENGINE_KEY_ENOENT) {
         ret = ENGINE_SUCCESS;
     }
 
-    // We should probably handle these error codes in a better way, but since
-    // the producer side doesn't do anything with them anyways let's just log
-    // them for now until we come up with a better solution.
     if (ret != ENGINE_SUCCESS) {
         consumer->getLogger().log(EXTENSION_LOG_WARNING,
             "Got an error code %d while trying to process deletion", ret);
     } else {
-        handleSnapshotEnd(vb, deletion->getBySeqno());
+        handleSnapshotEnd(vb, *deletion->getBySeqno());
     }
 
     return ret;
 }
 
+ENGINE_ERROR_CODE PassiveStream::processSystemEvent(
+        const SystemEventMessage& event) {
+    VBucketPtr vb = engine->getVBucket(vb_);
+
+    if (!vb) {
+        return ENGINE_NOT_MY_VBUCKET;
+    }
+
+    ENGINE_ERROR_CODE rv = ENGINE_SUCCESS;
+    // Depending on the event, extras is different and key may even be empty
+    // The specific handler will know how to interpret.
+    switch (event.getSystemEvent()) {
+    case SystemEvent::CreateCollection: {
+        rv = processCreateCollection(*vb, {event});
+        break;
+    }
+    case SystemEvent::BeginDeleteCollection: {
+        rv = processBeginDeleteCollection(*vb, {event});
+        break;
+    }
+    case SystemEvent::CollectionsSeparatorChanged: {
+        rv = processSeparatorChanged(*vb, {event});
+        break;
+    }
+    case SystemEvent::DeleteCollectionSoft:
+    case SystemEvent::DeleteCollectionHard: {
+        rv = ENGINE_EINVAL; // Producer won't send
+        break;
+    }
+    }
+
+    if (rv != ENGINE_SUCCESS) {
+        consumer->getLogger().log(EXTENSION_LOG_WARNING,
+            "Got an error code %d while trying to process system event", rv);
+    } else {
+        handleSnapshotEnd(vb, *event.getBySeqno());
+    }
+
+    return rv;
+}
+
+ENGINE_ERROR_CODE PassiveStream::processCreateCollection(
+        VBucket& vb, const CollectionsEvent& event) {
+    try {
+        vb.replicaAddCollection(event.getKey(),
+                                event.getRevision(),
+                                event.getBySeqno());
+    } catch (std::exception& e) {
+        LOG(EXTENSION_LOG_WARNING,
+            "PassiveStream::processCreateCollection exception %s",
+            e.what());
+        return ENGINE_EINVAL;
+    }
+    return ENGINE_SUCCESS;
+}
+
+ENGINE_ERROR_CODE PassiveStream::processBeginDeleteCollection(
+        VBucket& vb, const CollectionsEvent& event) {
+    try {
+        vb.replicaBeginDeleteCollection(event.getKey(),
+                                        event.getRevision(),
+                                        event.getBySeqno());
+    } catch (std::exception& e) {
+        LOG(EXTENSION_LOG_WARNING,
+            "PassiveStream::processBeginDeleteCollection exception %s",
+            e.what());
+        return ENGINE_EINVAL;
+    }
+    return ENGINE_SUCCESS;
+}
+
+ENGINE_ERROR_CODE PassiveStream::processSeparatorChanged(
+        VBucket& vb, const CollectionsEvent& event) {
+    try {
+        vb.replicaChangeCollectionSeparator(
+                event.getKey(), event.getRevision(), event.getBySeqno());
+    } catch (std::exception& e) {
+        LOG(EXTENSION_LOG_WARNING,
+            "PassiveStream::processSeparatorChanged exception %s",
+            e.what());
+        return ENGINE_EINVAL;
+    }
+    return ENGINE_SUCCESS;
+}
+
 void PassiveStream::processMarker(SnapshotMarker* marker) {
-    RCPtr<VBucket> vb = engine->getVBucket(vb_);
+    VBucketPtr vb = engine->getVBucket(vb_);
 
     cur_snapshot_start.store(marker->getStartSeqno());
     cur_snapshot_end.store(marker->getEndSeqno());
-    cur_snapshot_type.store((marker->getFlags() & MARKER_FLAG_DISK) ? disk : memory);
+    cur_snapshot_type.store((marker->getFlags() & MARKER_FLAG_DISK) ?
+            Snapshot::Disk : Snapshot::Memory);
 
     if (vb) {
         if (marker->getFlags() & MARKER_FLAG_DISK && vb->getHighSeqno() == 0) {
@@ -1827,23 +2120,24 @@ void PassiveStream::processMarker(SnapshotMarker* marker) {
 }
 
 void PassiveStream::processSetVBucketState(SetVBucketState* state) {
-    engine->getEpStore()->setVBucketState(vb_, state->getState(), true);
-
-    LockHolder lh (streamMutex);
-    pushToReadyQ(new SetVBucketStateResponse(opaque_, ENGINE_SUCCESS));
-    lh.unlock();
+    engine->getKVBucket()->setVBucketState(vb_, state->getState(), true);
+    {
+        LockHolder lh (streamMutex);
+        pushToReadyQ(new SetVBucketStateResponse(opaque_, ENGINE_SUCCESS));
+    }
     bool inverse = false;
     if (itemsReady.compare_exchange_strong(inverse, true)) {
         consumer->notifyStreamReady(vb_);
     }
 }
 
-void PassiveStream::handleSnapshotEnd(RCPtr<VBucket>& vb, uint64_t byseqno) {
+void PassiveStream::handleSnapshotEnd(VBucketPtr& vb, uint64_t byseqno) {
     if (byseqno == cur_snapshot_end.load()) {
-        if (cur_snapshot_type.load() == disk && vb->isBackfillPhase()) {
+        if (cur_snapshot_type.load() == Snapshot::Disk &&
+                vb->isBackfillPhase()) {
             vb->setBackfillPhase(false);
             uint64_t id = vb->checkpointManager.getOpenCheckpointId() + 1;
-            vb->checkpointManager.checkAndAddNewCheckpoint(id, vb);
+            vb->checkpointManager.checkAndAddNewCheckpoint(id, *vb);
         } else {
             size_t mem_threshold = engine->getEpStats().mem_high_wat.load();
             size_t mem_used = engine->getEpStats().getTotalMemoryUsed();
@@ -1851,21 +2145,22 @@ void PassiveStream::handleSnapshotEnd(RCPtr<VBucket>& vb, uint64_t byseqno) {
                high watermark (85%) */
             if (mem_threshold < mem_used) {
                 uint64_t id = vb->checkpointManager.getOpenCheckpointId() + 1;
-                vb->checkpointManager.checkAndAddNewCheckpoint(id, vb);
+                vb->checkpointManager.checkAndAddNewCheckpoint(id, *vb);
             }
         }
 
         if (cur_snapshot_ack) {
-            LockHolder lh(streamMutex);
-            pushToReadyQ(new SnapshotMarkerResponse(opaque_, ENGINE_SUCCESS));
-            lh.unlock();
+            {
+                LockHolder lh(streamMutex);
+                pushToReadyQ(new SnapshotMarkerResponse(opaque_, ENGINE_SUCCESS));
+            }
             bool inverse = false;
             if (itemsReady.compare_exchange_strong(inverse, true)) {
                 consumer->notifyStreamReady(vb_);
             }
             cur_snapshot_ack = false;
         }
-        cur_snapshot_type.store(none);
+        cur_snapshot_type.store(Snapshot::None);
     }
 }
 
@@ -1875,12 +2170,19 @@ void PassiveStream::addStats(ADD_STAT add_stat, const void *c) {
     try {
         const int bsize = 1024;
         char buf[bsize];
+        size_t bufferItems = 0;
+        size_t bufferBytes = 0;
+        {
+            std::lock_guard<std::mutex> lg(buffer.bufMutex);
+            bufferItems = buffer.messages.size();
+            bufferBytes = buffer.bytes;
+        }
         checked_snprintf(buf, bsize, "%s:stream_%d_buffer_items", name_.c_str(),
                          vb_);
-        add_casted_stat(buf, buffer.items, add_stat, c);
+        add_casted_stat(buf, bufferItems, add_stat, c);
         checked_snprintf(buf, bsize, "%s:stream_%d_buffer_bytes", name_.c_str(),
                          vb_);
-        add_casted_stat(buf, buffer.bytes, add_stat, c);
+        add_casted_stat(buf, bufferBytes, add_stat, c);
         checked_snprintf(buf, bsize, "%s:stream_%d_items_ready", name_.c_str(),
                          vb_);
         add_casted_stat(buf, itemsReady.load() ? "true" : "false", add_stat, c);
@@ -1893,10 +2195,10 @@ void PassiveStream::addStats(ADD_STAT add_stat, const void *c) {
 
         checked_snprintf(buf, bsize, "%s:stream_%d_cur_snapshot_type",
                          name_.c_str(), vb_);
-        add_casted_stat(buf, snapshotTypeToString(cur_snapshot_type.load()),
+        add_casted_stat(buf, ::to_string(cur_snapshot_type.load()),
                         add_stat, c);
 
-        if (cur_snapshot_type.load() != none) {
+        if (cur_snapshot_type.load() != Snapshot::None) {
             checked_snprintf(buf, bsize, "%s:stream_%d_cur_snapshot_start",
                              name_.c_str(), vb_);
             add_casted_stat(buf, cur_snapshot_start.load(), add_stat, c);
@@ -1925,22 +2227,15 @@ DcpResponse* PassiveStream::next() {
 
 uint32_t PassiveStream::clearBuffer_UNLOCKED() {
     uint32_t unackedBytes = buffer.bytes;
-
-    while (!buffer.messages.empty()) {
-        DcpResponse* resp = buffer.messages.front();
-        buffer.messages.pop();
-        delete resp;
-    }
-
+    buffer.messages.clear();
     buffer.bytes = 0;
-    buffer.items = 0;
     return unackedBytes;
 }
 
-bool PassiveStream::transitionState(stream_state_t newState) {
+bool PassiveStream::transitionState(StreamState newState) {
     consumer->getLogger().log(EXTENSION_LOG_DEBUG,
         "(vb %d) Transitioning from %s to %s",
-        vb_, stateName(state_), stateName(newState));
+        vb_, to_string(state_.load()).c_str(), to_string(newState).c_str());
 
     if (state_ == newState) {
         return false;
@@ -1948,35 +2243,37 @@ bool PassiveStream::transitionState(stream_state_t newState) {
 
     bool validTransition = false;
     switch (state_.load()) {
-        case STREAM_PENDING:
-            if (newState == STREAM_READING || newState == STREAM_DEAD) {
+        case StreamState::Pending:
+            if (newState == StreamState::Reading ||
+                    newState == StreamState::Dead) {
                 validTransition = true;
             }
             break;
 
-        case STREAM_BACKFILLING:
-        case STREAM_IN_MEMORY:
-        case STREAM_TAKEOVER_SEND:
-        case STREAM_TAKEOVER_WAIT:
+        case StreamState::Backfilling:
+        case StreamState::InMemory:
+        case StreamState::TakeoverSend:
+        case StreamState::TakeoverWait:
             // Not valid for passive streams
             break;
 
-        case STREAM_READING:
-            if (newState == STREAM_PENDING || newState == STREAM_DEAD) {
+        case StreamState::Reading:
+            if (newState == StreamState::Pending ||
+                    newState == StreamState::Dead) {
                 validTransition = true;
             }
             break;
 
-        case STREAM_DEAD:
+        case StreamState::Dead:
             // Once 'dead' shouldn't transition away from it.
             break;
     }
 
     if (!validTransition) {
         throw std::invalid_argument("PassiveStream::transitionState:"
-                " newState (which is" + std::to_string(newState) +
+                " newState (which is" + to_string(newState) +
                 ") is not valid for current state (which is " +
-                std::to_string(state_) + ")");
+                to_string(state_.load()) + ")");
     }
 
     state_ = newState;