Merge remote-tracking branch 'couchbase/3.0.x' into sherlock
[ep-engine.git] / src / ep.cc
index 0e0895c..9902709 100644 (file)
--- a/src/ep.cc
+++ b/src/ep.cc
@@ -32,6 +32,7 @@
 #include "access_scanner.h"
 #include "checkpoint_remover.h"
 #include "conflict_resolution.h"
+#include "defragmenter.h"
 #include "ep.h"
 #include "ep_engine.h"
 #include "failover-table.h"
 
 class StatsValueChangeListener : public ValueChangedListener {
 public:
-    StatsValueChangeListener(EPStats &st) : stats(st) {
+    StatsValueChangeListener(EPStats &st, EventuallyPersistentStore &str)
+        : stats(st), store(str) {
         // EMPTY
     }
 
     virtual void sizeValueChanged(const std::string &key, size_t value) {
         if (key.compare("max_size") == 0) {
             stats.setMaxDataSize(value);
+            store.getEPEngine().getDcpConnMap(). \
+                                     updateMaxActiveSnoozingBackfills(value);
             size_t low_wat = static_cast<size_t>
-                             (static_cast<double>(value) * 0.6);
+                             (static_cast<double>(value) * 0.75);
             size_t high_wat = static_cast<size_t>(
-                              static_cast<double>(value) * 0.75);
+                              static_cast<double>(value) * 0.85);
             stats.mem_low_wat.store(low_wat);
             stats.mem_high_wat.store(high_wat);
         } else if (key.compare("mem_low_wat") == 0) {
@@ -80,6 +84,7 @@ public:
 
 private:
     EPStats &stats;
+    EventuallyPersistentStore &store;
 };
 
 /**
@@ -129,6 +134,14 @@ public:
             } else {
                 store.disableAccessScannerTask();
             }
+        } else if (key.compare("bfilter_enabled") == 0) {
+            store.setAllBloomFilters(value);
+        }
+    }
+
+    virtual void floatValueChanged(const std::string &key, float value) {
+        if (key.compare("bfilter_residency_threshold") == 0) {
+            store.setBfiltersResidencyThreshold(value);
         }
     }
 
@@ -189,8 +202,10 @@ EventuallyPersistentStore::EventuallyPersistentStore(
     EventuallyPersistentEngine &theEngine) :
     engine(theEngine), stats(engine.getEpStats()),
     vbMap(theEngine.getConfiguration(), *this),
+    defragmenterTask(NULL),
     bgFetchQueue(0),
-    diskFlushAll(false), bgFetchDelay(0), backfillMemoryThreshold(0.95),
+    diskFlushAll(false), bgFetchDelay(0),
+    backfillMemoryThreshold(0.95),
     statsSnapshotTaskId(0), lastTransTimePerItem(0)
 {
     cachedResidentRatio.activeRatio.store(0);
@@ -229,26 +244,27 @@ EventuallyPersistentStore::EventuallyPersistentStore(
     stats.memOverhead = sizeof(EventuallyPersistentStore);
 
     if (config.getConflictResolutionType().compare("seqno") == 0) {
-        conflictResolver = new SeqBasedResolution();
+        conflictResolver = new ConflictResolution();
     }
 
     stats.setMaxDataSize(config.getMaxSize());
     config.addValueChangedListener("max_size",
-                                   new StatsValueChangeListener(stats));
+                                   new StatsValueChangeListener(stats, *this));
+    getEPEngine().getDcpConnMap().updateMaxActiveSnoozingBackfills(config.getMaxSize());
 
     stats.mem_low_wat.store(config.getMemLowWat());
     config.addValueChangedListener("mem_low_wat",
-                                   new StatsValueChangeListener(stats));
+                                   new StatsValueChangeListener(stats, *this));
 
     stats.mem_high_wat.store(config.getMemHighWat());
     config.addValueChangedListener("mem_high_wat",
-                                   new StatsValueChangeListener(stats));
+                                   new StatsValueChangeListener(stats, *this));
 
     stats.tapThrottleThreshold.store(static_cast<double>
                                     (config.getTapThrottleThreshold())
                                      / 100.0);
     config.addValueChangedListener("tap_throttle_threshold",
-                                   new StatsValueChangeListener(stats));
+                                   new StatsValueChangeListener(stats, *this));
 
     stats.tapThrottleWriteQueueCap.store(config.getTapThrottleQueueCap());
     config.addValueChangedListener("tap_throttle_queue_cap",
@@ -263,11 +279,11 @@ EventuallyPersistentStore::EventuallyPersistentStore(
     stats.warmupMemUsedCap.store(static_cast<double>
                                (config.getWarmupMinMemoryThreshold()) / 100.0);
     config.addValueChangedListener("warmup_min_memory_threshold",
-                                   new StatsValueChangeListener(stats));
+                                   new StatsValueChangeListener(stats, *this));
     stats.warmupNumReadCap.store(static_cast<double>
                                 (config.getWarmupMinItemsThreshold()) / 100.0);
     config.addValueChangedListener("warmup_min_items_threshold",
-                                   new StatsValueChangeListener(stats));
+                                   new StatsValueChangeListener(stats, *this));
 
     double mem_threshold = static_cast<double>
                                       (config.getMutationMemThreshold()) / 100;
@@ -281,6 +297,13 @@ EventuallyPersistentStore::EventuallyPersistentStore(
     config.addValueChangedListener("backfill_mem_threshold",
                                    new EPStoreValueChangeListener(*this));
 
+    config.addValueChangedListener("bfilter_enabled",
+                                   new EPStoreValueChangeListener(*this));
+
+    bfilterResidencyThreshold = config.getBfilterResidencyThreshold();
+    config.addValueChangedListener("bfilter_residency_threshold",
+                                   new EPStoreValueChangeListener(*this));
+
     compactionExpMemThreshold = config.getCompactionExpMemThreshold();
     config.addValueChangedListener("compaction_exp_mem_threshold",
                                    new EPStoreValueChangeListener(*this));
@@ -348,13 +371,22 @@ bool EventuallyPersistentStore::initialize() {
     ExTask workloadMonitorTask = new WorkLoadMonitor(&engine, false);
     ExecutorPool::get()->schedule(workloadMonitorTask, NONIO_TASK_IDX);
 
+#if HAVE_JEMALLOC
+    /* Only create the defragmenter task if we have an underlying memory
+     * allocator which can facilitate defragmenting memory.
+     */
+    defragmenterTask = new DefragmenterTask(&engine, stats);
+    ExecutorPool::get()->schedule(defragmenterTask, NONIO_TASK_IDX);
+#endif
+
     return true;
 }
 
 EventuallyPersistentStore::~EventuallyPersistentStore() {
     stopWarmup();
     stopBgFetcher();
-    ExecutorPool::get()->stopTaskGroup(&engine, NONIO_TASK_IDX);
+    ExecutorPool::get()->stopTaskGroup(&engine, NONIO_TASK_IDX,
+                                       stats.forceShutdown);
 
     ExecutorPool::get()->cancel(statsSnapshotTaskId);
     LockHolder lh(accessScanner.mutex);
@@ -362,7 +394,8 @@ EventuallyPersistentStore::~EventuallyPersistentStore() {
     lh.unlock();
 
     stopFlusher();
-    ExecutorPool::get()->unregisterBucket(ObjectRegistry::getCurrentEngine());
+    ExecutorPool::get()->unregisterBucket(ObjectRegistry::getCurrentEngine(),
+                                          stats.forceShutdown);
 
     delete [] vb_mutexes;
     delete [] schedule_vbstate_persist;
@@ -371,6 +404,7 @@ EventuallyPersistentStore::~EventuallyPersistentStore() {
     delete conflictResolver;
     delete warmupTask;
     delete storageProperties;
+    defragmenterTask.reset();
 
     std::vector<MutationLog*>::iterator it;
     for (it = accessLog.begin(); it != accessLog.end(); it++) {
@@ -397,6 +431,8 @@ bool EventuallyPersistentStore::startFlusher() {
 void EventuallyPersistentStore::stopFlusher() {
     for (uint16_t i = 0; i < vbMap.numShards; i++) {
         Flusher *flusher = vbMap.shards[i]->getFlusher();
+        LOG(EXTENSION_LOG_WARNING, "Attempting to stop the flusher for "
+            "shard:%" PRIu16, i);
         bool rv = flusher->stop(stats.forceShutdown);
         if (rv && !stats.forceShutdown) {
             flusher->wait();
@@ -445,7 +481,7 @@ bool EventuallyPersistentStore::startBgFetcher() {
         BgFetcher *bgfetcher = vbMap.shards[i]->getBgFetcher();
         if (bgfetcher == NULL) {
             LOG(EXTENSION_LOG_WARNING,
-                "Falied to start bg fetcher for shard %d", i);
+                "Failed to start bg fetcher for shard %d", i);
             return false;
         }
         bgfetcher->start();
@@ -461,7 +497,7 @@ void EventuallyPersistentStore::stopBgFetcher() {
                 "Shutting down engine while there are still pending data "
                 "read for shard %d from database storage", i);
         }
-        LOG(EXTENSION_LOG_INFO, "Stopping bg fetcher for underlying storage");
+        LOG(EXTENSION_LOG_WARNING, "Stopping bg fetcher for shard:%" PRIu16, i);
         bgfetcher->stop();
     }
 }
@@ -488,22 +524,25 @@ EventuallyPersistentStore::deleteExpiredItem(uint16_t vbid, std::string &key,
                     cb_assert(deleted);
                 } else if (v->isExpired(startTime) && !v->isDeleted()) {
                     vb->ht.unlocked_softDelete(v, 0, getItemEvictionPolicy());
-                    queueDirty(vb, v, &lh, false);
+                    queueDirty(vb, v, &lh, NULL, false);
                 }
             } else {
                 if (eviction_policy == FULL_EVICTION) {
                     // Create a temp item and delete and push it
-                    // into the checkpoint queue.
-                    add_type_t rv = vb->ht.unlocked_addTempItem(bucket_num, key,
-                                                                eviction_policy);
-                    if (rv == ADD_NOMEM) {
-                        return;
+                    // into the checkpoint queue, only if the bloomfilter
+                    // predicts that the item may exist on disk.
+                    if (vb->maybeKeyExistsInFilter(key)) {
+                        add_type_t rv = vb->ht.unlocked_addTempItem(bucket_num, key,
+                                                                    eviction_policy);
+                        if (rv == ADD_NOMEM) {
+                            return;
+                        }
+                        v = vb->ht.unlocked_find(key, bucket_num, true, false);
+                        v->setStoredValueState(StoredValue::state_deleted_key);
+                        v->setRevSeqno(revSeqno);
+                        vb->ht.unlocked_softDelete(v, 0, eviction_policy);
+                        queueDirty(vb, v, &lh, NULL, false);
                     }
-                    v = vb->ht.unlocked_find(key, bucket_num, true, false);
-                    v->setStoredValueState(StoredValue::state_deleted_key);
-                    v->setRevSeqno(revSeqno);
-                    vb->ht.unlocked_softDelete(v, 0, eviction_policy);
-                    queueDirty(vb, v, &lh, false);
                 }
             }
         }
@@ -539,7 +578,7 @@ StoredValue *EventuallyPersistentStore::fetchValidValue(RCPtr<VBucket> &vb,
             if (queueExpired && vb->getState() == vbucket_state_active) {
                 incExpirationStat(vb, false);
                 vb->ht.unlocked_softDelete(v, 0, eviction_policy);
-                queueDirty(vb, v, NULL, false, true);
+                queueDirty(vb, v, NULL, NULL, false, true);
             }
             return wantDeleted ? v : NULL;
         }
@@ -547,6 +586,21 @@ StoredValue *EventuallyPersistentStore::fetchValidValue(RCPtr<VBucket> &vb,
     return v;
 }
 
+bool EventuallyPersistentStore::isMetaDataResident(RCPtr<VBucket> &vb,
+                                                   const std::string &key) {
+
+    cb_assert(vb);
+    int bucket_num(0);
+    LockHolder lh = vb->ht.getLockedBucket(key, &bucket_num);
+    StoredValue *v = vb->ht.unlocked_find(key, bucket_num, false, false);
+
+    if (v && !v->isTempItem()) {
+        return true;
+    } else {
+        return false;
+    }
+}
+
 protocol_binary_response_status EventuallyPersistentStore::evictKey(
                                                         const std::string &key,
                                                         uint16_t vbucket,
@@ -572,6 +626,11 @@ protocol_binary_response_status EventuallyPersistentStore::evictKey(
         if (v->isResident()) {
             if (vb->ht.unlocked_ejectItem(v, eviction_policy)) {
                 *msg = "Ejected.";
+
+                // Add key to bloom filter incase of full eviction mode
+                if (getItemEvictionPolicy() == FULL_EVICTION) {
+                    vb->addToFilter(key);
+                }
             } else {
                 *msg = "Can't eject: Dirty object.";
                 rv = PROTOCOL_BINARY_RESPONSE_KEY_EEXISTS;
@@ -614,7 +673,7 @@ ENGINE_ERROR_CODE EventuallyPersistentStore::addTempItemForBgFetch(
             abort();
         case ADD_BG_FETCH:
             lock.unlock();
-            bgFetch(key, vb->getId(), -1, cookie, metadataOnly);
+            bgFetch(key, vb->getId(), cookie, metadataOnly);
     }
     return ENGINE_EWOULDBLOCK;
 }
@@ -655,10 +714,23 @@ ENGINE_ERROR_CODE EventuallyPersistentStore::set(const Item &itm,
          vb->getState() == vbucket_state_pending)) {
         v->unlock();
     }
-    mutation_type_t mtype = vb->ht.unlocked_set(v, itm, itm.getCas(),
-                                                true, false,
-                                                eviction_policy, nru);
 
+    bool maybeKeyExists = true;
+    // Check Bloomfilter's prediction if in full eviction policy
+    // and for a CAS operation only.
+    if (eviction_policy == FULL_EVICTION && itm.getCas() != 0) {
+        // Check Bloomfilter's prediction
+        if (!vb->maybeKeyExistsInFilter(itm.getKey())) {
+            maybeKeyExists = false;
+        }
+    }
+
+    mutation_type_t mtype = vb->ht.unlocked_set(v, itm, itm.getCas(), true, false,
+                                                eviction_policy, nru,
+                                                maybeKeyExists);
+
+    Item& it = const_cast<Item&>(itm);
+    uint64_t seqno = 0;
     ENGINE_ERROR_CODE ret = ENGINE_SUCCESS;
     switch (mtype) {
     case NOMEM:
@@ -678,20 +750,23 @@ ENGINE_ERROR_CODE EventuallyPersistentStore::set(const Item &itm,
         // Even if the item was dirty, push it into the vbucket's open
         // checkpoint.
     case WAS_CLEAN:
-        queueDirty(vb, v, &lh);
+        it.setCas(vb->nextHLCCas());
+        v->setCas(it.getCas());
+        queueDirty(vb, v, &lh, &seqno);
+        it.setBySeqno(seqno);
         break;
     case NEED_BG_FETCH:
-        { // CAS operation with non-resident item + full eviction.
-            if (v) {
-                // temp item is already created. Simply schedule a bg fetch job
-                lh.unlock();
-                bgFetch(itm.getKey(), vb->getId(), -1, cookie, true);
-                return ENGINE_EWOULDBLOCK;
-            }
-            ret = addTempItemForBgFetch(lh, bucket_num, itm.getKey(), vb,
-                                        cookie, true);
-            break;
+    {   // CAS operation with non-resident item + full eviction.
+        if (v) {
+            // temp item is already created. Simply schedule a bg fetch job
+            lh.unlock();
+            bgFetch(itm.getKey(), vb->getId(), cookie, true);
+            return ENGINE_EWOULDBLOCK;
         }
+        ret = addTempItemForBgFetch(lh, bucket_num, itm.getKey(), vb,
+                                    cookie, true);
+        break;
+    }
     case INVALID_VBUCKET:
         ret = ENGINE_NOT_MY_VBUCKET;
         break;
@@ -731,26 +806,43 @@ ENGINE_ERROR_CODE EventuallyPersistentStore::add(const Item &itm,
     LockHolder lh = vb->ht.getLockedBucket(itm.getKey(), &bucket_num);
     StoredValue *v = vb->ht.unlocked_find(itm.getKey(), bucket_num, true,
                                           false);
+
+    bool maybeKeyExists = true;
+    if (eviction_policy == FULL_EVICTION) {
+        // Check bloomfilter's prediction
+        if (!vb->maybeKeyExistsInFilter(itm.getKey())) {
+            maybeKeyExists = false;
+        }
+    }
+
     add_type_t atype = vb->ht.unlocked_add(bucket_num, v, itm,
-                                           eviction_policy);
+                                           eviction_policy,
+                                           true, true,
+                                           maybeKeyExists);
 
+    Item& it = const_cast<Item&>(itm);
+    uint64_t seqno = 0;
     switch (atype) {
     case ADD_NOMEM:
         return ENGINE_ENOMEM;
     case ADD_EXISTS:
         return ENGINE_NOT_STORED;
     case ADD_TMP_AND_BG_FETCH:
-        return addTempItemForBgFetch(lh, bucket_num, itm.getKey(), vb,
+        return addTempItemForBgFetch(lh, bucket_num, it.getKey(), vb,
                                      cookie, true);
     case ADD_BG_FETCH:
         lh.unlock();
-        bgFetch(itm.getKey(), vb->getId(), -1, cookie, true);
+        bgFetch(it.getKey(), vb->getId(), cookie, true);
         return ENGINE_EWOULDBLOCK;
     case ADD_SUCCESS:
     case ADD_UNDEL:
-        queueDirty(vb, v, &lh);
+        it.setCas(vb->nextHLCCas());
+        v->setCas(it.getCas());
+        queueDirty(vb, v, &lh, &seqno);
+        it.setBySeqno(seqno);
         break;
     }
+
     return ENGINE_SUCCESS;
 }
 
@@ -793,6 +885,8 @@ ENGINE_ERROR_CODE EventuallyPersistentStore::replace(const Item &itm,
                                         0xff);
         }
 
+        Item& it = const_cast<Item&>(itm);
+        uint64_t seqno = 0;
         ENGINE_ERROR_CODE ret = ENGINE_SUCCESS;
         switch (mtype) {
             case NOMEM:
@@ -810,13 +904,16 @@ ENGINE_ERROR_CODE EventuallyPersistentStore::replace(const Item &itm,
                 // Even if the item was dirty, push it into the vbucket's open
                 // checkpoint.
             case WAS_CLEAN:
-                queueDirty(vb, v, &lh);
+                it.setCas(vb->nextHLCCas());
+                v->setCas(it.getCas());
+                queueDirty(vb, v, &lh, &seqno);
+                it.setBySeqno(seqno);
                 break;
             case NEED_BG_FETCH:
             {
                 // temp item is already created. Simply schedule a bg fetch job
                 lh.unlock();
-                bgFetch(itm.getKey(), vb->getId(), -1, cookie, true);
+                bgFetch(it.getKey(), vb->getId(), cookie, true);
                 ret = ENGINE_EWOULDBLOCK;
                 break;
             }
@@ -831,14 +928,22 @@ ENGINE_ERROR_CODE EventuallyPersistentStore::replace(const Item &itm,
             return ENGINE_KEY_ENOENT;
         }
 
-        return addTempItemForBgFetch(lh, bucket_num, itm.getKey(), vb,
-                                     cookie, false);
+        if (vb->maybeKeyExistsInFilter(itm.getKey())) {
+            return addTempItemForBgFetch(lh, bucket_num, itm.getKey(), vb,
+                                         cookie, false);
+        } else {
+            // As bloomfilter predicted that item surely doesn't exist
+            // on disk, return ENOENT for replace().
+            return ENGINE_KEY_ENOENT;
+        }
     }
 }
 
-ENGINE_ERROR_CODE EventuallyPersistentStore::addTAPBackfillItem(const Item &itm,
-                                                                uint8_t nru,
-                                                                bool genBySeqno) {
+ENGINE_ERROR_CODE EventuallyPersistentStore::addTAPBackfillItem(
+                                                        const Item &itm,
+                                                        uint8_t nru,
+                                                        bool genBySeqno,
+                                                        ExtendedMetaData *emd) {
 
     RCPtr<VBucket> vb = getVBucket(itm.getVBucketId());
     if (!vb) {
@@ -855,6 +960,11 @@ ENGINE_ERROR_CODE EventuallyPersistentStore::addTAPBackfillItem(const Item &itm,
         return ENGINE_NOT_MY_VBUCKET;
     }
 
+    //check for the incoming item's CAS validity
+    if (!Item::isValidCas(itm.getCas())) {
+        return ENGINE_KEY_EEXISTS;
+    }
+
     int bucket_num(0);
     LockHolder lh = vb->ht.getLockedBucket(itm.getKey(), &bucket_num);
     StoredValue *v = vb->ht.unlocked_find(itm.getKey(), bucket_num, true,
@@ -883,7 +993,16 @@ ENGINE_ERROR_CODE EventuallyPersistentStore::addTAPBackfillItem(const Item &itm,
     case NOT_FOUND:
         // FALLTHROUGH
     case WAS_CLEAN:
-        queueDirty(vb, v, &lh, true, true, genBySeqno);
+        /* set the conflict resolution mode from the extended meta data *
+         * Given that the mode is already set, we don't need to set the *
+         * conflict resolution mode in queueDirty */
+        if (emd) {
+            v->setConflictResMode(
+                 static_cast<enum conflict_resolution_mode>(
+                                      emd->getConflictResMode()));
+        }
+        vb->setMaxCas(v->getCas());
+        queueDirty(vb, v, &lh, NULL,true, true, genBySeqno, false);
         break;
     case INVALID_VBUCKET:
         ret = ENGINE_NOT_MY_VBUCKET;
@@ -893,6 +1012,11 @@ ENGINE_ERROR_CODE EventuallyPersistentStore::addTAPBackfillItem(const Item &itm,
         abort();
     }
 
+    // Update drift counter for vbucket upon a success only
+    if (ret == ENGINE_SUCCESS && emd) {
+        vb->setDriftCounter(emd->getAdjustedTime());
+    }
+
     return ret;
 }
 
@@ -922,15 +1046,15 @@ void EventuallyPersistentStore::snapshotVBuckets(const Priority &priority,
             : vbuckets(vb_map), shardId(sid) { }
         bool visitBucket(RCPtr<VBucket> &vb) {
             if (vbuckets.getShard(vb->getId())->getId() == shardId) {
-                uint64_t snapStart = 0;
-                uint64_t snapEnd = 0;
+                snapshot_range_t range;
+                vb->getPersistedSnapshot(range);
                 std::string failovers = vb->failovers->toJSON();
                 uint64_t chkId = vbuckets.getPersistenceCheckpointId(vb->getId());
 
-                vb->getCurrentSnapshot(snapStart, snapEnd);
                 vbucket_state vb_state(vb->getState(), chkId, 0,
                                        vb->getHighSeqno(), vb->getPurgeSeqno(),
-                                       snapStart, snapEnd, failovers);
+                                       range.start, range.end, vb->getMaxCas(),
+                                       vb->getDriftCounter() ,failovers);
                 states.insert(std::pair<uint16_t, vbucket_state>(vb->getId(), vb_state));
             }
             return false;
@@ -1014,12 +1138,13 @@ bool EventuallyPersistentStore::persistVBState(const Priority &priority,
     KVStatsCallback kvcb(this);
     uint64_t chkId = vbMap.getPersistenceCheckpointId(vbid);
     std::string failovers = vb->failovers->toJSON();
-    uint64_t snapStart = 0;
-    uint64_t snapEnd = 0;
 
-    vb->getCurrentSnapshot(snapStart, snapEnd);
+    snapshot_range_t range;
+    vb->getPersistedSnapshot(range);
     vbucket_state vb_state(vb->getState(), chkId, 0, vb->getHighSeqno(),
-                           vb->getPurgeSeqno(), snapStart, snapEnd, failovers);
+                           vb->getPurgeSeqno(), range.start, range.end,
+                           vb->getMaxCas(), vb->getDriftCounter(),
+                           failovers);
 
     KVStore *rwUnderlying = getRWUnderlying(vbid);
     if (rwUnderlying->snapshotVBucket(vbid, vb_state, &kvcb)) {
@@ -1058,14 +1183,23 @@ ENGINE_ERROR_CODE EventuallyPersistentStore::setVBucketState(uint16_t vbid,
         }
 
         vb->setState(to, engine.getServerApi());
+
+        if (to == vbucket_state_active && oldstate == vbucket_state_replica) {
+            /**
+             * Update snapshot range when vbucket goes from being a replica
+             * to active, to maintain the correct snapshot sequence numbers
+             * even in a failover scenario.
+             */
+            vb->checkpointManager.resetSnapshotRange();
+        }
+
         if (to == vbucket_state_active && !transfer) {
-            uint64_t snapStart = 0;
-            uint64_t snapEnd = 0;
-            vb->getCurrentSnapshot(snapStart, snapEnd);
-            if (snapEnd == vbMap.getPersistenceSeqno(vbid)) {
-                vb->failovers->createEntry(snapEnd);
+            snapshot_range_t range;
+            vb->getPersistedSnapshot(range);
+            if (range.end == vbMap.getPersistenceSeqno(vbid)) {
+                vb->failovers->createEntry(range.end);
             } else {
-                vb->failovers->createEntry(snapStart);
+                vb->failovers->createEntry(range.start);
             }
         }
 
@@ -1078,9 +1212,11 @@ ENGINE_ERROR_CODE EventuallyPersistentStore::setVBucketState(uint16_t vbid,
         scheduleVBStatePersist(Priority::VBucketPersistLowPriority, vbid);
     } else {
         FailoverTable* ft = new FailoverTable(engine.getMaxFailoverEntries());
+        KVShard* shard = vbMap.getShard(vbid);
+        shared_ptr<Callback<uint16_t> > cb(new NotifyFlusherCB(shard));
         RCPtr<VBucket> newvb(new VBucket(vbid, to, stats,
                                          engine.getCheckpointConfig(),
-                                         vbMap.getShard(vbid), 0, 0, 0, ft));
+                                         shard, 0, 0, 0, ft, cb));
         // The first checkpoint for active vbucket should start with id 2.
         uint64_t start_chk_id = (to == vbucket_state_active) ? 2 : 0;
         newvb->checkpointManager.setOpenCheckpointId(start_chk_id);
@@ -1103,7 +1239,7 @@ bool EventuallyPersistentStore::scheduleVBSnapshot(const Priority &p) {
         for (size_t i = 0; i < vbMap.numShards; ++i) {
             shard = vbMap.shards[i];
             if (shard->setHighPriorityVbSnapshotFlag(true)) {
-                ExTask task = new VBSnapshotTask(&engine, p, i, false);
+                ExTask task = new VBSnapshotTask(&engine, p, i, true);
                 ExecutorPool::get()->schedule(task, WRITER_TASK_IDX);
             }
         }
@@ -1111,7 +1247,7 @@ bool EventuallyPersistentStore::scheduleVBSnapshot(const Priority &p) {
         for (size_t i = 0; i < vbMap.numShards; ++i) {
             shard = vbMap.shards[i];
             if (shard->setLowPriorityVbSnapshotFlag(true)) {
-                ExTask task = new VBSnapshotTask(&engine, p, i, false);
+                ExTask task = new VBSnapshotTask(&engine, p, i, true);
                 ExecutorPool::get()->schedule(task, WRITER_TASK_IDX);
             }
         }
@@ -1128,12 +1264,12 @@ void EventuallyPersistentStore::scheduleVBSnapshot(const Priority &p,
     KVShard *shard = vbMap.shards[shardId];
     if (p == Priority::VBucketPersistHighPriority) {
         if (force || shard->setHighPriorityVbSnapshotFlag(true)) {
-            ExTask task = new VBSnapshotTask(&engine, p, shardId, false);
+            ExTask task = new VBSnapshotTask(&engine, p, shardId, true);
             ExecutorPool::get()->schedule(task, WRITER_TASK_IDX);
         }
     } else {
         if (force || shard->setLowPriorityVbSnapshotFlag(true)) {
-            ExTask task = new VBSnapshotTask(&engine, p, shardId, false);
+            ExTask task = new VBSnapshotTask(&engine, p, shardId, true);
             ExecutorPool::get()->schedule(task, WRITER_TASK_IDX);
         }
     }
@@ -1145,7 +1281,7 @@ void EventuallyPersistentStore::scheduleVBStatePersist(const Priority &priority,
     bool inverse = false;
     if (force ||
         schedule_vbstate_persist[vbid].compare_exchange_strong(inverse, true)) {
-        ExTask task = new VBStatePersistTask(&engine, priority, vbid, false);
+        ExTask task = new VBStatePersistTask(&engine, priority, vbid, true);
         ExecutorPool::get()->schedule(task, WRITER_TASK_IDX);
     }
 }
@@ -1278,6 +1414,62 @@ bool EventuallyPersistentStore::compactVBucket(const uint16_t vbid,
             return true; // Schedule a compaction task again.
         }
 
+        Configuration &config = getEPEngine().getConfiguration();
+        if (config.isBfilterEnabled()) {
+            size_t initial_estimation = config.getBfilterKeyCount();
+            size_t estimated_count;
+            size_t num_deletes =
+                    getROUnderlying(vbid)->getNumPersistedDeletes(vbid);
+            if (eviction_policy == VALUE_ONLY) {
+                /**
+                 * VALUE-ONLY EVICTION POLICY
+                 * Obtain number of persisted deletes from underlying kvstore.
+                 * Bloomfilter's estimated_key_count = 1.25 * deletes
+                 */
+
+                estimated_count = round(1.25 * num_deletes);
+                ctx->bfcb = new BfilterCB(this, vbid, false);
+            } else {
+                /**
+                 * FULL EVICTION POLICY
+                 * First determine if the resident ratio of vbucket is less than
+                 * the threshold from configuration.
+                 */
+
+                bool residentRatioAlert = vb->isResidentRatioUnderThreshold(
+                                                getBfiltersResidencyThreshold(),
+                                                eviction_policy);
+                ctx->bfcb = new BfilterCB(this, vbid, residentRatioAlert);
+
+                /**
+                 * Based on resident ratio against threshold, estimate count.
+                 *
+                 * 1. If resident ratio is greater than the threshold:
+                 * Obtain number of persisted deletes from underlying kvstore.
+                 * Obtain number of non-resident-items for vbucket.
+                 * Bloomfilter's estimated_key_count =
+                 *                              1.25 * (deletes + non-resident)
+                 *
+                 * 2. Otherwise:
+                 * Obtain number of items for vbucket.
+                 * Bloomfilter's estimated_key_count =
+                 *                              1.25 * (num_items)
+                 */
+
+                if (residentRatioAlert) {
+                    estimated_count = round(1.25 *
+                                            vb->getNumItems(eviction_policy));
+                } else {
+                    estimated_count = round(1.25 * (num_deletes +
+                                vb->getNumNonResidentItems(eviction_policy)));
+                }
+            }
+            if (estimated_count < initial_estimation) {
+                estimated_count = initial_estimation;
+            }
+            vb->initTempFilter(estimated_count, config.getBfilterFpProb());
+        }
+
         if (vb->getState() == vbucket_state_active) {
             // Set the current time ONLY for active vbuckets.
             ctx->curr_time = ep_real_time();
@@ -1286,7 +1478,17 @@ bool EventuallyPersistentStore::compactVBucket(const uint16_t vbid,
         }
         ExpiredItemsCallback cb(this, vbid);
         KVStatsCallback kvcb(this);
-        getRWUnderlying(vbid)->compactVBucket(vbid, ctx, cb, kvcb);
+        if (getRWUnderlying(vbid)->compactVBucket(vbid, ctx, cb, kvcb)) {
+            if (config.isBfilterEnabled()) {
+                vb->swapFilter();
+            } else {
+                vb->clearFilter();
+            }
+        } else {
+            LOG(EXTENSION_LOG_WARNING, "Compaction: Not successful for vb %u, "
+                    "clearing bloom filter, if any.", vb->getId());
+            vb->clearFilter();
+        }
         vb->setPurgeSeqno(ctx->max_purged_seq);
     } else {
         err = ENGINE_NOT_MY_VBUCKET;
@@ -1337,14 +1539,14 @@ bool EventuallyPersistentStore::resetVBucket(uint16_t vbid) {
         lh.unlock();
 
         std::list<std::string> tap_cursors = vb->checkpointManager.
-                                             getTAPCursorNames();
+                                             getCursorNames();
         // Delete and recreate the vbucket database file
         scheduleVBDeletion(vb, NULL, 0);
         setVBucketState(vbid, vbstate, false);
 
         // Copy the all cursors from the old vbucket into the new vbucket
         RCPtr<VBucket> newvb = vbMap.getBucket(vbid);
-        newvb->checkpointManager.resetTAPCursors(tap_cursors);
+        newvb->checkpointManager.resetCursors(tap_cursors);
 
         rv = true;
     }
@@ -1414,7 +1616,6 @@ void EventuallyPersistentStore::updateBGStats(const hrtime_t init,
 
 void EventuallyPersistentStore::completeBGFetch(const std::string &key,
                                                 uint16_t vbucket,
-                                                uint64_t rowid,
                                                 const void *cookie,
                                                 hrtime_t init,
                                                 bool isMeta) {
@@ -1427,7 +1628,7 @@ void EventuallyPersistentStore::completeBGFetch(const std::string &key,
     } else {
         ++stats.bg_fetched;
     }
-    getROUnderlying(vbucket)->get(key, rowid, vbucket, gcb);
+    getROUnderlying(vbucket)->get(key, vbucket, gcb);
     gcb.waitForValue();
     cb_assert(gcb.fired);
     ENGINE_ERROR_CODE status = gcb.val.getStatus();
@@ -1441,8 +1642,15 @@ void EventuallyPersistentStore::completeBGFetch(const std::string &key,
         LockHolder hlh = vb->ht.getLockedBucket(key, &bucket_num);
         StoredValue *v = fetchValidValue(vb, key, bucket_num, true);
         if (isMeta) {
-            if (v && v->unlocked_restoreMeta(gcb.val.getValue(),
-                                             gcb.val.getStatus(), vb->ht)) {
+            if ((v && v->unlocked_restoreMeta(gcb.val.getValue(),
+                                             gcb.val.getStatus(), vb->ht))
+                || ENGINE_KEY_ENOENT == status) {
+                /* If ENGINE_KEY_ENOENT is the status from storage and the temp
+                 key is removed from hash table by the time bgfetch returns
+                 (in case multiple bgfetch is scheduled for a key), we still
+                 need to return ENGINE_SUCCESS to the memcached worker thread,
+                 so that the worker thread can visit the ep-engine and figure
+                 out the correct flow */
                 status = ENGINE_SUCCESS;
             }
         } else {
@@ -1481,7 +1689,7 @@ void EventuallyPersistentStore::completeBGFetch(const std::string &key,
                         // returns, the item may have been updated and queued
                         // Hence test the CAS value to be the same first.
                         // exptime mutated, schedule it into new checkpoint
-                        queueDirty(vb, v, &hlh);
+                        queueDirty(vb, v, &hlh, NULL);
                     }
                 } else if (gcb.val.getStatus() == ENGINE_KEY_ENOENT) {
                     v->setStoredValueState(
@@ -1497,8 +1705,9 @@ void EventuallyPersistentStore::completeBGFetch(const std::string &key,
                     // underlying kvstore couldn't fetch requested data
                     // log returned error and notify TMPFAIL to client
                     LOG(EXTENSION_LOG_WARNING,
-                        "Warning: failed background fetch for vb=%d seq=%d "
-                        "key=%s", vbucket, v->getBySeqno(), key.c_str());
+                        "Warning: failed background fetch for vb=%d "
+                        "seq=%" PRId64 " key=%s", vbucket, v->getBySeqno(),
+                        key.c_str());
                     status = ENGINE_TMPFAIL;
                 }
             }
@@ -1548,7 +1757,14 @@ void EventuallyPersistentStore::completeBGFetchMulti(uint16_t vbId,
         LockHolder blh = vb->ht.getLockedBucket(key, &bucket);
         StoredValue *v = fetchValidValue(vb, key, bucket, true);
         if (bgitem->metaDataOnly) {
-            if (v && v->unlocked_restoreMeta(fetchedValue, status, vb->ht)) {
+            if ((v && v->unlocked_restoreMeta(fetchedValue, status, vb->ht))
+                || ENGINE_KEY_ENOENT == status) {
+                /* If ENGINE_KEY_ENOENT is the status from storage and the temp
+                 key is removed from hash table by the time bgfetch returns
+                 (in case multiple bgfetch is scheduled for a key), we still
+                 need to return ENGINE_SUCCESS to the memcached worker thread,
+                 so that the worker thread can visit the ep-engine and figure
+                 out the correct flow */
                 status = ENGINE_SUCCESS;
             }
         } else {
@@ -1588,7 +1804,7 @@ void EventuallyPersistentStore::completeBGFetchMulti(uint16_t vbId,
                         // updated and queued
                         // Hence test the CAS value to be the same first.
                         // exptime mutated, schedule it into new checkpoint
-                        queueDirty(vb, v, &blh);
+                        queueDirty(vb, v, &blh, NULL);
                     }
                 } else if (status == ENGINE_KEY_ENOENT) {
                     v->setStoredValueState(StoredValue::state_non_existent_key);
@@ -1631,7 +1847,6 @@ void EventuallyPersistentStore::completeBGFetchMulti(uint16_t vbId,
 
 void EventuallyPersistentStore::bgFetch(const std::string &key,
                                         uint16_t vbucket,
-                                        uint64_t rowid,
                                         const void *cookie,
                                         bool isMeta) {
     std::stringstream ss;
@@ -1655,7 +1870,7 @@ void EventuallyPersistentStore::bgFetch(const std::string &key,
         stats.maxRemainingBgJobs = std::max(stats.maxRemainingBgJobs,
                                             bgFetchQueue.load());
         ExecutorPool* iom = ExecutorPool::get();
-        ExTask task = new BGFetchTask(&engine, key, vbucket, rowid, cookie,
+        ExTask task = new BGFetchTask(&engine, key, vbucket, cookie,
                                       isMeta,
                                       Priority::BgFetcherGetMetaPriority,
                                       bgFetchDelay, false);
@@ -1672,7 +1887,9 @@ GetValue EventuallyPersistentStore::getInternal(const std::string &key,
                                                 bool queueBG,
                                                 bool honorStates,
                                                 vbucket_state_t allowedState,
-                                                bool trackReference) {
+                                                bool trackReference,
+                                                bool deleteTempItem,
+                                                bool hideLockedCAS) {
 
     vbucket_state_t disallowedState = (allowedState == vbucket_state_active) ?
         vbucket_state_replica : vbucket_state_active;
@@ -1697,34 +1914,55 @@ GetValue EventuallyPersistentStore::getInternal(const std::string &key,
     StoredValue *v = fetchValidValue(vb, key, bucket_num, true,
                                      trackReference);
     if (v) {
-        if (v->isDeleted() || v->isTempDeletedItem() ||
-            v->isTempNonExistentItem()) {
+        if (v->isDeleted()) {
+            GetValue rv;
+            return rv;
+        }
+        if (v->isTempDeletedItem() || v->isTempNonExistentItem()) {
+            // Delete a temp non-existent item to ensure that
+            // if the get were issued over an item that doesn't
+            // exist, then we dont preserve a temp item.
+            if (deleteTempItem) {
+                vb->ht.unlocked_del(key, bucket_num);
+            }
             GetValue rv;
             return rv;
         }
+
         // If the value is not resident, wait for it...
         if (!v->isResident()) {
             if (queueBG) {
-                bgFetch(key, vbucket, v->getBySeqno(), cookie);
+                bgFetch(key, vbucket, cookie);
             }
             return GetValue(NULL, ENGINE_EWOULDBLOCK, v->getBySeqno(),
                             true, v->getNRUValue());
         }
 
-        GetValue rv(v->toItem(v->isLocked(ep_current_time()), vbucket),
-                    ENGINE_SUCCESS, v->getBySeqno(), false, v->getNRUValue());
+        // Should we hide (return -1) for the items' CAS?
+        const bool hide_cas = hideLockedCAS &&
+                              v->isLocked(ep_current_time());
+        GetValue rv(v->toItem(hide_cas, vbucket), ENGINE_SUCCESS,
+                    v->getBySeqno(), false, v->getNRUValue());
         return rv;
     } else {
         if (eviction_policy == VALUE_ONLY || diskFlushAll) {
             GetValue rv;
             return rv;
         }
-        ENGINE_ERROR_CODE ec = ENGINE_EWOULDBLOCK;
-        if (queueBG) { // Full eviction and need a bg fetch.
-            ec = addTempItemForBgFetch(lh, bucket_num, key, vb,
-                                       cookie, false);
+
+        if (vb->maybeKeyExistsInFilter(key)) {
+            ENGINE_ERROR_CODE ec = ENGINE_EWOULDBLOCK;
+            if (queueBG) { // Full eviction and need a bg fetch.
+                ec = addTempItemForBgFetch(lh, bucket_num, key, vb,
+                                           cookie, false);
+            }
+            return GetValue(NULL, ec, -1, true);
+        } else {
+            // As bloomfilter predicted that item surely doesn't exist
+            // on disk, return ENONET, for getInternal().
+            GetValue rv;
+            return rv;
         }
-        return GetValue(NULL, ec, -1, true);
     }
 }
 
@@ -1773,6 +2011,7 @@ ENGINE_ERROR_CODE EventuallyPersistentStore::getMetaData(
                                                         const void *cookie,
                                                         ItemMetaData &metadata,
                                                         uint32_t &deleted,
+                                                        uint8_t &confResMode,
                                                         bool trackReferenced)
 {
     (void) cookie;
@@ -1793,7 +2032,7 @@ ENGINE_ERROR_CODE EventuallyPersistentStore::getMetaData(
         stats.numOpsGetMeta++;
 
         if (v->isTempInitialItem()) { // Need bg meta fetch.
-            bgFetch(key, vbucket, -1, cookie, true);
+            bgFetch(key, vbucket, cookie, true);
             return ENGINE_EWOULDBLOCK;
         } else if (v->isTempNonExistentItem()) {
             metadata.cas = v->getCas();
@@ -1812,6 +2051,7 @@ ENGINE_ERROR_CODE EventuallyPersistentStore::getMetaData(
             metadata.flags = v->getFlags();
             metadata.exptime = v->getExptime();
             metadata.revSeqno = v->getRevSeqno();
+            confResMode = v->getConflictResMode();
             return ENGINE_SUCCESS;
         }
     } else {
@@ -1820,18 +2060,29 @@ ENGINE_ERROR_CODE EventuallyPersistentStore::getMetaData(
         // So, add a temporary item corresponding to the key to the hash table
         // and schedule a background fetch for its metadata from the persistent
         // store. The item's state will be updated after the fetch completes.
-        return addTempItemForBgFetch(lh, bucket_num, key, vb, cookie, true);
+        //
+        // Schedule this bgFetch only if the key is predicted to be may-be
+        // existent on disk by the bloomfilter.
+
+        if (vb->maybeKeyExistsInFilter(key)) {
+            return addTempItemForBgFetch(lh, bucket_num, key, vb, cookie, true);
+        } else {
+            return ENGINE_KEY_ENOENT;
+        }
     }
 }
 
-ENGINE_ERROR_CODE EventuallyPersistentStore::setWithMeta(const Item &itm,
-                                                         uint64_t cas,
-                                                         const void *cookie,
-                                                         bool force,
-                                                         bool allowExisting,
-                                                         uint8_t nru,
-                                                         bool genBySeqno,
-                                                         bool isReplication)
+ENGINE_ERROR_CODE EventuallyPersistentStore::setWithMeta(
+                                                     const Item &itm,
+                                                     uint64_t cas,
+                                                     uint64_t *seqno,
+                                                     const void *cookie,
+                                                     bool force,
+                                                     bool allowExisting,
+                                                     uint8_t nru,
+                                                     bool genBySeqno,
+                                                     ExtendedMetaData *emd,
+                                                     bool isReplication)
 {
     RCPtr<VBucket> vb = getVBucket(itm.getVBucketId());
     if (!vb) {
@@ -1852,24 +2103,49 @@ ENGINE_ERROR_CODE EventuallyPersistentStore::setWithMeta(const Item &itm,
         }
     }
 
+    //check for the incoming item's CAS validity
+    if (!Item::isValidCas(itm.getCas())) {
+        return ENGINE_KEY_EEXISTS;
+    }
+
     int bucket_num(0);
     LockHolder lh = vb->ht.getLockedBucket(itm.getKey(), &bucket_num);
     StoredValue *v = vb->ht.unlocked_find(itm.getKey(), bucket_num, true,
                                           false);
 
+    bool maybeKeyExists = true;
     if (!force) {
         if (v)  {
             if (v->isTempInitialItem()) {
-                bgFetch(itm.getKey(), itm.getVBucketId(), -1, cookie, true);
+                bgFetch(itm.getKey(), itm.getVBucketId(), cookie, true);
                 return ENGINE_EWOULDBLOCK;
             }
-            if (!conflictResolver->resolve(v, itm.getMetaData(), false)) {
+
+            enum conflict_resolution_mode confResMode = revision_seqno;
+            if (emd) {
+                confResMode = static_cast<enum conflict_resolution_mode>(
+                                                       emd->getConflictResMode());
+            }
+
+            if (!conflictResolver->resolve(vb, v, itm.getMetaData(), false,
+                                           confResMode)) {
                 ++stats.numOpsSetMetaResolutionFailed;
                 return ENGINE_KEY_EEXISTS;
             }
         } else {
-            return addTempItemForBgFetch(lh, bucket_num, itm.getKey(), vb,
-                                         cookie, true, isReplication);
+            if (vb->maybeKeyExistsInFilter(itm.getKey())) {
+                return addTempItemForBgFetch(lh, bucket_num, itm.getKey(), vb,
+                                             cookie, true, isReplication);
+            } else {
+                maybeKeyExists = false;
+            }
+        }
+    } else {
+        if (eviction_policy == FULL_EVICTION) {
+            // Check Bloomfilter's prediction
+            if (!vb->maybeKeyExistsInFilter(itm.getKey())) {
+                maybeKeyExists = false;
+            }
         }
     }
 
@@ -1878,9 +2154,10 @@ ENGINE_ERROR_CODE EventuallyPersistentStore::setWithMeta(const Item &itm,
          vb->getState() == vbucket_state_pending)) {
         v->unlock();
     }
+
     mutation_type_t mtype = vb->ht.unlocked_set(v, itm, cas, allowExisting,
                                                 true, eviction_policy, nru,
-                                                isReplication);
+                                                maybeKeyExists, isReplication);
 
     ENGINE_ERROR_CODE ret = ENGINE_SUCCESS;
     switch (mtype) {
@@ -1896,7 +2173,16 @@ ENGINE_ERROR_CODE EventuallyPersistentStore::setWithMeta(const Item &itm,
         break;
     case WAS_DIRTY:
     case WAS_CLEAN:
-        queueDirty(vb, v, &lh, false, true, genBySeqno);
+        /* set the conflict resolution mode from the extended meta data *
+         * Given that the mode is already set, we don't need to set the *
+         * conflict resolution mode in queueDirty */
+        if (emd) {
+            v->setConflictResMode(
+                      static_cast<enum conflict_resolution_mode>(
+                                            emd->getConflictResMode()));
+        }
+        vb->setMaxCas(v->getCas());
+        queueDirty(vb, v, &lh, seqno, false, true, genBySeqno, false);
         break;
     case NOT_FOUND:
         ret = ENGINE_KEY_ENOENT;
@@ -1905,14 +2191,20 @@ ENGINE_ERROR_CODE EventuallyPersistentStore::setWithMeta(const Item &itm,
         {            // CAS operation with non-resident item + full eviction.
             if (v) { // temp item is already created. Simply schedule a
                 lh.unlock(); // bg fetch job.
-                bgFetch(itm.getKey(), vb->getId(), -1, cookie, true);
+                bgFetch(itm.getKey(), vb->getId(), cookie, true);
                 return ENGINE_EWOULDBLOCK;
             }
+
             ret = addTempItemForBgFetch(lh, bucket_num, itm.getKey(), vb,
                                         cookie, true, isReplication);
         }
     }
 
+    // Update drift counter for vbucket upon a success only
+    if (ret == ENGINE_SUCCESS && emd) {
+        vb->setDriftCounter(emd->getAdjustedTime());
+    }
+
     return ret;
 }
 
@@ -1949,7 +2241,7 @@ GetValue EventuallyPersistentStore::getAndUpdateTtl(const std::string &key,
         }
 
         if (!v->isResident()) {
-            bgFetch(key, vbucket, v->getBySeqno(), cookie);
+            bgFetch(key, vbucket, cookie);
             return GetValue(NULL, ENGINE_EWOULDBLOCK, v->getBySeqno());
         }
         if (v->isLocked(ep_current_time())) {
@@ -1971,7 +2263,7 @@ GetValue EventuallyPersistentStore::getAndUpdateTtl(const std::string &key,
             if (vb->getState() == vbucket_state_active) {
                 // persist the item in the underlying storage for
                 // mutated exptime but only if VB is active.
-                queueDirty(vb, v, &lh);
+                queueDirty(vb, v, &lh, NULL);
             }
         }
         return rv;
@@ -1980,9 +2272,17 @@ GetValue EventuallyPersistentStore::getAndUpdateTtl(const std::string &key,
             GetValue rv;
             return rv;
         } else {
-            ENGINE_ERROR_CODE ec = addTempItemForBgFetch(lh, bucket_num, key,
-                                                         vb, cookie, false);
-            return GetValue(NULL, ec, -1, true);
+            if (vb->maybeKeyExistsInFilter(key)) {
+                ENGINE_ERROR_CODE ec = addTempItemForBgFetch(lh, bucket_num,
+                                                             key, vb, cookie,
+                                                             false);
+                return GetValue(NULL, ec, -1, true);
+            } else {
+                // As bloomfilter predicted that item surely doesn't exist
+                // on disk, return ENOENT for getAndUpdateTtl().
+                GetValue rv;
+                return rv;
+            }
         }
     }
 }
@@ -2052,7 +2352,7 @@ void EventuallyPersistentStore::completeStatsVKey(const void* cookie,
                                                   uint64_t bySeqNum) {
     RememberingCallback<GetValue> gcb;
 
-    getROUnderlying(vbid)->get(key, bySeqNum, vbid, gcb);
+    getROUnderlying(vbid)->get(key, vbid, gcb);
     gcb.waitForValue();
     cb_assert(gcb.fired);
 
@@ -2073,8 +2373,9 @@ void EventuallyPersistentStore::completeStatsVKey(const void* cookie,
                     // underlying kvstore couldn't fetch requested data
                     // log returned error and notify TMPFAIL to client
                     LOG(EXTENSION_LOG_WARNING,
-                        "Warning: failed background fetch for vb=%d seq=%d "
-                        "key=%s", vbid, v->getBySeqno(), key.c_str());
+                        "Warning: failed background fetch for vb=%d "
+                        "seq=%" PRId64 " key=%s", vbid, v->getBySeqno(),
+                        key.c_str());
                 }
             }
         }
@@ -2126,7 +2427,7 @@ bool EventuallyPersistentStore::getLocked(const std::string &key,
         // If the value is not resident, wait for it...
         if (!v->isResident()) {
             if (cookie) {
-                bgFetch(key, vbucket, v->getBySeqno(), cookie);
+                bgFetch(key, vbucket, cookie);
             }
             GetValue rv(NULL, ENGINE_EWOULDBLOCK, -1, true);
             cb.callback(rv);
@@ -2137,7 +2438,7 @@ bool EventuallyPersistentStore::getLocked(const std::string &key,
         v->lock(currentTime + lockTimeout);
 
         Item *it = v->toItem(false, vbucket);
-        it->setCas();
+        it->setCas(vb->nextHLCCas());
         v->setCas(it->getCas());
 
         GetValue rv(it);
@@ -2149,11 +2450,20 @@ bool EventuallyPersistentStore::getLocked(const std::string &key,
             cb.callback(rv);
             return true;
         } else {
-            ENGINE_ERROR_CODE ec = addTempItemForBgFetch(lh, bucket_num, key,
-                                                         vb, cookie, false);
-            GetValue rv(NULL, ec, -1, true);
-            cb.callback(rv);
-            return false;
+            if (vb->maybeKeyExistsInFilter(key)) {
+                ENGINE_ERROR_CODE ec = addTempItemForBgFetch(lh, bucket_num,
+                                                             key, vb, cookie,
+                                                             false);
+                GetValue rv(NULL, ec, -1, true);
+                cb.callback(rv);
+                return false;
+            } else {
+                // As bloomfilter predicted that item surely doesn't exist
+                // on disk, return ENOENT for getLocked().
+                GetValue rv;
+                cb.callback(rv);
+                return true;
+            }
         }
     }
 }
@@ -2228,7 +2538,7 @@ ENGINE_ERROR_CODE EventuallyPersistentStore::getKeyStats(
         if (eviction_policy == FULL_EVICTION &&
             v->isTempInitialItem() && bgfetch) {
             lh.unlock();
-            bgFetch(key, vbucket, -1, cookie, true);
+            bgFetch(key, vbucket, cookie, true);
             return ENGINE_EWOULDBLOCK;
         }
         kstats.logically_deleted = v->isDeleted();
@@ -2242,10 +2552,13 @@ ENGINE_ERROR_CODE EventuallyPersistentStore::getKeyStats(
         if (eviction_policy == VALUE_ONLY) {
             return ENGINE_KEY_ENOENT;
         } else {
-            if (bgfetch) {
+            if (bgfetch && vb->maybeKeyExistsInFilter(key)) {
                 return addTempItemForBgFetch(lh, bucket_num, key, vb,
                                              cookie, true);
             } else {
+                // If bgFetch were false, or bloomfilter predicted that
+                // item surely doesn't exist on disk, return ENOENT for
+                // getKeyStats().
                 return ENGINE_KEY_ENOENT;
             }
         }
@@ -2283,11 +2596,12 @@ std::string EventuallyPersistentStore::validateKey(const std::string &key,
 }
 
 ENGINE_ERROR_CODE EventuallyPersistentStore::deleteItem(const std::string &key,
-                                                        uint64_tcas,
+                                                        uint64_t *cas,
                                                         uint16_t vbucket,
                                                         const void *cookie,
                                                         bool force,
                                                         ItemMetaData *itemMeta,
+                                                        mutation_descr_t *mutInfo,
                                                         bool tapBackfill)
 {
     RCPtr<VBucket> vb = getVBucket(vbucket);
@@ -2312,30 +2626,54 @@ ENGINE_ERROR_CODE EventuallyPersistentStore::deleteItem(const std::string &key,
         } else { // Full eviction.
             if (!force) {
                 if (!v) { // Item might be evicted from cache.
-                    return addTempItemForBgFetch(lh, bucket_num, key, vb,
-                                                 cookie, true);
+                    if (vb->maybeKeyExistsInFilter(key)) {
+                        return addTempItemForBgFetch(lh, bucket_num, key, vb,
+                                                     cookie, true);
+                    } else {
+                        // As bloomfilter predicted that item surely doesn't
+                        // exist on disk, return ENOENT for deleteItem().
+                        return ENGINE_KEY_ENOENT;
+                    }
                 } else if (v->isTempInitialItem()) {
                     lh.unlock();
-                    bgFetch(key, vbucket, -1, cookie, true);
+                    bgFetch(key, vbucket, cookie, true);
                     return ENGINE_EWOULDBLOCK;
                 } else { // Non-existent or deleted key.
+                    if (v->isTempNonExistentItem() || v->isTempDeletedItem()) {
+                        // Delete a temp non-existent item to ensure that
+                        // if a delete were issued over an item that doesn't
+                        // exist, then we don't preserve a temp item.
+                        vb->ht.unlocked_del(key, bucket_num);
+                    }
                     return ENGINE_KEY_ENOENT;
                 }
             } else {
                 if (!v) { // Item might be evicted from cache.
                     // Create a temp item and delete it below as it is a
-                    // force deletion.
-                    add_type_t rv = vb->ht.unlocked_addTempItem(bucket_num,
-                                                              key,
-                                                              eviction_policy);
-                    if (rv == ADD_NOMEM) {
-                        return ENGINE_ENOMEM;
+                    // force deletion, only if bloomfilter predicts that
+                    // item may exist on disk.
+                    if (vb->maybeKeyExistsInFilter(key)) {
+                        add_type_t rv = vb->ht.unlocked_addTempItem(
+                                                               bucket_num,
+                                                               key,
+                                                               eviction_policy);
+                        if (rv == ADD_NOMEM) {
+                            return ENGINE_ENOMEM;
+                        }
+                        v = vb->ht.unlocked_find(key, bucket_num, true, false);
+                        v->setStoredValueState(StoredValue::state_deleted_key);
+                    } else {
+                        return ENGINE_KEY_ENOENT;
                     }
-                    v = vb->ht.unlocked_find(key, bucket_num, true, false);
-                    v->setStoredValueState(StoredValue::state_deleted_key);
                 } else if (v->isTempInitialItem()) {
                     v->setStoredValueState(StoredValue::state_deleted_key);
                 } else { // Non-existent or deleted key.
+                    if (v->isTempNonExistentItem() || v->isTempDeletedItem()) {
+                        // Delete a temp non-existent item to ensure that
+                        // if a delete were issued over an item that doesn't
+                        // exist, then we don't preserve a temp item.
+                        vb->ht.unlocked_del(key, bucket_num);
+                    }
                     return ENGINE_KEY_ENOENT;
                 }
             }
@@ -2358,6 +2696,7 @@ ENGINE_ERROR_CODE EventuallyPersistentStore::deleteItem(const std::string &key,
     }
     *cas = v ? v->getCas() : 0;
 
+    uint64_t seqno = 0;
     ENGINE_ERROR_CODE ret = ENGINE_SUCCESS;
     switch (delrv) {
     case NOMEM:
@@ -2375,12 +2714,14 @@ ENGINE_ERROR_CODE EventuallyPersistentStore::deleteItem(const std::string &key,
     case NOT_FOUND:
         ret = ENGINE_KEY_ENOENT;
         if (v) {
-            queueDirty(vb, v, &lh, tapBackfill);
+            queueDirty(vb, v, &lh, NULL, tapBackfill);
         }
         break;
     case WAS_DIRTY:
     case WAS_CLEAN:
-        queueDirty(vb, v, &lh, tapBackfill);
+        queueDirty(vb, v, &lh, &seqno, tapBackfill);
+        mutInfo->seqno = seqno;
+        mutInfo->vbucket_uuid = vb->failovers->getLatestUUID();
         break;
     case NEED_BG_FETCH:
         // We already figured out if a bg fetch is requred for a full-evicted
@@ -2391,16 +2732,18 @@ ENGINE_ERROR_CODE EventuallyPersistentStore::deleteItem(const std::string &key,
 }
 
 ENGINE_ERROR_CODE EventuallyPersistentStore::deleteWithMeta(
-                                                        const std::string &key,
-                                                        uint64_t* cas,
-                                                        uint16_t vbucket,
-                                                        const void *cookie,
-                                                        bool force,
-                                                        ItemMetaData *itemMeta,
-                                                        bool tapBackfill,
-                                                        bool genBySeqno,
-                                                        uint64_t bySeqno,
-                                                        bool isReplication)
+                                                     const std::string &key,
+                                                     uint64_t *cas,
+                                                     uint64_t *seqno,
+                                                     uint16_t vbucket,
+                                                     const void *cookie,
+                                                     bool force,
+                                                     ItemMetaData *itemMeta,
+                                                     bool tapBackfill,
+                                                     bool genBySeqno,
+                                                     uint64_t bySeqno,
+                                                     ExtendedMetaData *emd,
+                                                     bool isReplication)
 {
     RCPtr<VBucket> vb = getVBucket(vbucket);
     if (!vb || (vb->getState() == vbucket_state_dead && !force)) {
@@ -2415,28 +2758,53 @@ ENGINE_ERROR_CODE EventuallyPersistentStore::deleteWithMeta(
         }
     }
 
+    //check for the incoming item's CAS validity
+    if (!Item::isValidCas(itemMeta->cas)) {
+        return ENGINE_KEY_EEXISTS;
+    }
+
     int bucket_num(0);
     LockHolder lh = vb->ht.getLockedBucket(key, &bucket_num);
     StoredValue *v = vb->ht.unlocked_find(key, bucket_num, true, false);
     if (!force) { // Need conflict resolution.
         if (v)  {
             if (v->isTempInitialItem()) {
-                bgFetch(key, vbucket, -1, cookie, true);
+                bgFetch(key, vbucket, cookie, true);
                 return ENGINE_EWOULDBLOCK;
             }
-            if (!conflictResolver->resolve(v, *itemMeta, true)) {
+
+            enum conflict_resolution_mode confResMode = revision_seqno;
+            if (emd) {
+                confResMode = static_cast<enum conflict_resolution_mode>(
+                                                       emd->getConflictResMode());
+            }
+
+            if (!conflictResolver->resolve(vb, v, *itemMeta, true, confResMode)) {
                 ++stats.numOpsDelMetaResolutionFailed;
                 return ENGINE_KEY_EEXISTS;
             }
         } else {
             // Item is 1) deleted or not existent in the value eviction case OR
             // 2) deleted or evicted in the full eviction.
-            return addTempItemForBgFetch(lh, bucket_num, key, vb,
-                                         cookie, true, isReplication);
+            if (vb->maybeKeyExistsInFilter(key)) {
+                return addTempItemForBgFetch(lh, bucket_num, key, vb,
+                                             cookie, true, isReplication);
+            } else {
+                // Even though bloomfilter predicted that item doesn't exist
+                // on disk, we must put this delete on disk if the cas is valid.
+                add_type_t rv = vb->ht.unlocked_addTempItem(bucket_num, key,
+                                                            eviction_policy,
+                                                            isReplication);
+                if (rv == ADD_NOMEM) {
+                    return ENGINE_ENOMEM;
+                }
+                v = vb->ht.unlocked_find(key, bucket_num, true, false);
+                v->setStoredValueState(StoredValue::state_deleted_key);
+            }
         }
     } else {
         if (!v) {
-            // Create a temp item and delete it below as it is a force deletion
+            // We should always try to persist a delete here.
             add_type_t rv = vb->ht.unlocked_addTempItem(bucket_num, key,
                                                         eviction_policy,
                                                         isReplication);
@@ -2445,8 +2813,10 @@ ENGINE_ERROR_CODE EventuallyPersistentStore::deleteWithMeta(
             }
             v = vb->ht.unlocked_find(key, bucket_num, true, false);
             v->setStoredValueState(StoredValue::state_deleted_key);
+            v->setCas(*cas);
         } else if (v->isTempInitialItem()) {
             v->setStoredValueState(StoredValue::state_deleted_key);
+            v->setCas(*cas);
         }
     }
 
@@ -2482,14 +2852,29 @@ ENGINE_ERROR_CODE EventuallyPersistentStore::deleteWithMeta(
         if (!genBySeqno) {
             v->setBySeqno(bySeqno);
         }
-        queueDirty(vb, v, &lh, tapBackfill, true, genBySeqno);
+
+        /* set the conflict resolution mode from the extended meta data *
+         * Given that the mode is already set, we don't need to set the *
+         * conflict resolution mode in queueDirty */
+        if (emd) {
+            v->setConflictResMode(
+               static_cast<enum conflict_resolution_mode>(
+                                         emd->getConflictResMode()));
+        }
+        vb->setMaxCas(v->getCas());
+        queueDirty(vb, v, &lh, seqno, tapBackfill, true, genBySeqno, false);
         break;
     case NEED_BG_FETCH:
         lh.unlock();
-        bgFetch(key, vbucket, -1, cookie, true);
+        bgFetch(key, vbucket, cookie, true);
         ret = ENGINE_EWOULDBLOCK;
     }
 
+    // Update drift counter for vbucket upon a success only
+    if (ret == ENGINE_SUCCESS && emd) {
+        vb->setDriftCounter(emd->getAdjustedTime());
+    }
+
     return ret;
 }
 
@@ -2503,16 +2888,15 @@ void EventuallyPersistentStore::reset() {
             vb->ht.clear();
             vb->checkpointManager.clear(vb->getState());
             vb->resetStats();
-            vb->setCurrentSnapshot(0, 0);
+            vb->setPersistedSnapshot(0, 0);
         }
     }
 
-    bool inverse = false;
-    if (diskFlushAll.compare_exchange_strong(inverse, true)) {
-        ++stats.diskQueueSize;
-        // wake up (notify) one flusher is good enough for diskFlushAll
-        vbMap.shards[EP_PRIMARY_SHARD]->getFlusher()->notifyFlushEvent();
-    }
+    ++stats.diskQueueSize;
+    bool inverse = true;
+    flushAllTaskCtx.delayFlushAll.compare_exchange_strong(inverse, false);
+    // Waking up (notifying) one flusher is good enough for diskFlushAll
+    vbMap.shards[EP_PRIMARY_SHARD]->getFlusher()->notifyFlushEvent();
 }
 
 /**
@@ -2554,7 +2938,7 @@ public:
                         ++vbucket->opsCreate;
                         vbucket->incrMetaDataDisk(*queuedItem);
                     } else { // Update in full eviction mode.
-                        --vbucket->ht.numTotalItems;
+                        vbucket->ht.decrNumTotalItems();
                         ++vbucket->opsUpdate;
                     }
                     v->setNewCacheItem(false);
@@ -2633,10 +3017,16 @@ public:
                     // exists on DB file, but not in memory (i.e., full eviction),
                     // because we created the temp item in memory and incremented
                     // the item counter when a deletion is pushed in the queue.
-                    --vbucket->ht.numTotalItems;
+                    vbucket->ht.decrNumTotalItems();
                 }
             }
 
+            /**
+             * Deleted items are to be added to the bloomfilter,
+             * in either eviction policy.
+             */
+            vbucket->addToFilter(queuedItem->getKey());
+
             if (value > 0) {
                 ++stats->totalPersisted;
                 ++vbucket->opsDelete;
@@ -2677,6 +3067,32 @@ private:
     DISALLOW_COPY_AND_ASSIGN(PersistenceCallback);
 };
 
+bool EventuallyPersistentStore::scheduleFlushAllTask(const void* cookie,
+                                                     time_t when) {
+    bool inverse = false;
+    if (diskFlushAll.compare_exchange_strong(inverse, true)) {
+        flushAllTaskCtx.cookie = cookie;
+        flushAllTaskCtx.delayFlushAll.compare_exchange_strong(inverse, true);
+        ExTask task = new FlushAllTask(&engine, static_cast<double>(when));
+        ExecutorPool::get()->schedule(task, NONIO_TASK_IDX);
+        return true;
+    } else {
+        return false;
+    }
+}
+
+void EventuallyPersistentStore::setFlushAllComplete() {
+    // Notify memcached about flushAll task completion, and
+    // set diskFlushall flag to false
+    if (flushAllTaskCtx.cookie) {
+        engine.notifyIOComplete(flushAllTaskCtx.cookie, ENGINE_SUCCESS);
+    }
+    bool inverse = false;
+    flushAllTaskCtx.delayFlushAll.compare_exchange_strong(inverse, true);
+    inverse = true;
+    diskFlushAll.compare_exchange_strong(inverse, false);
+}
+
 void EventuallyPersistentStore::flushOneDeleteAll() {
     for (size_t i = 0; i < vbMap.getSize(); ++i) {
         RCPtr<VBucket> vb = getVBucket(i);
@@ -2686,14 +3102,13 @@ void EventuallyPersistentStore::flushOneDeleteAll() {
         }
     }
 
-    bool inverse = true;
-    diskFlushAll.compare_exchange_strong(inverse, false);
     stats.decrDiskQueueSize(1);
+    setFlushAllComplete();
 }
 
 int EventuallyPersistentStore::flushVBucket(uint16_t vbid) {
     KVShard *shard = vbMap.getShard(vbid);
-    if (diskFlushAll) {
+    if (diskFlushAll && !flushAllTaskCtx.delayFlushAll) {
         if (shard->getId() == EP_PRIMARY_SHARD) {
             flushOneDeleteAll();
         } else {
@@ -2725,14 +3140,11 @@ int EventuallyPersistentStore::flushVBucket(uint16_t vbid) {
             vb->rejectQueue.pop();
         }
 
-        LockHolder slh = vb->getSnapshotLock();
-        uint64_t snapStart;
-        uint64_t snapEnd;
-        vb->getCurrentSnapshot_UNLOCKED(snapStart, snapEnd);
-
+        const std::string cursor(CheckpointManager::pCursorName);
         vb->getBackfillItems(items);
-        vb->checkpointManager.getAllItemsForPersistence(items);
-        slh.unlock();
+
+        snapshot_range_t range;
+        range = vb->checkpointManager.getAllItemsForCursor(cursor, items);
 
         if (!items.empty()) {
             while (!rwUnderlying->begin()) {
@@ -2745,6 +3157,7 @@ int EventuallyPersistentStore::flushVBucket(uint16_t vbid) {
 
             Item *prev = NULL;
             uint64_t maxSeqno = 0;
+            uint64_t maxCas = 0;
             std::list<PersistenceCallback*> pcbs;
             std::vector<queued_item>::iterator it = items.begin();
             for(; it != items.end(); ++it) {
@@ -2758,7 +3171,9 @@ int EventuallyPersistentStore::flushVBucket(uint16_t vbid) {
                     if (cb) {
                         pcbs.push_back(cb);
                     }
+
                     maxSeqno = std::max(maxSeqno, (uint64_t)(*it)->getBySeqno());
+                    maxCas = std::max(maxCas, (uint64_t)(*it)->getCas());
                     ++stats.flusher_todo;
                 } else {
                     stats.decrDiskQueueSize(1);
@@ -2771,11 +3186,12 @@ int EventuallyPersistentStore::flushVBucket(uint16_t vbid) {
             hrtime_t start = gethrtime();
 
             if (vb->getState() == vbucket_state_active) {
-                snapStart = maxSeqno;
-                snapEnd = maxSeqno;
+                range.start = maxSeqno;
+                range.end = maxSeqno;
             }
 
-            while (!rwUnderlying->commit(&cb, snapStart, snapEnd)) {
+            while (!rwUnderlying->commit(&cb, range.start, range.end, maxCas,
+                                         vb->getDriftCounter())) {
                 ++stats.commitFailed;
                 LOG(EXTENSION_LOG_WARNING, "Flusher commit failed!!! Retry in "
                     "1 sec...\n");
@@ -2784,10 +3200,12 @@ int EventuallyPersistentStore::flushVBucket(uint16_t vbid) {
             }
 
             if (vb->rejectQueue.empty()) {
+                vb->setPersistedSnapshot(range.start, range.end);
                 uint64_t highSeqno = rwUnderlying->getLastPersistedSeqno(vbid);
                 if (highSeqno > 0 &&
                     highSeqno != vbMap.getPersistenceSeqno(vbid)) {
                     vbMap.setPersistenceSeqno(vbid, highSeqno);
+                    vb->notifySeqnoPersisted(highSeqno);
                 }
             }
 
@@ -2826,6 +3244,8 @@ int EventuallyPersistentStore::flushVBucket(uint16_t vbid) {
             if (chkid > 0 && chkid != vbMap.getPersistenceCheckpointId(vbid)) {
                 vbMap.setPersistenceCheckpointId(vbid, chkid);
             }
+        } else {
+            return RETRY_FLUSH_VBUCKET;
         }
     }
 
@@ -2885,24 +3305,26 @@ EventuallyPersistentStore::flushOneDelOrSet(const queued_item &qi,
 void EventuallyPersistentStore::queueDirty(RCPtr<VBucket> &vb,
                                            StoredValue* v,
                                            LockHolder *plh,
+                                           uint64_t *seqno,
                                            bool tapBackfill,
                                            bool notifyReplicator,
-                                           bool genBySeqno) {
+                                           bool genBySeqno,
+                                           bool setConflictMode) {
     if (vb) {
+        if (setConflictMode && (v->getConflictResMode() != last_write_wins) &&
+            vb->isTimeSyncEnabled()) {
+            v->setConflictResMode(last_write_wins);
+        }
+
         queued_item qi(v->toItem(false, vb->getId()));
-        bool rv;
-        if (genBySeqno) {
-            LockHolder slh = vb->getSnapshotLock();
-            rv = tapBackfill ? vb->queueBackfillItem(qi, genBySeqno) :
-                               vb->checkpointManager.queueDirty(vb, qi,
-                                                                genBySeqno);
-            v->setBySeqno(qi->getBySeqno());
-            vb->setCurrentSnapshot_UNLOCKED(qi->getBySeqno(), qi->getBySeqno());
-        } else {
-            rv = tapBackfill ? vb->queueBackfillItem(qi, genBySeqno) :
-                               vb->checkpointManager.queueDirty(vb, qi,
-                                                                genBySeqno);
-            v->setBySeqno(qi->getBySeqno());
+
+        bool rv = tapBackfill ? vb->queueBackfillItem(qi, genBySeqno) :
+                                vb->checkpointManager.queueDirty(vb, qi,
+                                                                 genBySeqno);
+        v->setBySeqno(qi->getBySeqno());
+
+        if (seqno) {
+            *seqno = v->getBySeqno();
         }
 
         if (plh) {
@@ -3128,6 +3550,22 @@ void EventuallyPersistentStore::resetAccessScannerStartTime() {
     }
 }
 
+void EventuallyPersistentStore::setAllBloomFilters(bool to) {
+    size_t maxSize = vbMap.getSize();
+    cb_assert(maxSize <= std::numeric_limits<uint16_t>::max());
+    for (size_t i = 0; i < maxSize; i++) {
+        uint16_t vbid = static_cast<uint16_t>(i);
+        RCPtr<VBucket> vb = vbMap.getBucket(vbid);
+        if (vb) {
+            if (to) {
+                vb->setFilterStatus(BFILTER_ENABLED);
+            } else {
+                vb->setFilterStatus(BFILTER_DISABLED);
+            }
+        }
+    }
+}
+
 void EventuallyPersistentStore::visit(VBucketVisitor &visitor)
 {
     size_t maxSize = vbMap.getSize();
@@ -3146,6 +3584,38 @@ void EventuallyPersistentStore::visit(VBucketVisitor &visitor)
     visitor.complete();
 }
 
+EventuallyPersistentStore::Position
+EventuallyPersistentStore::pauseResumeVisit(PauseResumeEPStoreVisitor& visitor,
+                                            Position& start_pos)
+{
+    const size_t maxSize = vbMap.getSize();
+
+    uint16_t vbid = start_pos.vbucket_id;
+    for (; vbid < maxSize; ++vbid) {
+        RCPtr<VBucket> vb = vbMap.getBucket(vbid);
+        if (vb) {
+            bool paused = !visitor.visit(vbid, vb->ht);
+            if (paused) {
+                break;
+            }
+        }
+    }
+
+    return EventuallyPersistentStore::Position(vbid);
+}
+
+EventuallyPersistentStore::Position
+EventuallyPersistentStore::startPosition() const
+{
+    return EventuallyPersistentStore::Position(0);
+}
+
+EventuallyPersistentStore::Position
+EventuallyPersistentStore::endPosition() const
+{
+    return EventuallyPersistentStore::Position(vbMap.getSize());
+}
+
 VBCBAdaptor::VBCBAdaptor(EventuallyPersistentStore *s,
                          shared_ptr<VBucketVisitor> v,
                          const char *l, const Priority &p, double sleep) :
@@ -3300,9 +3770,8 @@ EventuallyPersistentStore::rollback(uint16_t vbid,
         if (result.success) {
             RCPtr<VBucket> vb = vbMap.getBucket(vbid);
             vb->failovers->pruneEntries(result.highSeqno);
-            vb->checkpointManager.clear(vb->getState());
-            vb->checkpointManager.setBySeqno(result.highSeqno);
-            vb->setCurrentSnapshot(result.snapStartSeqno, result.snapEndSeqno);
+            vb->checkpointManager.clear(vb, result.highSeqno);
+            vb->setPersistedSnapshot(result.snapStartSeqno, result.snapEndSeqno);
             return ENGINE_SUCCESS;
         }
     }
@@ -3312,3 +3781,13 @@ EventuallyPersistentStore::rollback(uint16_t vbid,
     }
     return ENGINE_NOT_MY_VBUCKET;
 }
+
+void EventuallyPersistentStore::runDefragmenterTask() {
+    defragmenterTask->run();
+}
+
+std::ostream& operator<<(std::ostream& os,
+                         const EventuallyPersistentStore::Position& pos) {
+    os << "vbucket:" << pos.vbucket_id;
+    return os;
+}