MB-16181: Add replicate/persist traits to Item 90/67990/4
authorJim Walker <jim@couchbase.com>
Mon, 26 Sep 2016 09:29:10 +0000 (10:29 +0100)
committerDave Rigby <daver@couchbase.com>
Thu, 29 Sep 2016 12:49:07 +0000 (12:49 +0000)
Provide an abstraction for the flusher and DCP that tells them
if an item should be persisted or replicated.

This provides a stepping stone towards system owned items in
checkpoints.

Change-Id: Ie5e65a2b20d0d162e1b8fe3e439219c34fb7b508
Reviewed-on: http://review.couchbase.org/67990
Reviewed-by: Dave Rigby <daver@couchbase.com>
Tested-by: buildbot <build@couchbase.com>
src/dcp/stream.cc
src/ep.cc
src/item.h

index 0b92522..257e3c6 100644 (file)
@@ -336,31 +336,36 @@ bool ActiveStream::backfillReceived(Item* itm, backfill_source_t backfill_source
     if (nullptr == itm) {
         return false;
     }
-    LockHolder lh(streamMutex);
-    if (state_ == STREAM_BACKFILLING) {
-        if (!producer->recordBackfillManagerBytesRead(itm->size())) {
-            delete itm;
-            return false;
-        }
 
-        bufferedBackfill.bytes.fetch_add(itm->size());
-        bufferedBackfill.items++;
+    if (itm->shouldReplicate()) {
+        LockHolder lh(streamMutex);
+        if (state_ == STREAM_BACKFILLING) {
+            if (!producer->recordBackfillManagerBytesRead(itm->size())) {
+                delete itm;
+                return false;
+            }
 
-        pushToReadyQ(new MutationResponse(itm, opaque_,
-                          prepareExtendedMetaData(itm->getVBucketId(),
-                                                  itm->getConflictResMode())));
+            bufferedBackfill.bytes.fetch_add(itm->size());
+            bufferedBackfill.items++;
 
-        lastReadSeqno.store(itm->getBySeqno());
-        lh.unlock();
-        bool inverse = false;
-        if (itemsReady.compare_exchange_strong(inverse, true)) {
-            producer->notifyStreamReady(vb_, false);
-        }
+            pushToReadyQ(new MutationResponse(itm, opaque_,
+                              prepareExtendedMetaData(itm->getVBucketId(),
+                                                      itm->getConflictResMode())));
+
+            lastReadSeqno.store(itm->getBySeqno());
+            lh.unlock();
+            bool inverse = false;
+            if (itemsReady.compare_exchange_strong(inverse, true)) {
+                producer->notifyStreamReady(vb_, false);
+            }
 
-        if (backfill_source == BACKFILL_FROM_MEMORY) {
-            backfillItems.memory++;
+            if (backfill_source == BACKFILL_FROM_MEMORY) {
+                backfillItems.memory++;
+            } else {
+                backfillItems.disk++;
+            }
         } else {
-            backfillItems.disk++;
+            delete itm;
         }
     } else {
         delete itm;
@@ -788,8 +793,7 @@ void ActiveStream::processItems(std::vector<queued_item>& items) {
         for (; itr != items.end(); ++itr) {
             queued_item& qi = *itr;
 
-            if (qi->getOperation() == queue_op_set ||
-                qi->getOperation() == queue_op_del) {
+            if (qi->shouldReplicate()) {
                 curChkSeqno = qi->getBySeqno();
                 lastReadSeqnoUnSnapshotted = qi->getBySeqno();
 
index 57dcf15..7e8b198 100644 (file)
--- a/src/ep.cc
+++ b/src/ep.cc
@@ -3384,9 +3384,10 @@ int EventuallyPersistentStore::flushVBucket(uint16_t vbid) {
             uint64_t maxDeletedRevSeqno = 0;
             std::list<PersistenceCallback*>& pcbs = rwUnderlying->getPersistenceCbList();
             std::vector<queued_item>::iterator it = items.begin();
+
             for(; it != items.end(); ++it) {
-                if ((*it)->getOperation() != queue_op_set &&
-                    (*it)->getOperation() != queue_op_del) {
+
+                if (!(*it)->shouldPersist()) {
                     continue;
                 } else if (!prev || prev->getKey() != (*it)->getKey()) {
                     prev = (*it).get();
index 0094016..23a83f5 100644 (file)
@@ -377,18 +377,18 @@ public:
         ObjectRegistry::onCreateItem(this);
     }
 
-   Item(const std::string &k, const uint16_t vb,
-        enum queue_operation o, const uint64_t revSeq,
-        const int64_t bySeq, uint8_t nru_value = INITIAL_NRU_VALUE,
-        uint8_t conflict_res_value = revision_seqno) :
-       metaData(),
-       key(k),
-       bySeqno(bySeq),
-       queuedTime(ep_current_time()),
-       vbucketId(vb),
-       op(static_cast<uint16_t>(o)),
-       nru(nru_value),
-       conflictResMode(conflict_res_value)
+    Item(const std::string &k, const uint16_t vb,
+         enum queue_operation o, const uint64_t revSeq,
+         const int64_t bySeq, uint8_t nru_value = INITIAL_NRU_VALUE,
+         uint8_t conflict_res_value = revision_seqno) :
+        metaData(),
+        key(k),
+        bySeqno(bySeq),
+        queuedTime(ep_current_time()),
+        vbucketId(vb),
+        op(static_cast<uint16_t>(o)),
+        nru(nru_value),
+        conflictResMode(conflict_res_value)
     {
        if (bySeqno < 0) {
            throw std::invalid_argument("Item(): bySeqno must be non-negative");
@@ -634,6 +634,22 @@ public:
         return static_cast<enum queue_operation>(op);
     }
 
+    /*
+     * Should this item be persisted?
+     */
+    bool shouldPersist() const {
+        return (op == queue_op_set) ||
+               (op == queue_op_del);
+    }
+
+    /*
+     * Should this item be replicated (e.g. by DCP)
+     */
+    bool shouldReplicate() const {
+        return (op == queue_op_set) ||
+               (op == queue_op_del);
+    }
+
     void setOperation(enum queue_operation o) {
         op = static_cast<uint8_t>(o);
     }