MB-20852 [17/N]: Serialize VB state changes 50/69150/11
author     Dave Rigby <daver@couchbase.com>
           Wed, 21 Sep 2016 15:03:47 +0000 (16:03 +0100)
committer  Dave Rigby <daver@couchbase.com>
           Tue, 1 Nov 2016 08:06:23 +0000 (08:06 +0000)
Background/Problem:

MB-20852 exposed an issue with how VBucket state is persisted to disk
- specifically that state can be persisted to disk out of order, and
intermediate state changes can be dropped (not persisted at all).

These problems are caused by the asynchronous tasks which are
responsible for persisting state to disk -
VBStatePersistTask{Low,High}. There are essentially two interrelated
issues:

1. We allow multiple tasks which persist VBState to exist concurrently
   (VBStatePersistTask{Low,High}, Flusher,
   VBSnapshotTask{Low,High}). Furthermore, multiple instances of the
   same task (e.g. VBStatePersistTask) can also exist concurrently.

2. We do not maintain a queue of states to persist, instead we just
   mark that a given vbid's state is pending.

(1) Can occur when a VBStatePersistTask has started to run on a BG
    writer thread (and has cleared the schedule_vbstate_persist flag),
    but then another scheduleVBStatePersist() call is made. As the
    schedule_vbstate_persist flag is clear, this second task is
    allowed to be created. There is then no guarantee which task will
    complete first - they could be scheduled to different OS writer
    threads; the first thread could be descheduled while the second
    runs and completes first.

    We could attempt to solve this by changing when
    schedule_vbstate_persist is cleared (say move it later in the
    persistVBState() function), but then the inverse problem is
    exposed - we may fail to schedule a second (different) state to be
    persisted if the current task is just finishing up (and will exit
    without persisting the now-outstanding work).
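
To make the window concrete, the sketch below models the pre-patch
check-and-set pattern in isolation. It is illustrative only - the names
are simplified from schedule_vbstate_persist / VBStatePersistTask, and
the "task clears the flag when it starts" step is assumed from the
description above rather than copied from the real task code:

    // Minimal, self-contained model of the scheduling race (illustrative).
    #include <atomic>
    #include <cstdio>

    static std::atomic<bool> schedule_pending{false};

    // Caller side: only create a new persistence task if none is pending.
    bool schedulePersist() {
        bool expected = false;
        return schedule_pending.compare_exchange_strong(expected, true);
    }

    int main() {
        schedulePersist();          // first task created
        schedule_pending = false;   // (A) the task clears the flag as it starts
        // Any schedulePersist() call arriving between (A) and the actual disk
        // write creates a second, concurrent task - nothing orders their writes.
        std::printf("second task allowed: %s\n",
                    schedulePersist() ? "yes" : "no");   // prints "yes"
        return 0;
    }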

(2) Presents a subtle problem relating to when the state of a VBucket
    is materialized. As we only record the vbid to persist (and not
    the state), by the time the VBStatePersistTask runs, the actual
    state we /wanted/ to write may have moved forward. Even worse, the
    state could have "gone backwards" (as shown in the MB) if the
    state of the VBucket was read before the vbucket was deleted,
    meaning the task has a 'stale' view of the VBucket object (a
    consequence of using ref-counted pointers for VBucket objects).

    Additionally, not persisting all intermediate states makes
    debugging harder. We don't actually change VBucket state /that/
    often, so having on disk all the intermediate states a VBucket
    went through is extremely valuable when debugging.
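
The 'stale view' aspect of (2) is the standard problem of holding a
ref-counted handle across a delete/re-create; a minimal, self-contained
illustration (MiniVBucket and plain std::shared_ptr stand in for the
real VBucket / RCPtr types):

    // Illustrative only - how a ref-counted vbucket handle can go stale.
    #include <cstdio>
    #include <memory>
    #include <string>

    struct MiniVBucket {
        std::string state;
    };

    int main() {
        // The vbucket map holds the "current" object...
        auto current = std::make_shared<MiniVBucket>(MiniVBucket{"old state"});

        // ...while a persistence task grabbed its own handle earlier:
        std::shared_ptr<MiniVBucket> taskView = current;

        // The vbucket is then deleted and re-created with a new state:
        current = std::make_shared<MiniVBucket>(MiniVBucket{"new state"});

        // When the task finally runs it reads (and would persist) the old
        // state - on disk the state appears to have "gone backwards".
        std::printf("task would persist: %s\n", taskView->state.c_str());
        return 0;
    }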

Solution:

Instead of having multiple different tasks which can persist state
(and attempting to manage when they are created, when they run and
what state they persist), we use a single task - the Flusher - to
persist state for a given vBucket. Note the Flusher *already*
persists vbucket state if necessary during commit (see
EventuallyPersistentStore::flushVBucket), so this patch just adopts
the Flusher as the canonical Task to perform vbstate persistence.

To ensure that state is persisted even when there are no outstanding
'normal' items in the vbucket's checkpoint queue, we use the new
queue_op::set_vbucket_state meta-item to signify that a persist is
pending (and to essentially make the pending items queue non-empty so
the flusher will run).
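
In outline, the whole mechanism can be modelled with a toy checkpoint
and flusher. The sketch below is a deliberately simplified, self-contained
analogue of the CheckpointManager::queueSetVBState() / flushVBucket()
changes in the diff (MiniCheckpoint and friends are invented names, and
the real code tracks far more state):

    // Toy model: a set_vbucket_state meta-item makes the queue non-empty, so
    // the flusher runs and snapshots the vbucket state even with no mutations.
    #include <cstdint>
    #include <cstdio>
    #include <deque>
    #include <string>

    enum class Op { Set, Del, SetVBucketState };

    struct QueuedItem {
        Op op;
        std::string key;
        int64_t seqno;
    };

    struct MiniCheckpoint {
        std::deque<QueuedItem> items;
        int64_t lastSeqno = 0;

        // Analogue of CheckpointManager::queueSetVBState().
        void queueSetVBState() {
            items.push_back({Op::SetVBucketState, "", ++lastSeqno});
        }
    };

    // Analogue of the flushVBucket() change: a set_vbucket_state item is not
    // written as a document, but forces the vbucket state to be snapshotted.
    void flush(MiniCheckpoint& ckpt, const std::string& currentState) {
        bool mustCheckpointVBState = false;
        int itemsFlushed = 0;
        for (const auto& item : ckpt.items) {
            if (item.op == Op::SetVBucketState) {
                mustCheckpointVBState = true;
            } else {
                ++itemsFlushed;   // a real item would be written to disk here
            }
        }
        if (itemsFlushed > 0 || mustCheckpointVBState) {
            std::printf("snapshot vbucket state: %s (items flushed: %d)\n",
                        currentState.c_str(), itemsFlushed);
        }
        ckpt.items.clear();
    }

    int main() {
        MiniCheckpoint ckpt;
        ckpt.queueSetVBState();   // a state change with no pending mutations...
        flush(ckpt, "replica");   // ...still results in the state being written
        return 0;
    }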

After this patch all the other Tasks which used to persist vbucket
state are redundant - a subsequent patch will remove them.

Change-Id: Ic44360a1dd14fb882ebfa6f28f4ccfe6127d17a8
Reviewed-on: http://review.couchbase.org/69150
Reviewed-by: Jim Walker <jim@couchbase.com>
Tested-by: buildbot <build@couchbase.com>
22 files changed:
src/checkpoint.cc
src/checkpoint.h
src/couch-kvstore/couch-kvstore.cc
src/couch-kvstore/couch-kvstore.h
src/dcp/consumer.cc
src/ep.cc
src/ep.h
src/ep_engine.cc
src/item.h
src/kvshard.h
src/stats.h
src/vbucket.cc
src/vbucket.h
src/vbucketmap.h
tests/ep_testsuite.cc
tests/ep_testsuite_basic.cc
tests/ep_testsuite_checkpoint.cc
tests/ep_testsuite_dcp.cc
tests/ep_testsuite_xdcr.cc
tests/module_tests/checkpoint_test.cc
tests/module_tests/evp_engine_test.cc
tests/module_tests/evp_store_single_threaded_test.cc

index 1009f67..f306147 100644 (file)
--- a/src/checkpoint.cc
+++ b/src/checkpoint.cc
@@ -168,21 +168,50 @@ queue_dirty_t Checkpoint::queueDirty(const queued_item &qi,
             std::list<queued_item>::iterator currPos = it->second.position;
             uint64_t currMutationId = it->second.mutation_id;
 
+            // Given the key already exists, need to check all cursors in this
+            // Checkpoint and see if the existing item for this key is to
+            // the "left" of the cursor (i.e. has already been processed) - in
+            // which case we need to adjust the cursor's offset to ensure that
+            // we correctly account for the updated item which will need to be
+            // iterated over.
             for (auto& cursor : checkpointManager->connCursors) {
 
                 if (*(cursor.second.currentCheckpoint) == this) {
-                    queued_item &tqi = *(cursor.second.currentPos);
-                    const std::string &key = tqi->getKey();
-                    checkpoint_index::iterator ita = keyIndex.find(key);
-                    if (ita != keyIndex.end() && (!tqi->isCheckPointMetaItem()))
-                    {
-                        uint64_t mutationId = ita->second.mutation_id;
-                        if (currMutationId <= mutationId) {
-                            cursor.second.decrOffset(1);
-                            if (cursor.second.name.compare(CheckpointManager::pCursorName)
-                                == 0) {
-                                rv = PERSIST_AGAIN;
-                            }
+
+                    queued_item& cursor_item = *(cursor.second.currentPos);
+
+                    auto& index =
+                            cursor_item->isCheckPointMetaItem() ? metaKeyIndex
+                                                                : keyIndex;
+
+                    auto cursor_item_idx = index.find(cursor_item->getKey());
+                    if (cursor_item_idx == index.end()) {
+                        throw std::logic_error("Checkpoint::queueDirty: Unable "
+                                "to find key with"
+                                " op:" + to_string(cursor_item->getOperation()) +
+                                " seqno:" + std::to_string(cursor_item->getBySeqno()) +
+                                " for cursor:" + cursor.first + " in current checkpoint.");
+                    }
+
+                    // If the cursor item is non-meta, then we need to decrement
+                    // the offset if the existing item is either before or on the
+                    // cursor - as the cursor points to the "last processed" item.
+                    // However if the cursor item is meta, then we only
+                    // decrement if the existing item is strictly less than
+                    // the cursor, as meta-items can share a seqno with
+                    // a non-meta item but are logically before them.
+                    uint64_t cursor_mutation_id = cursor_item_idx->second.mutation_id;
+                    if (cursor_item->isCheckPointMetaItem()) {
+                        --cursor_mutation_id;
+                    }
+                    if (currMutationId <= cursor_mutation_id) {
+                        // Cursor has already processed the previous value for
+                        // this key - need to logically move the cursor
+                        // backwards one so it will pick up the new value for
+                        // this key.
+                        cursor.second.decrOffset(1);
+                        if (cursor.second.name == CheckpointManager::pCursorName) {
+                            rv = PERSIST_AGAIN;
                         }
                     }
                     /* If an TAP cursor points to the existing item for the same
@@ -230,9 +259,11 @@ queue_dirty_t Checkpoint::queueDirty(const queued_item &qi,
         }
     }
 
-    // Notify flusher if in case queued item is a checkpoint meta item
+    // Notify the flusher if the queued item is a checkpoint meta item or
+    // a set_vbucket_state operation.
     if (qi->getOperation() == queue_op::checkpoint_start ||
-        qi->getOperation() == queue_op::checkpoint_end) {
+        qi->getOperation() == queue_op::checkpoint_end ||
+        qi->getOperation() == queue_op::set_vbucket_state) {
         checkpointManager->notifyFlusher();
     }
 
@@ -242,7 +273,6 @@ queue_dirty_t Checkpoint::queueDirty(const queued_item &qi,
 size_t Checkpoint::mergePrevCheckpoint(Checkpoint *pPrevCheckpoint) {
     size_t numNewItems = 0;
     size_t newEntryMemOverhead = 0;
-    std::list<queued_item>::reverse_iterator rit = pPrevCheckpoint->rbegin();
 
     LOG(EXTENSION_LOG_INFO,
         "Collapse the checkpoint %" PRIu64 " into the checkpoint %" PRIu64
@@ -259,26 +289,70 @@ size_t Checkpoint::mergePrevCheckpoint(Checkpoint *pPrevCheckpoint) {
     ++itr;
     (*itr)->setBySeqno(seqno);
 
-    for (; rit != pPrevCheckpoint->rend(); ++rit) {
+    // Iterate in reverse over the previous checkpoint's items, inserting them
+    // into the current checkpoint as necessary.
+    for (auto rit = pPrevCheckpoint->rbegin(); rit != pPrevCheckpoint->rend();
+            ++rit) {
         const std::string &key = (*rit)->getKey();
-        if ((*rit)->isCheckPointMetaItem()) {
-            continue;
-        }
-        checkpoint_index::iterator it = keyIndex.find(key);
-        if (it == keyIndex.end()) {
-            std::list<queued_item>::iterator pos = toWrite.begin();
-            // Skip the first two meta items
-            ++pos; ++pos;
-            toWrite.insert(pos, *rit);
-            index_entry entry = {--pos, static_cast<int64_t>(pPrevCheckpoint->
-                                            getMutationIdForKey(key, false))};
-            keyIndex[key] = entry;
-            newEntryMemOverhead += key.size() + sizeof(index_entry);
-            ++numItems;
-            ++numNewItems;
+        switch ((*rit)->getOperation()) {
+            case queue_op::set:
+            case queue_op::del:
+                // For the two 'normal' operations, re-insert into the current
+                // checkpoint if the key isn't already present (if it is already
+                // present then it must be an older revision and hence we can
+                // safely discard it).
+                if (keyIndex.find(key) == keyIndex.end()) {
+                    // Skip the first two meta items (empty & checkpoint start).
+                    auto pos = std::next(toWrite.begin(), 2);
+                    toWrite.insert(pos, *rit);
+                    index_entry entry = {--pos, static_cast<int64_t>(pPrevCheckpoint->
+                                                    getMutationIdForKey(key, false))};
+                    keyIndex[key] = entry;
+                    newEntryMemOverhead += key.size() + sizeof(index_entry);
+                    ++numItems;
+                    ++numNewItems;
+
+                    // Update new checkpoint's memory usage
+                    incrementMemConsumption((*rit)->size());
+                }
+                break;
+
+            case queue_op::flush:
+                // Should not expect to see any `flush` items actually queued.
+                throw std::logic_error("Checkpoint::mergePrevCheckpoint: "
+                        "Unexpected flush item in checkpoint");
+                break;
+
+            case queue_op::empty:
+                // Empty will be the first item in the checkpoint (and handled
+                // already above) - ignore.
+                break;
+
+            case queue_op::checkpoint_start:
+                // Similarly - handled in prologue of this method - ignore.
+                break;
+
+            case queue_op::checkpoint_end:
+                // Can also ignore checkpoint_end.
+                break;
 
-            // Update new checkpoint's memory usage
-            incrementMemConsumption((*rit)->size());
+            case queue_op::set_vbucket_state:
+                // Need to re-insert these into the correct place in the index.
+                if (metaKeyIndex.find(key) == metaKeyIndex.end()) {
+                    // Skip the first two meta items (empty & checkpoint start).
+                    auto pos = std::next(toWrite.begin(), 2);
+                    toWrite.insert(pos, *rit);
+                    auto mutationId = static_cast<int64_t>(
+                            pPrevCheckpoint->getMutationIdForKey(key, true));
+                    metaKeyIndex[key] = {--pos, mutationId};
+                    newEntryMemOverhead += key.size() + sizeof(index_entry);
+                    ++numMetaItems;
+                    ++numNewItems;
+
+                    // Update new checkpoint's memory usage
+                    incrementMemConsumption((*rit)->size());
+                }
+                break;
         }
     }
 
@@ -377,19 +451,29 @@ uint64_t CheckpointManager::getLastClosedCheckpointId() {
 void CheckpointManager::setOpenCheckpointId_UNLOCKED(uint64_t id) {
     if (!checkpointList.empty()) {
         // Update the checkpoint_start item with the new Id.
-        std::list<queued_item>::iterator it =
-            ++(checkpointList.back()->begin());
-        (*it)->setRevSeqno(id);
+        const auto ckpt_start = ++(checkpointList.back()->begin());
+        (*ckpt_start)->setRevSeqno(id);
         if (checkpointList.back()->getId() == 0) {
-            (*it)->setBySeqno(lastBySeqno + 1);
+            (*ckpt_start)->setBySeqno(lastBySeqno + 1);
             checkpointList.back()->setSnapshotStartSeqno(lastBySeqno);
             checkpointList.back()->setSnapshotEndSeqno(lastBySeqno);
         }
 
+        // Update any set_vbstate items to have the same seqno as the
+        // checkpoint_start.
+        const auto ckpt_start_seqno = (*ckpt_start)->getBySeqno();
+        for (auto item = std::next(ckpt_start);
+             item != checkpointList.back()->end();
+             item++) {
+            if ((*item)->getOperation() == queue_op::set_vbucket_state) {
+                (*item)->setBySeqno(ckpt_start_seqno);
+            }
+        }
+
         checkpointList.back()->setId(id);
         LOG(EXTENSION_LOG_INFO, "Set the current open checkpoint id to %" PRIu64
             " for vbucket %d, bySeqno is %" PRId64 ", max is %" PRId64,
-            id, vbucketId, (*it)->getBySeqno(), lastBySeqno);
+            id, vbucketId, (*ckpt_start)->getBySeqno(), lastBySeqno);
     }
 }
 
@@ -1018,6 +1102,16 @@ std::vector<std::string> CheckpointManager::getListOfCursorsToDrop() {
     return cursorsToDrop;
 }
 
+void CheckpointManager::updateStatsForNewQueuedItem_UNLOCKED(const LockHolder&,
+                                                             VBucket& vb,
+                                                             const queued_item& qi) {
+    ++stats.totalEnqueued;
+    ++stats.diskQueueSize;
+    vb.doStatsForQueueing(*qi, qi->size());
+    // Update the checkpoint's memory usage
+    checkpointList.back()->incrementMemConsumption(qi->size());
+}
+
 bool CheckpointManager::queueDirty(VBucket& vb, queued_item& qi,
                                    const GenerateBySeqno generateBySeqno,
                                    const GenerateCas generateCas) {
@@ -1089,17 +1183,32 @@ bool CheckpointManager::queueDirty(VBucket& vb, queued_item& qi,
     }
 
     if (result != EXISTING_ITEM) {
-        ++stats.totalEnqueued;
-        ++stats.diskQueueSize;
-        vb.doStatsForQueueing(*qi, qi->size());
-
-        // Update the checkpoint's memory usage
-        checkpointList.back()->incrementMemConsumption(qi->size());
+        updateStatsForNewQueuedItem_UNLOCKED(lh, vb, qi);
     }
 
     return result != EXISTING_ITEM;
 }
 
+void CheckpointManager::queueSetVBState(VBucket& vb) {
+    // Take lock to serialize use of {lastBySeqno} and to queue op.
+    LockHolder lh(queueLock);
+
+    // Create the setVBState operation, and enqueue it.
+    queued_item item = createCheckpointItem(/*id*/0, vbucketId,
+                                            queue_op::set_vbucket_state);
+
+    auto result = checkpointList.back()->queueDirty(item, this);
+
+    if (result == NEW_ITEM) {
+        ++numItems;
+        updateStatsForNewQueuedItem_UNLOCKED(lh, vb, item);
+    } else {
+        throw std::logic_error("CheckpointManager::queueSetVBState: "
+                "expected: NEW_ITEM, got:" + std::to_string(result) +
+                " after queueDirty. vbid:" + std::to_string(vbucketId));
+    }
+}
+
 snapshot_range_t CheckpointManager::getAllItemsForCursor(
                                              const std::string& name,
                                              std::vector<queued_item> &items) {
@@ -1659,7 +1768,7 @@ bool CheckpointManager::hasNext(const std::string &name) {
 }
 
 queued_item CheckpointManager::createCheckpointItem(uint64_t id, uint16_t vbid,
-                                          queue_op checkpoint_op) {
+                                                    queue_op checkpoint_op) {
     uint64_t bySeqno;
     std::string key;
 
index 2b53bef..8f03ee8 100644 (file)
--- a/src/checkpoint.h
+++ b/src/checkpoint.h
@@ -701,6 +701,12 @@ public:
                     const GenerateBySeqno generateBySeqno,
                     const GenerateCas generateCas);
 
+    /*
+     * Queue writing of the VBucket's state to the persistence layer.
+     * @param vb the vbucket whose state will be queued for persistence.
+     */
+    void queueSetVBState(VBucket& vb);
+
     /**
      * Return the next item to be sent to a given connection
      * @param name the name of a given connection
@@ -853,6 +859,15 @@ public:
 
     static const std::string pCursorName;
 
+protected:
+
+    // Helper method for queueing methods - update the global and per-VBucket
+    // stats after queueing a new item to a checkpoint.
+    // Must be called with queueLock held (LockHolder passed in as argument to
+    // 'prove' this).
+    void updateStatsForNewQueuedItem_UNLOCKED(const LockHolder&,
+                                     VBucket& vb, const queued_item& qi);
+
 private:
 
     // Pair of {sequence number, cursor at checkpoint start} used when
index da7aa54..087378d 100644 (file)
--- a/src/couch-kvstore/couch-kvstore.cc
+++ b/src/couch-kvstore/couch-kvstore.cc
@@ -851,7 +851,8 @@ vbucket_state * CouchKVStore::getVBucketState(uint16_t vbucketId) {
     return cachedVBStates[vbucketId];
 }
 
-bool CouchKVStore::setVBucketState(uint16_t vbucketId, vbucket_state &vbstate,
+bool CouchKVStore::setVBucketState(uint16_t vbucketId,
+                                   const vbucket_state &vbstate,
                                    Callback<kvstats_ctx> *kvcb, bool reset) {
     Db *db = NULL;
     uint64_t fileRev, newFileRev;
@@ -2030,7 +2031,8 @@ ENGINE_ERROR_CODE CouchKVStore::readVBState(Db *db, uint16_t vbId) {
     return couchErr2EngineErr(errCode);
 }
 
-couchstore_error_t CouchKVStore::saveVBState(Db *db, vbucket_state &vbState) {
+couchstore_error_t CouchKVStore::saveVBState(Db *db,
+                                             const vbucket_state &vbState) {
     std::stringstream jsonState;
 
     jsonState << "{\"state\": \"" << VBucket::toString(vbState.state) << "\""
index 6af081b..b6b8d70 100644 (file)
--- a/src/couch-kvstore/couch-kvstore.h
+++ b/src/couch-kvstore/couch-kvstore.h
@@ -488,7 +488,7 @@ public:
     void destroyScanContext(ScanContext* ctx);
 
 protected:
-    bool setVBucketState(uint16_t vbucketId, vbucket_state &vbstate,
+    bool setVBucketState(uint16_t vbucketId, const vbucket_state &vbstate,
                          Callback<kvstats_ctx> *cb, bool reset=false);
 
     template <typename T>
@@ -517,7 +517,7 @@ protected:
     void commitCallback(std::vector<CouchRequest *> &committedReqs,
                         kvstats_ctx &kvctx,
                         couchstore_error_t errCode);
-    couchstore_error_t saveVBState(Db *db, vbucket_state &vbState);
+    couchstore_error_t saveVBState(Db *db, const vbucket_state &vbState);
     void setDocsCommitted(uint16_t docs);
     void closeDatabaseHandle(Db *db);
 
index a24d2a1..cc46bea 100644 (file)
--- a/src/dcp/consumer.cc
+++ b/src/dcp/consumer.cc
@@ -923,8 +923,7 @@ void DcpConsumer::streamAccepted(uint32_t opaque, uint16_t status, uint8_t* body
                 RCPtr<VBucket> vb = engine_.getVBucket(vbucket);
                 vb->failovers->replaceFailoverLog(body, bodylen);
                 EventuallyPersistentStore* st = engine_.getEpStore();
-                st->scheduleVBSnapshot(VBSnapshotTask::Priority::HIGH,
-                                st->getVBuckets().getShardByVbId(vbucket)->getId());
+                st->scheduleVBStatePersist(vbucket);
             }
             LOG(EXTENSION_LOG_INFO, "%s (vb %d) Add stream for opaque %" PRIu32
                 " %s with error code %d", logHeader(), vbucket, opaque,
index d2a30d2..19ccecd 100644 (file)
--- a/src/ep.cc
+++ b/src/ep.cc
@@ -445,9 +445,6 @@ bool EventuallyPersistentStore::initialize() {
                                                    checkpointRemoverInterval);
     ExecutorPool::get()->schedule(chkTask, NONIO_TASK_IDX);
 
-    ExTask vbSnapshotTask = new DaemonVBSnapshotTask(&engine);
-    ExecutorPool::get()->schedule(vbSnapshotTask, WRITER_TASK_IDX);
-
     ExTask workloadMonitorTask = new WorkLoadMonitor(&engine, false);
     ExecutorPool::get()->schedule(workloadMonitorTask, NONIO_TASK_IDX);
 
@@ -1198,7 +1195,7 @@ void EventuallyPersistentStore::snapshotVBuckets(VBSnapshotTask::Priority prio,
     }
 
     if (!success) {
-        scheduleVBSnapshot(prio, shard->getId());
+        scheduleVBSnapshot(prio);
     } else {
         stats.snapshotVbucketHisto.add((gethrtime() - start) / 1000);
     }
@@ -1304,7 +1301,7 @@ ENGINE_ERROR_CODE EventuallyPersistentStore::setVBucketState(uint16_t vbid,
             ExTask notifyTask = new PendingOpsNotification(engine, vb);
             ExecutorPool::get()->schedule(notifyTask, NONIO_TASK_IDX);
         }
-        scheduleVBStatePersist(VBStatePersistTask::Priority::LOW, vbid);
+        scheduleVBStatePersist(vbid);
     } else if (vbid < vbMap.getSize()) {
         FailoverTable* ft = new FailoverTable(engine.getMaxFailoverEntries());
         KVShard* shard = vbMap.getShardByVbId(vbid);
@@ -1333,7 +1330,7 @@ ENGINE_ERROR_CODE EventuallyPersistentStore::setVBucketState(uint16_t vbid,
         vbMap.setPersistenceSeqno(vbid, 0);
         vbMap.setBucketCreation(vbid, true);
         lh.unlock();
-        scheduleVBStatePersist(VBStatePersistTask::Priority::HIGH, vbid);
+        scheduleVBStatePersist(vbid);
     } else {
         return ENGINE_ERANGE;
     }
@@ -1365,36 +1362,23 @@ bool EventuallyPersistentStore::scheduleVBSnapshot(VBSnapshotTask::Priority prio
     return true;
 }
 
-void EventuallyPersistentStore::scheduleVBSnapshot(VBSnapshotTask::Priority prio,
-                                                   uint16_t shardId,
-                                                   bool force) {
-    KVShard *shard = vbMap.shards[shardId];
-    if (prio == VBSnapshotTask::Priority::HIGH) {
-        if (force || shard->setHighPriorityVbSnapshotFlag(true)) {
-            ExTask task = new VBSnapshotTaskHigh(&engine, shardId, true);
-            ExecutorPool::get()->schedule(task, WRITER_TASK_IDX);
-        }
-    } else {
-        if (force || shard->setLowPriorityVbSnapshotFlag(true)) {
-            ExTask task = new VBSnapshotTaskLow(&engine, shardId, true);
-            ExecutorPool::get()->schedule(task, WRITER_TASK_IDX);
-        }
+void EventuallyPersistentStore::scheduleVBStatePersist() {
+    for (auto vbid : vbMap.getBuckets()) {
+        scheduleVBStatePersist(vbid);
     }
 }
 
-void EventuallyPersistentStore::scheduleVBStatePersist(VBStatePersistTask::Priority priority,
-                                                       uint16_t vbid) {
-
-
-    bool inverse = false;
-    if (schedule_vbstate_persist[vbid].compare_exchange_strong(inverse, true)) {
+void EventuallyPersistentStore::scheduleVBStatePersist(VBucket::id_type vbid) {
+    RCPtr<VBucket> vb = getVBucket(vbid);
 
-        if (priority == VBStatePersistTask::Priority::HIGH) {
-            ExecutorPool::get()->schedule(new VBStatePersistTaskHigh(&engine, vbid, true), WRITER_TASK_IDX);
-        } else {
-            ExecutorPool::get()->schedule(new VBStatePersistTaskLow(&engine, vbid, true), WRITER_TASK_IDX);
-        }
+    if (!vb) {
+        LOG(EXTENSION_LOG_WARNING,
+            "EPStore::scheduleVBStatePersist: vb:%" PRIu16
+            " does not exist. Unable to schedule persistence.", vbid);
+        return;
     }
+
+    vb->checkpointManager.queueSetVBState(*vb);
 }
 
 bool EventuallyPersistentStore::completeVBucketDeletion(uint16_t vbid,
@@ -3264,7 +3248,10 @@ void EventuallyPersistentStore::setFlushAllComplete() {
 void EventuallyPersistentStore::flushOneDeleteAll() {
     for (VBucketMap::id_type i = 0; i < vbMap.getSize(); ++i) {
         RCPtr<VBucket> vb = getVBucket(i);
-        if (vb) {
+        // Reset the vBucket if it's non-null and not already in the middle of
+        // being created / destroyed.
+        if (vb &&
+            !(vbMap.isBucketCreation(i) || vbMap.isBucketDeletion(i))) {
             LockHolder lh(vb_mutexes[vb->getId()]);
             getRWUnderlying(vb->getId())->reset(i);
         }
@@ -3285,12 +3272,8 @@ int EventuallyPersistentStore::flushVBucket(uint16_t vbid) {
         }
     }
 
-    if (vbMap.isBucketCreation(vbid)) {
-        return RETRY_FLUSH_VBUCKET;
-    }
-
     int items_flushed = 0;
-    rel_time_t flush_start = ep_current_time();
+    const rel_time_t flush_start = ep_current_time();
 
     RCPtr<VBucket> vb = vbMap.getBucket(vbid);
     if (vb) {
@@ -3325,15 +3308,36 @@ int EventuallyPersistentStore::flushVBucket(uint16_t vbid) {
             rwUnderlying->optimizeWrites(items);
 
             Item *prev = NULL;
+            auto vbstate = vb->getVBucketState();
             uint64_t maxSeqno = 0;
-            uint64_t maxCas = 0;
-            uint64_t maxDeletedRevSeqno = 0;
+            range.start = std::max(range.start, vbstate.lastSnapStart);
+
+            bool mustCheckpointVBState = false;
             std::list<PersistenceCallback*>& pcbs = rwUnderlying->getPersistenceCbList();
 
             for (const auto& item : items) {
 
                 if (!item->shouldPersist()) {
                     continue;
+                }
+
+                if (item->getOperation() == queue_op::set_vbucket_state) {
+                    // No actual item is explicitly persisted to disk (this op
+                    // exists to ensure a commit occurs with the current vbstate);
+                    // flag that we must trigger a snapshot even if there are
+                    // no 'real' items in the checkpoint.
+                    mustCheckpointVBState = true;
+
+                    // Update maxSeqno to ensure the snap {start,end} range
+                    // is correct if no other normal item is included in this
+                    // checkpoint.
+                    maxSeqno = std::max(maxSeqno, (uint64_t)item->getBySeqno());
+
+                    // Update queueing stats to reflect that this item has
+                    // logically been processed.
+                    stats.decrDiskQueueSize(1);
+                    vb->doStatsForFlushing(*item, item->size());
+
                 } else if (!prev || prev->getKey() != item->getKey()) {
                     prev = item.get();
                     ++items_flushed;
@@ -3343,15 +3347,23 @@ int EventuallyPersistentStore::flushVBucket(uint16_t vbid) {
                     }
 
                     maxSeqno = std::max(maxSeqno, (uint64_t)item->getBySeqno());
-                    maxCas = std::max(maxCas, item->getCas());
+                    vbstate.maxCas = std::max(vbstate.maxCas, item->getCas());
                     if (item->isDeleted()) {
-                        maxDeletedRevSeqno = std::max(maxDeletedRevSeqno,
-                                                      item->getRevSeqno());
+                        vbstate.maxDeletedSeqno =
+                                std::max(vbstate.maxDeletedSeqno,
+                                         item->getRevSeqno());
                     }
                     ++stats.flusher_todo;
+
                 } else {
-                    // Item is the same key as the previous one - don't need
+                    // Item is the same key as the previous[1] one - don't need
                     // to flush to disk.
+                    // [1] Previous here really means 'next' - optimizeWrites()
+                    //     above has actually re-ordered items such that items
+                    //     with the same key are ordered from high->low seqno.
+                    //     This means we only write the highest (i.e. newest)
+                    //     item for a given key, and discard any duplicate,
+                    //     older items.
                     stats.decrDiskQueueSize(1);
                     vb->doStatsForFlushing(*item, item->size());
                 }
@@ -3367,23 +3379,39 @@ int EventuallyPersistentStore::flushVBucket(uint16_t vbid) {
                     }
                 }
 
-                // Get current vbstate, then update based on the changes we
-                // have just made.
-                auto vbstate = vb->getVBucketState();
+                // Update VBstate based on the changes we have just made,
+                // then tell the rwUnderlying the 'new' state
+                // (which will be persisted as part of the commit() below).
                 vbstate.lastSnapStart = range.start;
                 vbstate.lastSnapEnd = range.end;
-                vbstate.maxCas = maxCas;
-                vbstate.maxDeletedSeqno = maxDeletedRevSeqno;
 
+                // Do we need to trigger a persist of the state?
+                // If there are no "real" items to flush, and we encountered
+                // a set_vbucket_state meta-item.
+                const bool persist = (items_flushed == 0) && mustCheckpointVBState;
+
+                KVStatsCallback kvcb(this);
                 if (rwUnderlying->snapshotVBucket(vb->getId(), vbstate,
-                                                  NULL, false) != true) {
+                                                  &kvcb, persist) != true) {
                     return RETRY_FLUSH_VBUCKET;
                 }
+
+                if (vbMap.setBucketCreation(vbid, false)) {
+                    LOG(EXTENSION_LOG_INFO, "VBucket %" PRIu16 " created", vbid);
+                }
             }
 
-            //commit all mutations to disk if the commit interval is zero
-            if (decrCommitInterval(shard->getId()) == 0) {
+            // Commit all mutations to disk if there is a non-zero number
+            // of items to flush, and the commit interval is zero.
+            if ((items_flushed > 0) &&
+                (decrCommitInterval(shard->getId()) == 0)) {
+
                 commit(shard->getId());
+
+                // Now the commit is complete, vBucket file must exist.
+                if (vbMap.setBucketCreation(vbid, false)) {
+                    LOG(EXTENSION_LOG_INFO, "VBucket %" PRIu16 " created", vbid);
+                }
             }
 
             hrtime_t end = gethrtime();
@@ -3395,6 +3423,8 @@ int EventuallyPersistentStore::flushVBucket(uint16_t vbid) {
             stats.cumulativeFlushTime.fetch_add(ep_current_time()
                                                 - flush_start);
             stats.flusher_todo.store(0);
+            stats.totalPersistVBState++;
+
             if (vb->rejectQueue.empty()) {
                 vb->setPersistedSnapshot(range.start, range.end);
                 uint64_t highSeqno = rwUnderlying->getLastPersistedSeqno(vbid);
@@ -3473,15 +3503,6 @@ EventuallyPersistentStore::flushOneDelOrSet(const queued_item &qi,
     stats.dirtyAgeHighWat.store(std::max(stats.dirtyAge.load(),
                                          stats.dirtyAgeHighWat.load()));
 
-    // Wait until the vbucket database is created by the vbucket state
-    // snapshot task.
-    if (vbMap.isBucketCreation(qi->getVBucketId()) ||
-        vbMap.isBucketDeletion(qi->getVBucketId())) {
-        vb->rejectQueue.push(qi);
-        ++vb->opsReject;
-        return NULL;
-    }
-
     KVStore *rwUnderlying = getRWUnderlying(qi->getVBucketId());
     if (!deleted) {
         // TODO: Need to separate disk_insert from disk_update because
@@ -3578,8 +3599,9 @@ std::vector<vbucket_state *> EventuallyPersistentStore::loadVBucketState()
 }
 
 void EventuallyPersistentStore::warmupCompleted() {
-    // Run the vbucket state snapshot job once after the warmup
-    scheduleVBSnapshot(VBSnapshotTask::Priority::HIGH);
+    // Snapshot VBucket state after warmup to ensure Failover table is
+    // persisted.
+    scheduleVBStatePersist();
 
     if (engine.getConfiguration().getAlogPath().length() > 0) {
 
@@ -4131,7 +4153,7 @@ bool EventuallyPersistentStore::runAccessScannerTask() {
 }
 
 void EventuallyPersistentStore::runVbStatePersistTask(int vbid) {
-    scheduleVBStatePersist(VBStatePersistTask::Priority::LOW, vbid);
+    scheduleVBStatePersist(vbid);
 }
 
 void EventuallyPersistentStore::setCursorDroppingLowerUpperThresholds(
index f5163a2..cb941de 100644 (file)
--- a/src/ep.h
+++ b/src/ep.h
@@ -692,16 +692,14 @@ public:
     bool scheduleVBSnapshot(VBSnapshotTask::Priority prio);
 
     /**
-     * schedule a vb_state snapshot task for a given shard.
+     * Schedule a vbstate persistence operation for all vbuckets.
      */
-    void scheduleVBSnapshot(VBSnapshotTask::Priority prio, uint16_t shardId,
-                            bool force = false);
+    void scheduleVBStatePersist();
 
     /**
-     * Schedule a vbstate persistence task for a given vbucket.
+     * Schedule a vbstate persistence operation for a given vbucket.
      */
-    void scheduleVBStatePersist(VBStatePersistTask::Priority prio,
-                                uint16_t vbid);
+    void scheduleVBStatePersist(VBucket::id_type vbid);
 
     /**
      * Persist a vbucket's state.
index 919638b..c1c4442 100644 (file)
--- a/src/ep_engine.cc
+++ b/src/ep_engine.cc
@@ -3411,7 +3411,7 @@ ENGINE_ERROR_CODE EventuallyPersistentEngine::doEngineStats(const void *cookie,
                     epstats.snapshotVbucketHisto.total(), add_stat, cookie);
 
     add_casted_stat("ep_persist_vbstate_total",
-                    epstats.persistVBStateHisto.total(), add_stat, cookie);
+                    epstats.totalPersistVBState, add_stat, cookie);
 
     add_casted_stat("ep_vb_total",
                     activeCountVisitor.getVBucketNumber() +
index f1b7b7e..8baf6dd 100644 (file)
--- a/src/item.h
+++ b/src/item.h
@@ -429,7 +429,7 @@ public:
     Item(const Item& other, bool copyKeyOnly = false) :
         metaData(other.metaData),
         key(other.key),
-        bySeqno(other.bySeqno),
+        bySeqno(other.bySeqno.load()),
         queuedTime(other.queuedTime),
         vbucketId(other.vbucketId),
         op(other.op),
@@ -517,11 +517,11 @@ public:
     }
 
     int64_t getBySeqno() const {
-        return bySeqno;
+        return bySeqno.load();
     }
 
     void setBySeqno(int64_t to) {
-        bySeqno = to;
+        bySeqno.store(to);
     }
 
     int getNKey() const {
@@ -752,7 +752,12 @@ private:
     ItemMetaData metaData;
     value_t value;
     std::string key;
-    int64_t bySeqno;
+
+    // bySeqno is atomic because it (rarely) needs to be changed after
+    // the item has been added to a Checkpoint - for meta-items in
+    // checkpoints when updating a the open checkpointID - see
+    // checkpoints when updating the open checkpoint ID - see
+    std::atomic<int64_t> bySeqno;
     uint32_t queuedTime;
     uint16_t vbucketId;
     queue_op op;
index 17adf13..459bd5b 100644 (file)
--- a/src/kvshard.h
+++ b/src/kvshard.h
@@ -74,7 +74,7 @@ public:
     void setBucket(const RCPtr<VBucket> &b);
     void resetBucket(VBucket::id_type id);
 
-    KVShard::id_type getId() { return shardId; }
+    KVShard::id_type getId() const { return shardId; }
     std::vector<VBucket::id_type> getVBucketsSortedByState();
     std::vector<VBucket::id_type> getVBuckets();
     size_t getMaxNumVbuckets() { return maxVbuckets; }
index ca7b347..f91bb75 100644 (file)
--- a/src/stats.h
+++ b/src/stats.h
@@ -59,6 +59,7 @@ public:
         tooYoung(0),
         tooOld(0),
         totalPersisted(0),
+        totalPersistVBState(0),
         totalEnqueued(0),
         flushFailed(0),
         flushExpired(0),
@@ -229,6 +230,8 @@ public:
     AtomicValue<size_t> tooOld;
     //! Number of items persisted.
     AtomicValue<size_t> totalPersisted;
+    //! Number of times VBucket state persisted.
+    AtomicValue<size_t> totalPersistVBState;
     //! Cumulative number of items added to the queue.
     AtomicValue<size_t> totalEnqueued;
     //! Number of times an item flush failed.
@@ -571,9 +574,11 @@ public:
     void reset() {
         tooYoung.store(0);
         tooOld.store(0);
+        totalPersistVBState.store(0);
         dirtyAge.store(0);
         dirtyAgeHighWat.store(0);
         commit_time.store(0);
+        cursorsDropped.store(0);
         pagerRuns.store(0);
         itemsRemovedFromCheckpoints.store(0);
         numValueEjects.store(0);
index d2ae0df..b1cd4c2 100644 (file)
--- a/src/vbucket.cc
+++ b/src/vbucket.cc
@@ -228,7 +228,7 @@ vbucket_state VBucket::getVBucketState() const {
 
 
 
-void VBucket::doStatsForQueueing(Item& qi, size_t itemBytes)
+void VBucket::doStatsForQueueing(const Item& qi, size_t itemBytes)
 {
     ++dirtyQueueSize;
     dirtyQueueMem.fetch_add(sizeof(Item));
index 8cd9663..f8057ca 100644 (file)
--- a/src/vbucket.h
+++ b/src/vbucket.h
@@ -304,7 +304,7 @@ public:
         return true;
     }
 
-    void doStatsForQueueing(Item& item, size_t itemBytes);
+    void doStatsForQueueing(const Item& item, size_t itemBytes);
     void doStatsForFlushing(Item& item, size_t itemBytes);
     void incrMetaDataDisk(Item& qi);
     void decrMetaDataDisk(Item& qi);
index 972e195..0eb128c 100644 (file)
--- a/src/vbucketmap.h
+++ b/src/vbucketmap.h
@@ -77,6 +77,9 @@ public:
     void setHLCDriftAheadThreshold(uint64_t threshold);
     void setHLCDriftBehindThreshold(uint64_t threshold);
 
+    // Returns the current state of the given vBucket.
+    vbucket_state getVBucketState(VBucket::id_type id) const;
+
 private:
 
     std::vector<KVShard*> shards;
index c4334e7..f75e78e 100644 (file)
--- a/tests/ep_testsuite.cc
+++ b/tests/ep_testsuite.cc
@@ -3853,7 +3853,7 @@ static enum test_result test_disk_gt_ram_incr_race(ENGINE_HANDLE *h,
 
     // Give incr time to finish (it's doing another background fetch)
     wait_for_stat_change(h, h1, "ep_bg_fetched", 1);
-    wait_for_stat_change(h, h1, "ep_total_enqueued", 1);
+    wait_for_stat_to_be(h, h1, "ep_total_enqueued", initial_enqueued + 2);
 
     // The incr mutated the value.
     check_key_value(h, h1, "k1", "14", 2);
index dd13dca..a38ba69 100644 (file)
--- a/tests/ep_testsuite_basic.cc
+++ b/tests/ep_testsuite_basic.cc
@@ -2357,7 +2357,6 @@ static enum test_result test_multiple_flush(ENGINE_HANDLE *h,
 
 static enum test_result test_flush_stats(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1) {
     item *i = NULL;
-    int overhead = get_int_stat(h, h1, "ep_overhead");
     int cacheSize = get_int_stat(h, h1, "ep_total_cache_size");
     int nonResident = get_int_stat(h, h1, "ep_num_non_resident");
 
@@ -2387,11 +2386,9 @@ static enum test_result test_flush_stats(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1)
 
     wait_for_flusher_to_settle(h, h1);
 
-    int overhead2 = get_int_stat(h, h1, "ep_overhead");
     int cacheSize2 = get_int_stat(h, h1, "ep_total_cache_size");
     int nonResident2 = get_int_stat(h, h1, "ep_num_non_resident");
 
-    cb_assert(overhead2 == overhead);
     cb_assert(nonResident2 == nonResident);
     cb_assert(cacheSize2 == cacheSize);
 
index 224a065..5189e58 100644 (file)
--- a/tests/ep_testsuite_checkpoint.cc
+++ b/tests/ep_testsuite_checkpoint.cc
@@ -121,7 +121,8 @@ static enum test_result test_checkpoint_deduplication(ENGINE_HANDLE *h, ENGINE_H
             h1->release(h, NULL, itm);
         }
     }
-    wait_for_stat_to_be(h, h1, "vb_0:num_checkpoint_items", 4501, "checkpoint");
+    // 4500 keys + 1x checkpoint_start + 1x set_vbucket_state.
+    wait_for_stat_to_be(h, h1, "vb_0:num_checkpoint_items", 4502, "checkpoint");
     return SUCCESS;
 }
 
index 52b8fe6..cea3cf6 100644 (file)
--- a/tests/ep_testsuite_dcp.cc
+++ b/tests/ep_testsuite_dcp.cc
@@ -3345,11 +3345,9 @@ static enum test_result test_chk_manager_rollback(ENGINE_HANDLE *h,
 
     int items = get_int_stat(h, h1, "curr_items_tot");
     int seqno = get_int_stat(h, h1, "vb_0:high_seqno", "vbucket-seqno");
-    int chk = get_int_stat(h, h1, "vb_0:num_checkpoint_items", "checkpoint");
 
     checkeq(40, items, "Got invalid amount of items");
     checkeq(40, seqno, "Seqno should be 40 after rollback");
-    checkeq(1, chk, "There should only be one checkpoint item");
     checkeq(num_items/2, get_int_stat(h, h1, "vb_replica_rollback_item_count"),
             "Replica rollback count does not match");
     checkeq(num_items/2, get_int_stat(h, h1, "rollback_item_count"),
index 077d8a5..f1dc39a 100644 (file)
--- a/tests/ep_testsuite_xdcr.cc
+++ b/tests/ep_testsuite_xdcr.cc
@@ -149,8 +149,6 @@ static enum test_result test_get_meta_nonexistent(ENGINE_HANDLE *h, ENGINE_HANDL
 {
     char const *key = "k1";
 
-    // wait until the vb snapshot has run
-    wait_for_stat_change(h, h1, "ep_vb_snapshot_total", 0);
     // check the stat
     int temp = get_int_stat(h, h1, "ep_num_ops_get_meta");
     check(temp == 0, "Expect zero getMeta ops");
@@ -515,9 +513,6 @@ static enum test_result test_delete_with_meta_nonexistent(ENGINE_HANDLE *h,
     checkeq(0, get_int_stat(h, h1, "ep_num_ops_del_meta"),
             "Expect zero setMeta ops");
 
-    // wait until the vb snapshot has run
-    wait_for_stat_change(h, h1, "ep_vb_snapshot_total", 0);
-
     // get metadata of nonexistent key
     check(!get_meta(h, h1, key), "Expected get meta to return false");
     checkeq(PROTOCOL_BINARY_RESPONSE_KEY_ENOENT, last_status.load(),
@@ -985,8 +980,6 @@ static enum test_result test_set_with_meta_nonexistent(ENGINE_HANDLE *h, ENGINE_
     // check the stat
     checkeq(0, get_int_stat(h, h1, "ep_num_ops_set_meta"), "Expect zero ops");
 
-    // wait until the vb snapshot has run
-    wait_for_stat_change(h, h1, "ep_vb_snapshot_total", 0);
     // get metadata for the key
     check(!get_meta(h, h1, key), "Expected get meta to return false");
     checkeq(PROTOCOL_BINARY_RESPONSE_KEY_ENOENT, last_status.load(), "Expected enoent");
index 1d06536..7b76c4c 100644 (file)
--- a/tests/module_tests/checkpoint_test.cc
+++ b/tests/module_tests/checkpoint_test.cc
@@ -977,3 +977,88 @@ TEST_F(CheckpointTest, SeqnoAndHLCOrdering) {
         previousCas = items[ii]->getCas();
     }
 }
+
+// Test cursor is correctly updated when enqueuing a key which already exists
+// in the checkpoint (and needs de-duping), where the cursor points at a
+// meta-item at the head of the checkpoint:
+//
+//  Before:
+//      Checkpoint [ 0:EMPTY(), 1:CKPT_START(), 1:SET(key), 2:SET_VBSTATE() ]
+//                                                               ^
+//                                                            Cursor
+//
+//  After:
+//      Checkpoint [ 0:EMPTY(), 1:CKPT_START(), 2:SET_VBSTATE(), 2:SET(key) ]
+//                                                     ^
+//                                                   Cursor
+//
+TEST_F(CheckpointTest, CursorUpdateForExistingItemWithMetaItemAtHead) {
+    // Setup the checkpoint and cursor.
+    ASSERT_EQ(1, manager->getNumItems());
+    ASSERT_TRUE(queueNewItem("key"));
+    ASSERT_EQ(2, manager->getNumItems());
+    manager->queueSetVBState(*vbucket.get());
+
+    ASSERT_EQ(3, manager->getNumItems());
+
+    // Advance persistence cursor so all items have been consumed.
+    std::vector<queued_item> items;
+    manager->getAllItemsForCursor(CheckpointManager::pCursorName, items);
+    ASSERT_EQ(3, items.size());
+    ASSERT_EQ(0, manager->getNumItemsForCursor(CheckpointManager::pCursorName));
+
+    // Queue an item with a duplicate key.
+    queueNewItem("key");
+
+    // Test: Should have one item for cursor (the one we just added).
+    EXPECT_EQ(1, manager->getNumItemsForCursor(CheckpointManager::pCursorName));
+
+    // Should have another item to read (new version of 'key')
+    items.clear();
+    manager->getAllItemsForCursor(CheckpointManager::pCursorName, items);
+    EXPECT_EQ(1, items.size());
+}
+
+// Test cursor is correctly updated when enqueuing a key which already exists
+// in the checkpoint (and needs de-duping), where the cursor points at a
+// meta-item *not* at the head of the checkpoint:
+//
+//  Before:
+//      Checkpoint [ 0:EMPTY(), 1:CKPT_START(), 1:SET_VBSTATE(), 1:SET(key) ]
+//                                                     ^
+//                                                    Cursor
+//
+//  After:
+//      Checkpoint [ 0:EMPTY(), 1:CKPT_START(), 1:SET_VBSTATE(), 2:SET(key) ]
+//                                                     ^
+//                                                   Cursor
+//
+TEST_F(CheckpointTest, CursorUpdateForExistingItemWithNonMetaItemAtHead) {
+    // Setup the checkpoint and cursor.
+    ASSERT_EQ(1, manager->getNumItems());
+    manager->queueSetVBState(*vbucket.get());
+    ASSERT_EQ(2, manager->getNumItems());
+
+    // Advance persistence cursor so all items have been consumed.
+    std::vector<queued_item> items;
+    manager->getAllItemsForCursor(CheckpointManager::pCursorName, items);
+    ASSERT_EQ(2, items.size());
+    ASSERT_EQ(0, manager->getNumItemsForCursor(CheckpointManager::pCursorName));
+
+    // Queue a set (cursor will now be one behind).
+    ASSERT_TRUE(queueNewItem("key"));
+    ASSERT_EQ(1, manager->getNumItemsForCursor(CheckpointManager::pCursorName));
+
+    // Test: queue an item with a duplicate key.
+    queueNewItem("key");
+
+    // Test: Should have one item for cursor (the one we just added).
+    EXPECT_EQ(1, manager->getNumItemsForCursor(CheckpointManager::pCursorName));
+
+    // Should have an item to read (new version of 'key')
+    items.clear();
+    manager->getAllItemsForCursor(CheckpointManager::pCursorName, items);
+    EXPECT_EQ(1, items.size());
+    EXPECT_EQ(1002, items.at(0)->getBySeqno());
+    EXPECT_EQ("key", items.at(0)->getKey());
+}
index e10885f..6b7240f 100644 (file)
--- a/tests/module_tests/evp_engine_test.cc
+++ b/tests/module_tests/evp_engine_test.cc
@@ -43,12 +43,13 @@ void EventuallyPersistentEngineTest::SetUp() {
     EXPECT_EQ(ENGINE_SUCCESS, engine->initialize(config.c_str()))
         << "Failed to initialize engine.";
 
-    engine->setVBucketState(vbid, vbucket_state_active, false);
-
     // Wait for warmup to complete.
     while (engine->getEpStore()->isWarmingUp()) {
         usleep(10);
     }
+
+    // Once warmup is complete, set VB to active.
+    engine->setVBucketState(vbid, vbucket_state_active, false);
 }
 
 void EventuallyPersistentEngineTest::TearDown() {
index ef4fbc8..e2eecb9 100644 (file)
--- a/tests/module_tests/evp_store_single_threaded_test.cc
+++ b/tests/module_tests/evp_store_single_threaded_test.cc
@@ -74,14 +74,13 @@ protected:
      * On return the state will be changed and the task completed.
      */
     void setVBucketStateAndRunPersistTask(uint16_t vbid, vbucket_state_t newState) {
-        auto& lpWriterQ = *task_executor->getLpTaskQ()[WRITER_TASK_IDX];
-
-        // Change state - this should add 1 VBStatePersistTask to the WRITER queue.
+        // Change state - this should add 1 set_vbucket_state op to the
+        // VBucket's persistence queue.
         EXPECT_EQ(ENGINE_SUCCESS,
                   store->setVBucketState(vbid, newState, /*transfer*/false));
 
-        runNextTask(lpWriterQ, "Persisting a vbucket state for vbucket: "
-                               + std::to_string(vbid));
+        // Trigger the flusher to flush state to disk.
+        EXPECT_EQ(0, store->flushVBucket(vbid));
     }
 
     /*
@@ -141,12 +140,12 @@ TEST_F(SingleThreadedEPStoreTest, MB19695_doTapVbTakeoverStats) {
     auto& lpNonioQ = *task_executor->getLpTaskQ()[NONIO_TASK_IDX];
 
     // [[2]] Perform a vbucket reset. This will perform some work synchronously,
-    // but also schedules 3 tasks:
+    // but also creates 2 tasks and notifies the flusher:
     //   1. vbucket memory deletion (NONIO)
     //   2. vbucket disk deletion (WRITER)
-    //   3. VBStatePersistTask (WRITER)
+    //   3. FlusherTask notified (WRITER)
     // MB-19695: If we try to get the number of persisted deletes between
-    // tasks (2) and (3) running then an exception is thrown (and client
+    // steps (2) and (3) running then an exception is thrown (and client
     // disconnected).
     EXPECT_TRUE(store->resetVBucket(vbid));
 
@@ -167,8 +166,8 @@ TEST_F(SingleThreadedEPStoreTest, MB19695_doTapVbTakeoverStats) {
     EXPECT_NO_THROW(engine->public_doDcpVbTakeoverStats
                     (nullptr, dummy_cb, key, vbid));
 
-    // Cleanup - run the 3rd task - VBStatePersistTask.
-    runNextTask(lpWriterQ, "Persisting a vbucket state for vbucket: 0");
+    // Cleanup - run flusher.
+    EXPECT_EQ(0, store->flushVBucket(vbid));
 }
 
 /*