Merge remote-tracking branch 'couchbase/3.0.x' into sherlock
[ep-engine.git] / src / ep.cc
index 11e6e8f..0ca9b4b 100644 (file)
--- a/src/ep.cc
+++ b/src/ep.cc
@@ -32,6 +32,7 @@
 #include "access_scanner.h"
 #include "checkpoint_remover.h"
 #include "conflict_resolution.h"
+#include "defragmenter.h"
 #include "ep.h"
 #include "ep_engine.h"
 #include "failover-table.h"
 
 class StatsValueChangeListener : public ValueChangedListener {
 public:
-    StatsValueChangeListener(EPStats &st) : stats(st) {
+    StatsValueChangeListener(EPStats &st, EventuallyPersistentStore &str)
+        : stats(st), store(str) {
         // EMPTY
     }
 
     virtual void sizeValueChanged(const std::string &key, size_t value) {
         if (key.compare("max_size") == 0) {
             stats.setMaxDataSize(value);
+            store.getEPEngine().getDcpConnMap(). \
+                                     updateMaxActiveSnoozingBackfills(value);
             size_t low_wat = static_cast<size_t>
-                             (static_cast<double>(value) * 0.6);
+                             (static_cast<double>(value) * 0.75);
             size_t high_wat = static_cast<size_t>(
-                              static_cast<double>(value) * 0.75);
+                              static_cast<double>(value) * 0.85);
             stats.mem_low_wat.store(low_wat);
             stats.mem_high_wat.store(high_wat);
         } else if (key.compare("mem_low_wat") == 0) {
@@ -80,6 +84,7 @@ public:
 
 private:
     EPStats &stats;
+    EventuallyPersistentStore &store;
 };
 
 /**
@@ -129,6 +134,14 @@ public:
             } else {
                 store.disableAccessScannerTask();
             }
+        } else if (key.compare("bfilter_enabled") == 0) {
+            store.setAllBloomFilters(value);
+        }
+    }
+
+    virtual void floatValueChanged(const std::string &key, float value) {
+        if (key.compare("bfilter_residency_threshold") == 0) {
+            store.setBfiltersResidencyThreshold(value);
         }
     }
 
@@ -141,7 +154,7 @@ public:
     VBucketMemoryDeletionTask(EventuallyPersistentEngine &eng,
                               RCPtr<VBucket> &vb, double delay) :
                               GlobalTask(&eng,
-                              Priority::VBMemoryDeletionPriority, delay, true),
+                              TaskId::VBucketMemoryDeletionTask, delay, true),
                               e(eng), vbucket(vb), vbid(vb->getId()) { }
 
     std::string getDescription() {
@@ -166,7 +179,7 @@ private:
 class PendingOpsNotification : public GlobalTask {
 public:
     PendingOpsNotification(EventuallyPersistentEngine &e, RCPtr<VBucket> &vb) :
-        GlobalTask(&e, Priority::VBMemoryDeletionPriority, 0, false),
+        GlobalTask(&e, TaskId::PendingOpsNotification, 0, false),
         engine(e), vbucket(vb) { }
 
     std::string getDescription() {
@@ -189,8 +202,10 @@ EventuallyPersistentStore::EventuallyPersistentStore(
     EventuallyPersistentEngine &theEngine) :
     engine(theEngine), stats(engine.getEpStats()),
     vbMap(theEngine.getConfiguration(), *this),
+    defragmenterTask(NULL),
     bgFetchQueue(0),
-    diskFlushAll(false), bgFetchDelay(0), backfillMemoryThreshold(0.95),
+    diskFlushAll(false), bgFetchDelay(0),
+    backfillMemoryThreshold(0.95),
     statsSnapshotTaskId(0), lastTransTimePerItem(0)
 {
     cachedResidentRatio.activeRatio.store(0);
@@ -209,10 +224,10 @@ EventuallyPersistentStore::EventuallyPersistentStore(
 
     storageProperties = new StorageProperties(true, true, true, true);
 
-    stats.schedulingHisto = new Histogram<hrtime_t>[MAX_TYPE_ID];
-    stats.taskRuntimeHisto = new Histogram<hrtime_t>[MAX_TYPE_ID];
+    stats.schedulingHisto = new Histogram<hrtime_t>[GlobalTask::allTaskIds.size()];
+    stats.taskRuntimeHisto = new Histogram<hrtime_t>[GlobalTask::allTaskIds.size()];
 
-    for (size_t i = 0; i < MAX_TYPE_ID; i++) {
+    for (size_t i = 0; i < GlobalTask::allTaskIds.size(); i++) {
         stats.schedulingHisto[i].reset();
         stats.taskRuntimeHisto[i].reset();
     }
@@ -229,26 +244,27 @@ EventuallyPersistentStore::EventuallyPersistentStore(
     stats.memOverhead = sizeof(EventuallyPersistentStore);
 
     if (config.getConflictResolutionType().compare("seqno") == 0) {
-        conflictResolver = new SeqBasedResolution();
+        conflictResolver = new ConflictResolution();
     }
 
     stats.setMaxDataSize(config.getMaxSize());
     config.addValueChangedListener("max_size",
-                                   new StatsValueChangeListener(stats));
+                                   new StatsValueChangeListener(stats, *this));
+    getEPEngine().getDcpConnMap().updateMaxActiveSnoozingBackfills(config.getMaxSize());
 
     stats.mem_low_wat.store(config.getMemLowWat());
     config.addValueChangedListener("mem_low_wat",
-                                   new StatsValueChangeListener(stats));
+                                   new StatsValueChangeListener(stats, *this));
 
     stats.mem_high_wat.store(config.getMemHighWat());
     config.addValueChangedListener("mem_high_wat",
-                                   new StatsValueChangeListener(stats));
+                                   new StatsValueChangeListener(stats, *this));
 
     stats.tapThrottleThreshold.store(static_cast<double>
                                     (config.getTapThrottleThreshold())
                                      / 100.0);
     config.addValueChangedListener("tap_throttle_threshold",
-                                   new StatsValueChangeListener(stats));
+                                   new StatsValueChangeListener(stats, *this));
 
     stats.tapThrottleWriteQueueCap.store(config.getTapThrottleQueueCap());
     config.addValueChangedListener("tap_throttle_queue_cap",
@@ -263,11 +279,11 @@ EventuallyPersistentStore::EventuallyPersistentStore(
     stats.warmupMemUsedCap.store(static_cast<double>
                                (config.getWarmupMinMemoryThreshold()) / 100.0);
     config.addValueChangedListener("warmup_min_memory_threshold",
-                                   new StatsValueChangeListener(stats));
+                                   new StatsValueChangeListener(stats, *this));
     stats.warmupNumReadCap.store(static_cast<double>
                                 (config.getWarmupMinItemsThreshold()) / 100.0);
     config.addValueChangedListener("warmup_min_items_threshold",
-                                   new StatsValueChangeListener(stats));
+                                   new StatsValueChangeListener(stats, *this));
 
     double mem_threshold = static_cast<double>
                                       (config.getMutationMemThreshold()) / 100;
@@ -281,6 +297,13 @@ EventuallyPersistentStore::EventuallyPersistentStore(
     config.addValueChangedListener("backfill_mem_threshold",
                                    new EPStoreValueChangeListener(*this));
 
+    config.addValueChangedListener("bfilter_enabled",
+                                   new EPStoreValueChangeListener(*this));
+
+    bfilterResidencyThreshold = config.getBfilterResidencyThreshold();
+    config.addValueChangedListener("bfilter_residency_threshold",
+                                   new EPStoreValueChangeListener(*this));
+
     compactionExpMemThreshold = config.getCompactionExpMemThreshold();
     config.addValueChangedListener("compaction_exp_mem_threshold",
                                    new EPStoreValueChangeListener(*this));
@@ -348,13 +371,22 @@ bool EventuallyPersistentStore::initialize() {
     ExTask workloadMonitorTask = new WorkLoadMonitor(&engine, false);
     ExecutorPool::get()->schedule(workloadMonitorTask, NONIO_TASK_IDX);
 
+#if HAVE_JEMALLOC
+    /* Only create the defragmenter task if we have an underlying memory
+     * allocator which can facilitate defragmenting memory.
+     */
+    defragmenterTask = new DefragmenterTask(&engine, stats);
+    ExecutorPool::get()->schedule(defragmenterTask, NONIO_TASK_IDX);
+#endif
+
     return true;
 }
 
 EventuallyPersistentStore::~EventuallyPersistentStore() {
     stopWarmup();
     stopBgFetcher();
-    ExecutorPool::get()->stopTaskGroup(&engine, NONIO_TASK_IDX);
+    ExecutorPool::get()->stopTaskGroup(&engine, NONIO_TASK_IDX,
+                                       stats.forceShutdown);
 
     ExecutorPool::get()->cancel(statsSnapshotTaskId);
     LockHolder lh(accessScanner.mutex);
@@ -362,7 +394,8 @@ EventuallyPersistentStore::~EventuallyPersistentStore() {
     lh.unlock();
 
     stopFlusher();
-    ExecutorPool::get()->unregisterBucket(ObjectRegistry::getCurrentEngine());
+    ExecutorPool::get()->unregisterBucket(ObjectRegistry::getCurrentEngine(),
+                                          stats.forceShutdown);
 
     delete [] vb_mutexes;
     delete [] schedule_vbstate_persist;
@@ -371,6 +404,7 @@ EventuallyPersistentStore::~EventuallyPersistentStore() {
     delete conflictResolver;
     delete warmupTask;
     delete storageProperties;
+    defragmenterTask.reset();
 
     std::vector<MutationLog*>::iterator it;
     for (it = accessLog.begin(); it != accessLog.end(); it++) {
@@ -397,6 +431,8 @@ bool EventuallyPersistentStore::startFlusher() {
 void EventuallyPersistentStore::stopFlusher() {
     for (uint16_t i = 0; i < vbMap.numShards; i++) {
         Flusher *flusher = vbMap.shards[i]->getFlusher();
+        LOG(EXTENSION_LOG_WARNING, "Attempting to stop the flusher for "
+            "shard:%" PRIu16, i);
         bool rv = flusher->stop(stats.forceShutdown);
         if (rv && !stats.forceShutdown) {
             flusher->wait();
@@ -445,7 +481,7 @@ bool EventuallyPersistentStore::startBgFetcher() {
         BgFetcher *bgfetcher = vbMap.shards[i]->getBgFetcher();
         if (bgfetcher == NULL) {
             LOG(EXTENSION_LOG_WARNING,
-                "Falied to start bg fetcher for shard %d", i);
+                "Failed to start bg fetcher for shard %d", i);
             return false;
         }
         bgfetcher->start();
@@ -461,7 +497,7 @@ void EventuallyPersistentStore::stopBgFetcher() {
                 "Shutting down engine while there are still pending data "
                 "read for shard %d from database storage", i);
         }
-        LOG(EXTENSION_LOG_INFO, "Stopping bg fetcher for underlying storage");
+        LOG(EXTENSION_LOG_WARNING, "Stopping bg fetcher for shard:%" PRIu16, i);
         bgfetcher->stop();
     }
 }
@@ -488,22 +524,25 @@ EventuallyPersistentStore::deleteExpiredItem(uint16_t vbid, std::string &key,
                     cb_assert(deleted);
                 } else if (v->isExpired(startTime) && !v->isDeleted()) {
                     vb->ht.unlocked_softDelete(v, 0, getItemEvictionPolicy());
-                    queueDirty(vb, v, &lh, false);
+                    queueDirty(vb, v, &lh, NULL, false);
                 }
             } else {
                 if (eviction_policy == FULL_EVICTION) {
                     // Create a temp item and delete and push it
-                    // into the checkpoint queue.
-                    add_type_t rv = vb->ht.unlocked_addTempItem(bucket_num, key,
-                                                                eviction_policy);
-                    if (rv == ADD_NOMEM) {
-                        return;
+                    // into the checkpoint queue, only if the bloomfilter
+                    // predicts that the item may exist on disk.
+                    if (vb->maybeKeyExistsInFilter(key)) {
+                        add_type_t rv = vb->ht.unlocked_addTempItem(bucket_num, key,
+                                                                    eviction_policy);
+                        if (rv == ADD_NOMEM) {
+                            return;
+                        }
+                        v = vb->ht.unlocked_find(key, bucket_num, true, false);
+                        v->setStoredValueState(StoredValue::state_deleted_key);
+                        v->setRevSeqno(revSeqno);
+                        vb->ht.unlocked_softDelete(v, 0, eviction_policy);
+                        queueDirty(vb, v, &lh, NULL, false);
                     }
-                    v = vb->ht.unlocked_find(key, bucket_num, true, false);
-                    v->setStoredValueState(StoredValue::state_deleted_key);
-                    v->setRevSeqno(revSeqno);
-                    vb->ht.unlocked_softDelete(v, 0, eviction_policy);
-                    queueDirty(vb, v, &lh, false);
                 }
             }
         }
@@ -539,7 +578,7 @@ StoredValue *EventuallyPersistentStore::fetchValidValue(RCPtr<VBucket> &vb,
             if (queueExpired && vb->getState() == vbucket_state_active) {
                 incExpirationStat(vb, false);
                 vb->ht.unlocked_softDelete(v, 0, eviction_policy);
-                queueDirty(vb, v, NULL, false, true);
+                queueDirty(vb, v, NULL, NULL, false, true);
             }
             return wantDeleted ? v : NULL;
         }
@@ -547,6 +586,21 @@ StoredValue *EventuallyPersistentStore::fetchValidValue(RCPtr<VBucket> &vb,
     return v;
 }
 
+bool EventuallyPersistentStore::isMetaDataResident(RCPtr<VBucket> &vb,
+                                                   const std::string &key) {
+
+    cb_assert(vb);
+    int bucket_num(0);
+    LockHolder lh = vb->ht.getLockedBucket(key, &bucket_num);
+    StoredValue *v = vb->ht.unlocked_find(key, bucket_num, false, false);
+
+    if (v && !v->isTempItem()) {
+        return true;
+    } else {
+        return false;
+    }
+}
+
 protocol_binary_response_status EventuallyPersistentStore::evictKey(
                                                         const std::string &key,
                                                         uint16_t vbucket,
@@ -572,6 +626,11 @@ protocol_binary_response_status EventuallyPersistentStore::evictKey(
         if (v->isResident()) {
             if (vb->ht.unlocked_ejectItem(v, eviction_policy)) {
                 *msg = "Ejected.";
+
+                // Add key to bloom filter incase of full eviction mode
+                if (getItemEvictionPolicy() == FULL_EVICTION) {
+                    vb->addToFilter(key);
+                }
             } else {
                 *msg = "Can't eject: Dirty object.";
                 rv = PROTOCOL_BINARY_RESPONSE_KEY_EEXISTS;
@@ -614,7 +673,7 @@ ENGINE_ERROR_CODE EventuallyPersistentStore::addTempItemForBgFetch(
             abort();
         case ADD_BG_FETCH:
             lock.unlock();
-            bgFetch(key, vb->getId(), -1, cookie, metadataOnly);
+            bgFetch(key, vb->getId(), cookie, metadataOnly);
     }
     return ENGINE_EWOULDBLOCK;
 }
@@ -655,10 +714,23 @@ ENGINE_ERROR_CODE EventuallyPersistentStore::set(const Item &itm,
          vb->getState() == vbucket_state_pending)) {
         v->unlock();
     }
-    mutation_type_t mtype = vb->ht.unlocked_set(v, itm, itm.getCas(),
-                                                true, false,
-                                                eviction_policy, nru);
 
+    bool maybeKeyExists = true;
+    // Check Bloomfilter's prediction if in full eviction policy
+    // and for a CAS operation only.
+    if (eviction_policy == FULL_EVICTION && itm.getCas() != 0) {
+        // Check Bloomfilter's prediction
+        if (!vb->maybeKeyExistsInFilter(itm.getKey())) {
+            maybeKeyExists = false;
+        }
+    }
+
+    mutation_type_t mtype = vb->ht.unlocked_set(v, itm, itm.getCas(), true, false,
+                                                eviction_policy, nru,
+                                                maybeKeyExists);
+
+    Item& it = const_cast<Item&>(itm);
+    uint64_t seqno = 0;
     ENGINE_ERROR_CODE ret = ENGINE_SUCCESS;
     switch (mtype) {
     case NOMEM:
@@ -678,20 +750,23 @@ ENGINE_ERROR_CODE EventuallyPersistentStore::set(const Item &itm,
         // Even if the item was dirty, push it into the vbucket's open
         // checkpoint.
     case WAS_CLEAN:
-        queueDirty(vb, v, &lh);
+        it.setCas(vb->nextHLCCas());
+        v->setCas(it.getCas());
+        queueDirty(vb, v, &lh, &seqno);
+        it.setBySeqno(seqno);
         break;
     case NEED_BG_FETCH:
-        { // CAS operation with non-resident item + full eviction.
-            if (v) {
-                // temp item is already created. Simply schedule a bg fetch job
-                lh.unlock();
-                bgFetch(itm.getKey(), vb->getId(), -1, cookie, true);
-                return ENGINE_EWOULDBLOCK;
-            }
-            ret = addTempItemForBgFetch(lh, bucket_num, itm.getKey(), vb,
-                                        cookie, true);
-            break;
+    {   // CAS operation with non-resident item + full eviction.
+        if (v) {
+            // temp item is already created. Simply schedule a bg fetch job
+            lh.unlock();
+            bgFetch(itm.getKey(), vb->getId(), cookie, true);
+            return ENGINE_EWOULDBLOCK;
         }
+        ret = addTempItemForBgFetch(lh, bucket_num, itm.getKey(), vb,
+                                    cookie, true);
+        break;
+    }
     case INVALID_VBUCKET:
         ret = ENGINE_NOT_MY_VBUCKET;
         break;
@@ -731,26 +806,43 @@ ENGINE_ERROR_CODE EventuallyPersistentStore::add(const Item &itm,
     LockHolder lh = vb->ht.getLockedBucket(itm.getKey(), &bucket_num);
     StoredValue *v = vb->ht.unlocked_find(itm.getKey(), bucket_num, true,
                                           false);
+
+    bool maybeKeyExists = true;
+    if (eviction_policy == FULL_EVICTION) {
+        // Check bloomfilter's prediction
+        if (!vb->maybeKeyExistsInFilter(itm.getKey())) {
+            maybeKeyExists = false;
+        }
+    }
+
     add_type_t atype = vb->ht.unlocked_add(bucket_num, v, itm,
-                                           eviction_policy);
+                                           eviction_policy,
+                                           true, true,
+                                           maybeKeyExists);
 
+    Item& it = const_cast<Item&>(itm);
+    uint64_t seqno = 0;
     switch (atype) {
     case ADD_NOMEM:
         return ENGINE_ENOMEM;
     case ADD_EXISTS:
         return ENGINE_NOT_STORED;
     case ADD_TMP_AND_BG_FETCH:
-        return addTempItemForBgFetch(lh, bucket_num, itm.getKey(), vb,
+        return addTempItemForBgFetch(lh, bucket_num, it.getKey(), vb,
                                      cookie, true);
     case ADD_BG_FETCH:
         lh.unlock();
-        bgFetch(itm.getKey(), vb->getId(), -1, cookie, true);
+        bgFetch(it.getKey(), vb->getId(), cookie, true);
         return ENGINE_EWOULDBLOCK;
     case ADD_SUCCESS:
     case ADD_UNDEL:
-        queueDirty(vb, v, &lh);
+        it.setCas(vb->nextHLCCas());
+        v->setCas(it.getCas());
+        queueDirty(vb, v, &lh, &seqno);
+        it.setBySeqno(seqno);
         break;
     }
+
     return ENGINE_SUCCESS;
 }
 
@@ -793,6 +885,8 @@ ENGINE_ERROR_CODE EventuallyPersistentStore::replace(const Item &itm,
                                         0xff);
         }
 
+        Item& it = const_cast<Item&>(itm);
+        uint64_t seqno = 0;
         ENGINE_ERROR_CODE ret = ENGINE_SUCCESS;
         switch (mtype) {
             case NOMEM:
@@ -810,13 +904,16 @@ ENGINE_ERROR_CODE EventuallyPersistentStore::replace(const Item &itm,
                 // Even if the item was dirty, push it into the vbucket's open
                 // checkpoint.
             case WAS_CLEAN:
-                queueDirty(vb, v, &lh);
+                it.setCas(vb->nextHLCCas());
+                v->setCas(it.getCas());
+                queueDirty(vb, v, &lh, &seqno);
+                it.setBySeqno(seqno);
                 break;
             case NEED_BG_FETCH:
             {
                 // temp item is already created. Simply schedule a bg fetch job
                 lh.unlock();
-                bgFetch(itm.getKey(), vb->getId(), -1, cookie, true);
+                bgFetch(it.getKey(), vb->getId(), cookie, true);
                 ret = ENGINE_EWOULDBLOCK;
                 break;
             }
@@ -831,14 +928,22 @@ ENGINE_ERROR_CODE EventuallyPersistentStore::replace(const Item &itm,
             return ENGINE_KEY_ENOENT;
         }
 
-        return addTempItemForBgFetch(lh, bucket_num, itm.getKey(), vb,
-                                     cookie, false);
+        if (vb->maybeKeyExistsInFilter(itm.getKey())) {
+            return addTempItemForBgFetch(lh, bucket_num, itm.getKey(), vb,
+                                         cookie, false);
+        } else {
+            // As bloomfilter predicted that item surely doesn't exist
+            // on disk, return ENOENT for replace().
+            return ENGINE_KEY_ENOENT;
+        }
     }
 }
 
-ENGINE_ERROR_CODE EventuallyPersistentStore::addTAPBackfillItem(const Item &itm,
-                                                                uint8_t nru,
-                                                                bool genBySeqno) {
+ENGINE_ERROR_CODE EventuallyPersistentStore::addTAPBackfillItem(
+                                                        const Item &itm,
+                                                        uint8_t nru,
+                                                        bool genBySeqno,
+                                                        ExtendedMetaData *emd) {
 
     RCPtr<VBucket> vb = getVBucket(itm.getVBucketId());
     if (!vb) {
@@ -855,6 +960,11 @@ ENGINE_ERROR_CODE EventuallyPersistentStore::addTAPBackfillItem(const Item &itm,
         return ENGINE_NOT_MY_VBUCKET;
     }
 
+    //check for the incoming item's CAS validity
+    if (!Item::isValidCas(itm.getCas())) {
+        return ENGINE_KEY_EEXISTS;
+    }
+
     int bucket_num(0);
     LockHolder lh = vb->ht.getLockedBucket(itm.getKey(), &bucket_num);
     StoredValue *v = vb->ht.unlocked_find(itm.getKey(), bucket_num, true,
@@ -883,7 +993,16 @@ ENGINE_ERROR_CODE EventuallyPersistentStore::addTAPBackfillItem(const Item &itm,
     case NOT_FOUND:
         // FALLTHROUGH
     case WAS_CLEAN:
-        queueDirty(vb, v, &lh, true, true, genBySeqno);
+        /* set the conflict resolution mode from the extended meta data *
+         * Given that the mode is already set, we don't need to set the *
+         * conflict resolution mode in queueDirty */
+        if (emd) {
+            v->setConflictResMode(
+                 static_cast<enum conflict_resolution_mode>(
+                                      emd->getConflictResMode()));
+        }
+        vb->setMaxCas(v->getCas());
+        queueDirty(vb, v, &lh, NULL,true, true, genBySeqno, false);
         break;
     case INVALID_VBUCKET:
         ret = ENGINE_NOT_MY_VBUCKET;
@@ -893,6 +1012,11 @@ ENGINE_ERROR_CODE EventuallyPersistentStore::addTAPBackfillItem(const Item &itm,
         abort();
     }
 
+    // Update drift counter for vbucket upon a success only
+    if (ret == ENGINE_SUCCESS && emd) {
+        vb->setDriftCounter(emd->getAdjustedTime());
+    }
+
     return ret;
 }
 
@@ -913,7 +1037,7 @@ class KVStatsCallback : public Callback<kvstats_ctx> {
         EventuallyPersistentStore *epstore;
 };
 
-void EventuallyPersistentStore::snapshotVBuckets(const Priority &priority,
+void EventuallyPersistentStore::snapshotVBuckets(VBSnapshotTask::Priority prio,
                                                  uint16_t shardId) {
 
     class VBucketStateVisitor : public VBucketVisitor {
@@ -922,15 +1046,15 @@ void EventuallyPersistentStore::snapshotVBuckets(const Priority &priority,
             : vbuckets(vb_map), shardId(sid) { }
         bool visitBucket(RCPtr<VBucket> &vb) {
             if (vbuckets.getShard(vb->getId())->getId() == shardId) {
-                uint64_t snapStart = 0;
-                uint64_t snapEnd = 0;
+                snapshot_range_t range;
+                vb->getPersistedSnapshot(range);
                 std::string failovers = vb->failovers->toJSON();
                 uint64_t chkId = vbuckets.getPersistenceCheckpointId(vb->getId());
 
-                vb->getCurrentSnapshot(snapStart, snapEnd);
                 vbucket_state vb_state(vb->getState(), chkId, 0,
                                        vb->getHighSeqno(), vb->getPurgeSeqno(),
-                                       snapStart, snapEnd, failovers);
+                                       range.start, range.end, vb->getMaxCas(),
+                                       vb->getDriftCounter() ,failovers);
                 states.insert(std::pair<uint16_t, vbucket_state>(vb->getId(), vb_state));
             }
             return false;
@@ -948,7 +1072,7 @@ void EventuallyPersistentStore::snapshotVBuckets(const Priority &priority,
     };
 
     KVShard *shard = vbMap.shards[shardId];
-    if (priority == Priority::VBucketPersistLowPriority) {
+    if (prio == VBSnapshotTask::Priority::LOW) {
         shard->setLowPriorityVbSnapshotFlag(false);
     } else {
         shard->setHighPriorityVbSnapshotFlag(false);
@@ -975,7 +1099,7 @@ void EventuallyPersistentStore::snapshotVBuckets(const Priority &priority,
             break;
         }
 
-        if (priority == Priority::VBucketPersistHighPriority) {
+        if (prio == VBSnapshotTask::Priority::HIGH) {
             if (vbMap.setBucketCreation(iter->first, false)) {
                 LOG(EXTENSION_LOG_INFO, "VBucket %d created", iter->first);
             }
@@ -983,14 +1107,13 @@ void EventuallyPersistentStore::snapshotVBuckets(const Priority &priority,
     }
 
     if (!success) {
-        scheduleVBSnapshot(priority, shard->getId());
+        scheduleVBSnapshot(prio, shard->getId());
     } else {
         stats.snapshotVbucketHisto.add((gethrtime() - start) / 1000);
     }
 }
 
-bool EventuallyPersistentStore::persistVBState(const Priority &priority,
-                                               uint16_t vbid) {
+bool EventuallyPersistentStore::persistVBState(uint16_t vbid) {
     schedule_vbstate_persist[vbid] = false;
 
     RCPtr<VBucket> vb = getVBucket(vbid);
@@ -1011,18 +1134,22 @@ bool EventuallyPersistentStore::persistVBState(const Priority &priority,
         }
     }
 
+    const hrtime_t start = gethrtime();
+
     KVStatsCallback kvcb(this);
     uint64_t chkId = vbMap.getPersistenceCheckpointId(vbid);
     std::string failovers = vb->failovers->toJSON();
-    uint64_t snapStart = 0;
-    uint64_t snapEnd = 0;
 
-    vb->getCurrentSnapshot(snapStart, snapEnd);
+    snapshot_range_t range;
+    vb->getPersistedSnapshot(range);
     vbucket_state vb_state(vb->getState(), chkId, 0, vb->getHighSeqno(),
-                           vb->getPurgeSeqno(), snapStart, snapEnd, failovers);
+                           vb->getPurgeSeqno(), range.start, range.end,
+                           vb->getMaxCas(), vb->getDriftCounter(),
+                           failovers);
 
     KVStore *rwUnderlying = getRWUnderlying(vbid);
     if (rwUnderlying->snapshotVBucket(vbid, vb_state, &kvcb)) {
+        stats.persistVBStateHisto.add((gethrtime() - start) / 1000);
         if (vbMap.setBucketCreation(vbid, false)) {
             LOG(EXTENSION_LOG_INFO, "VBucket %d created", vbid);
         }
@@ -1058,14 +1185,23 @@ ENGINE_ERROR_CODE EventuallyPersistentStore::setVBucketState(uint16_t vbid,
         }
 
         vb->setState(to, engine.getServerApi());
+
+        if (to == vbucket_state_active && oldstate == vbucket_state_replica) {
+            /**
+             * Update snapshot range when vbucket goes from being a replica
+             * to active, to maintain the correct snapshot sequence numbers
+             * even in a failover scenario.
+             */
+            vb->checkpointManager.resetSnapshotRange();
+        }
+
         if (to == vbucket_state_active && !transfer) {
-            uint64_t snapStart = 0;
-            uint64_t snapEnd = 0;
-            vb->getCurrentSnapshot(snapStart, snapEnd);
-            if (snapEnd == vbMap.getPersistenceSeqno(vbid)) {
-                vb->failovers->createEntry(snapEnd);
+            snapshot_range_t range;
+            vb->getPersistedSnapshot(range);
+            if (range.end == vbMap.getPersistenceSeqno(vbid)) {
+                vb->failovers->createEntry(range.end);
             } else {
-                vb->failovers->createEntry(snapStart);
+                vb->failovers->createEntry(range.start);
             }
         }
 
@@ -1075,12 +1211,14 @@ ENGINE_ERROR_CODE EventuallyPersistentStore::setVBucketState(uint16_t vbid,
             ExTask notifyTask = new PendingOpsNotification(engine, vb);
             ExecutorPool::get()->schedule(notifyTask, NONIO_TASK_IDX);
         }
-        scheduleVBStatePersist(Priority::VBucketPersistLowPriority, vbid);
+        scheduleVBStatePersist(VBStatePersistTask::Priority::LOW, vbid);
     } else {
         FailoverTable* ft = new FailoverTable(engine.getMaxFailoverEntries());
+        KVShard* shard = vbMap.getShard(vbid);
+        shared_ptr<Callback<uint16_t> > cb(new NotifyFlusherCB(shard));
         RCPtr<VBucket> newvb(new VBucket(vbid, to, stats,
                                          engine.getCheckpointConfig(),
-                                         vbMap.getShard(vbid), 0, 0, 0, ft));
+                                         shard, 0, 0, 0, ft, cb));
         // The first checkpoint for active vbucket should start with id 2.
         uint64_t start_chk_id = (to == vbucket_state_active) ? 2 : 0;
         newvb->checkpointManager.setOpenCheckpointId(start_chk_id);
@@ -1092,18 +1230,18 @@ ENGINE_ERROR_CODE EventuallyPersistentStore::setVBucketState(uint16_t vbid,
         vbMap.setPersistenceSeqno(vbid, 0);
         vbMap.setBucketCreation(vbid, true);
         lh.unlock();
-        scheduleVBStatePersist(Priority::VBucketPersistHighPriority, vbid);
+        scheduleVBStatePersist(VBStatePersistTask::Priority::HIGH, vbid);
     }
     return ENGINE_SUCCESS;
 }
 
-bool EventuallyPersistentStore::scheduleVBSnapshot(const Priority &p) {
+bool EventuallyPersistentStore::scheduleVBSnapshot(VBSnapshotTask::Priority prio) {
     KVShard *shard = NULL;
-    if (p == Priority::VBucketPersistHighPriority) {
+    if (prio == VBSnapshotTask::Priority::HIGH) {
         for (size_t i = 0; i < vbMap.numShards; ++i) {
             shard = vbMap.shards[i];
             if (shard->setHighPriorityVbSnapshotFlag(true)) {
-                ExTask task = new VBSnapshotTask(&engine, p, i, false);
+                ExTask task = new VBSnapshotTaskHigh(&engine, i, true);
                 ExecutorPool::get()->schedule(task, WRITER_TASK_IDX);
             }
         }
@@ -1111,7 +1249,7 @@ bool EventuallyPersistentStore::scheduleVBSnapshot(const Priority &p) {
         for (size_t i = 0; i < vbMap.numShards; ++i) {
             shard = vbMap.shards[i];
             if (shard->setLowPriorityVbSnapshotFlag(true)) {
-                ExTask task = new VBSnapshotTask(&engine, p, i, false);
+                ExTask task = new VBSnapshotTaskLow(&engine, i, true);
                 ExecutorPool::get()->schedule(task, WRITER_TASK_IDX);
             }
         }
@@ -1122,31 +1260,34 @@ bool EventuallyPersistentStore::scheduleVBSnapshot(const Priority &p) {
     return true;
 }
 
-void EventuallyPersistentStore::scheduleVBSnapshot(const Priority &p,
+void EventuallyPersistentStore::scheduleVBSnapshot(VBSnapshotTask::Priority prio,
                                                    uint16_t shardId,
                                                    bool force) {
     KVShard *shard = vbMap.shards[shardId];
-    if (p == Priority::VBucketPersistHighPriority) {
+    if (prio == VBSnapshotTask::Priority::HIGH) {
         if (force || shard->setHighPriorityVbSnapshotFlag(true)) {
-            ExTask task = new VBSnapshotTask(&engine, p, shardId, false);
+            ExTask task = new VBSnapshotTaskHigh(&engine, shardId, true);
             ExecutorPool::get()->schedule(task, WRITER_TASK_IDX);
         }
     } else {
         if (force || shard->setLowPriorityVbSnapshotFlag(true)) {
-            ExTask task = new VBSnapshotTask(&engine, p, shardId, false);
+            ExTask task = new VBSnapshotTaskLow(&engine, shardId, true);
             ExecutorPool::get()->schedule(task, WRITER_TASK_IDX);
         }
     }
 }
 
-void EventuallyPersistentStore::scheduleVBStatePersist(const Priority &priority,
+void EventuallyPersistentStore::scheduleVBStatePersist(VBStatePersistTask::Priority priority,
                                                        uint16_t vbid,
                                                        bool force) {
     bool inverse = false;
     if (force ||
         schedule_vbstate_persist[vbid].compare_exchange_strong(inverse, true)) {
-        ExTask task = new VBStatePersistTask(&engine, priority, vbid, false);
-        ExecutorPool::get()->schedule(task, WRITER_TASK_IDX);
+        if (priority == VBStatePersistTask::Priority::HIGH) {
+            ExecutorPool::get()->schedule(new VBStatePersistTaskHigh(&engine, vbid, true), WRITER_TASK_IDX);
+        } else {
+            ExecutorPool::get()->schedule(new VBStatePersistTaskLow(&engine, vbid, true), WRITER_TASK_IDX);
+        }
     }
 }
 
@@ -1186,8 +1327,7 @@ void EventuallyPersistentStore::scheduleVBDeletion(RCPtr<VBucket> &vb,
     ExecutorPool::get()->schedule(delTask, NONIO_TASK_IDX);
 
     if (vbMap.setBucketDeletion(vb->getId(), true)) {
-        ExTask task = new VBDeleteTask(&engine, vb->getId(), cookie,
-                                       Priority::VBucketDeletionPriority);
+        ExTask task = new VBDeleteTask(&engine, vb->getId(), cookie);
         ExecutorPool::get()->schedule(task, WRITER_TASK_IDX);
     }
 }
@@ -1222,8 +1362,7 @@ ENGINE_ERROR_CODE EventuallyPersistentStore::compactDB(uint16_t vbid,
     }
 
     LockHolder lh(compactionLock);
-    ExTask task = new CompactVBucketTask(&engine, Priority::CompactorPriority,
-                                         vbid, c, cookie);
+    ExTask task = new CompactVBucketTask(&engine, vbid, c, cookie);
     compactionTasks.push_back(std::make_pair(vbid, task));
     if (compactionTasks.size() > 1) {
         if ((stats.diskQueueSize > compactionWriteQueueCap &&
@@ -1278,6 +1417,62 @@ bool EventuallyPersistentStore::compactVBucket(const uint16_t vbid,
             return true; // Schedule a compaction task again.
         }
 
+        Configuration &config = getEPEngine().getConfiguration();
+        if (config.isBfilterEnabled()) {
+            size_t initial_estimation = config.getBfilterKeyCount();
+            size_t estimated_count;
+            size_t num_deletes =
+                    getROUnderlying(vbid)->getNumPersistedDeletes(vbid);
+            if (eviction_policy == VALUE_ONLY) {
+                /**
+                 * VALUE-ONLY EVICTION POLICY
+                 * Obtain number of persisted deletes from underlying kvstore.
+                 * Bloomfilter's estimated_key_count = 1.25 * deletes
+                 */
+
+                estimated_count = round(1.25 * num_deletes);
+                ctx->bfcb = new BfilterCB(this, vbid, false);
+            } else {
+                /**
+                 * FULL EVICTION POLICY
+                 * First determine if the resident ratio of vbucket is less than
+                 * the threshold from configuration.
+                 */
+
+                bool residentRatioAlert = vb->isResidentRatioUnderThreshold(
+                                                getBfiltersResidencyThreshold(),
+                                                eviction_policy);
+                ctx->bfcb = new BfilterCB(this, vbid, residentRatioAlert);
+
+                /**
+                 * Based on resident ratio against threshold, estimate count.
+                 *
+                 * 1. If resident ratio is greater than the threshold:
+                 * Obtain number of persisted deletes from underlying kvstore.
+                 * Obtain number of non-resident-items for vbucket.
+                 * Bloomfilter's estimated_key_count =
+                 *                              1.25 * (deletes + non-resident)
+                 *
+                 * 2. Otherwise:
+                 * Obtain number of items for vbucket.
+                 * Bloomfilter's estimated_key_count =
+                 *                              1.25 * (num_items)
+                 */
+
+                if (residentRatioAlert) {
+                    estimated_count = round(1.25 *
+                                            vb->getNumItems(eviction_policy));
+                } else {
+                    estimated_count = round(1.25 * (num_deletes +
+                                vb->getNumNonResidentItems(eviction_policy)));
+                }
+            }
+            if (estimated_count < initial_estimation) {
+                estimated_count = initial_estimation;
+            }
+            vb->initTempFilter(estimated_count, config.getBfilterFpProb());
+        }
+
         if (vb->getState() == vbucket_state_active) {
             // Set the current time ONLY for active vbuckets.
             ctx->curr_time = ep_real_time();
@@ -1286,7 +1481,17 @@ bool EventuallyPersistentStore::compactVBucket(const uint16_t vbid,
         }
         ExpiredItemsCallback cb(this, vbid);
         KVStatsCallback kvcb(this);
-        getRWUnderlying(vbid)->compactVBucket(vbid, ctx, cb, kvcb);
+        if (getRWUnderlying(vbid)->compactVBucket(vbid, ctx, cb, kvcb)) {
+            if (config.isBfilterEnabled()) {
+                vb->swapFilter();
+            } else {
+                vb->clearFilter();
+            }
+        } else {
+            LOG(EXTENSION_LOG_WARNING, "Compaction: Not successful for vb %u, "
+                    "clearing bloom filter, if any.", vb->getId());
+            vb->clearFilter();
+        }
         vb->setPurgeSeqno(ctx->max_purged_seq);
     } else {
         err = ENGINE_NOT_MY_VBUCKET;
@@ -1337,14 +1542,14 @@ bool EventuallyPersistentStore::resetVBucket(uint16_t vbid) {
         lh.unlock();
 
         std::list<std::string> tap_cursors = vb->checkpointManager.
-                                             getTAPCursorNames();
+                                             getCursorNames();
         // Delete and recreate the vbucket database file
         scheduleVBDeletion(vb, NULL, 0);
         setVBucketState(vbid, vbstate, false);
 
         // Copy the all cursors from the old vbucket into the new vbucket
         RCPtr<VBucket> newvb = vbMap.getBucket(vbid);
-        newvb->checkpointManager.resetTAPCursors(tap_cursors);
+        newvb->checkpointManager.resetCursors(tap_cursors);
 
         rv = true;
     }
@@ -1414,7 +1619,6 @@ void EventuallyPersistentStore::updateBGStats(const hrtime_t init,
 
 void EventuallyPersistentStore::completeBGFetch(const std::string &key,
                                                 uint16_t vbucket,
-                                                uint64_t rowid,
                                                 const void *cookie,
                                                 hrtime_t init,
                                                 bool isMeta) {
@@ -1427,7 +1631,7 @@ void EventuallyPersistentStore::completeBGFetch(const std::string &key,
     } else {
         ++stats.bg_fetched;
     }
-    getROUnderlying(vbucket)->get(key, rowid, vbucket, gcb);
+    getROUnderlying(vbucket)->get(key, vbucket, gcb);
     gcb.waitForValue();
     cb_assert(gcb.fired);
     ENGINE_ERROR_CODE status = gcb.val.getStatus();
@@ -1488,7 +1692,7 @@ void EventuallyPersistentStore::completeBGFetch(const std::string &key,
                         // returns, the item may have been updated and queued
                         // Hence test the CAS value to be the same first.
                         // exptime mutated, schedule it into new checkpoint
-                        queueDirty(vb, v, &hlh);
+                        queueDirty(vb, v, &hlh, NULL);
                     }
                 } else if (gcb.val.getStatus() == ENGINE_KEY_ENOENT) {
                     v->setStoredValueState(
@@ -1504,8 +1708,9 @@ void EventuallyPersistentStore::completeBGFetch(const std::string &key,
                     // underlying kvstore couldn't fetch requested data
                     // log returned error and notify TMPFAIL to client
                     LOG(EXTENSION_LOG_WARNING,
-                        "Warning: failed background fetch for vb=%d seq=%d "
-                        "key=%s", vbucket, v->getBySeqno(), key.c_str());
+                        "Warning: failed background fetch for vb=%d "
+                        "seq=%" PRId64 " key=%s", vbucket, v->getBySeqno(),
+                        key.c_str());
                     status = ENGINE_TMPFAIL;
                 }
             }
@@ -1595,6 +1800,7 @@ void EventuallyPersistentStore::completeBGFetchMulti(uint16_t vbId,
                     if (status == ENGINE_SUCCESS) {
                         v->unlocked_restoreValue(fetchedValue, vb->ht);
                         cb_assert(v->isResident());
+                        ReaderLockHolder(vb->getStateLock());
                         if (vb->getState() == vbucket_state_active &&
                             v->getExptime() != fetchedValue->getExptime() &&
                             v->getCas() == fetchedValue->getCas()) {
@@ -1603,7 +1809,7 @@ void EventuallyPersistentStore::completeBGFetchMulti(uint16_t vbId,
                             // updated and queued
                             // Hence test the CAS value to be the same first.
                             // exptime mutated, schedule it into new checkpoint
-                            queueDirty(vb, v, &blh);
+                            queueDirty(vb, v, &blh, NULL);
                         }
                     } else if (status == ENGINE_KEY_ENOENT) {
                         v->setStoredValueState(StoredValue::state_non_existent_key);
@@ -1646,7 +1852,6 @@ void EventuallyPersistentStore::completeBGFetchMulti(uint16_t vbId,
 
 void EventuallyPersistentStore::bgFetch(const std::string &key,
                                         uint16_t vbucket,
-                                        uint64_t rowid,
                                         const void *cookie,
                                         bool isMeta) {
     std::stringstream ss;
@@ -1670,10 +1875,8 @@ void EventuallyPersistentStore::bgFetch(const std::string &key,
         stats.maxRemainingBgJobs = std::max(stats.maxRemainingBgJobs,
                                             bgFetchQueue.load());
         ExecutorPool* iom = ExecutorPool::get();
-        ExTask task = new BGFetchTask(&engine, key, vbucket, rowid, cookie,
-                                      isMeta,
-                                      Priority::BgFetcherGetMetaPriority,
-                                      bgFetchDelay, false);
+        ExTask task = new SingleBGFetcherTask(&engine, key, vbucket, cookie,
+                                              isMeta, bgFetchDelay, false);
         iom->schedule(task, READER_TASK_IDX);
         ss << "Queued a background fetch, now at " << bgFetchQueue.load()
            << std::endl;
@@ -1687,7 +1890,9 @@ GetValue EventuallyPersistentStore::getInternal(const std::string &key,
                                                 bool queueBG,
                                                 bool honorStates,
                                                 vbucket_state_t allowedState,
-                                                bool trackReference) {
+                                                bool trackReference,
+                                                bool deleteTempItem,
+                                                bool hideLockedCAS) {
 
     vbucket_state_t disallowedState = (allowedState == vbucket_state_active) ?
         vbucket_state_replica : vbucket_state_active;
@@ -1715,34 +1920,55 @@ GetValue EventuallyPersistentStore::getInternal(const std::string &key,
     StoredValue *v = fetchValidValue(vb, key, bucket_num, true,
                                      trackReference);
     if (v) {
-        if (v->isDeleted() || v->isTempDeletedItem() ||
-            v->isTempNonExistentItem()) {
+        if (v->isDeleted()) {
             GetValue rv;
             return rv;
         }
+        if (v->isTempDeletedItem() || v->isTempNonExistentItem()) {
+            // Delete a temp non-existent item to ensure that
+            // if the get were issued over an item that doesn't
+            // exist, then we dont preserve a temp item.
+            if (deleteTempItem) {
+                vb->ht.unlocked_del(key, bucket_num);
+            }
+            GetValue rv;
+            return rv;
+        }
+
         // If the value is not resident, wait for it...
         if (!v->isResident()) {
             if (queueBG) {
-                bgFetch(key, vbucket, v->getBySeqno(), cookie);
+                bgFetch(key, vbucket, cookie);
             }
             return GetValue(NULL, ENGINE_EWOULDBLOCK, v->getBySeqno(),
                             true, v->getNRUValue());
         }
 
-        GetValue rv(v->toItem(v->isLocked(ep_current_time()), vbucket),
-                    ENGINE_SUCCESS, v->getBySeqno(), false, v->getNRUValue());
+        // Should we hide (return -1) for the items' CAS?
+        const bool hide_cas = hideLockedCAS &&
+                              v->isLocked(ep_current_time());
+        GetValue rv(v->toItem(hide_cas, vbucket), ENGINE_SUCCESS,
+                    v->getBySeqno(), false, v->getNRUValue());
         return rv;
     } else {
         if (eviction_policy == VALUE_ONLY || diskFlushAll) {
             GetValue rv;
             return rv;
         }
-        ENGINE_ERROR_CODE ec = ENGINE_EWOULDBLOCK;
-        if (queueBG) { // Full eviction and need a bg fetch.
-            ec = addTempItemForBgFetch(lh, bucket_num, key, vb,
-                                       cookie, false);
+
+        if (vb->maybeKeyExistsInFilter(key)) {
+            ENGINE_ERROR_CODE ec = ENGINE_EWOULDBLOCK;
+            if (queueBG) { // Full eviction and need a bg fetch.
+                ec = addTempItemForBgFetch(lh, bucket_num, key, vb,
+                                           cookie, false);
+            }
+            return GetValue(NULL, ec, -1, true);
+        } else {
+            // As bloomfilter predicted that item surely doesn't exist
+            // on disk, return ENONET, for getInternal().
+            GetValue rv;
+            return rv;
         }
-        return GetValue(NULL, ec, -1, true);
     }
 }
 
@@ -1791,6 +2017,7 @@ ENGINE_ERROR_CODE EventuallyPersistentStore::getMetaData(
                                                         const void *cookie,
                                                         ItemMetaData &metadata,
                                                         uint32_t &deleted,
+                                                        uint8_t &confResMode,
                                                         bool trackReferenced)
 {
     (void) cookie;
@@ -1817,7 +2044,7 @@ ENGINE_ERROR_CODE EventuallyPersistentStore::getMetaData(
         stats.numOpsGetMeta++;
 
         if (v->isTempInitialItem()) { // Need bg meta fetch.
-            bgFetch(key, vbucket, -1, cookie, true);
+            bgFetch(key, vbucket, cookie, true);
             return ENGINE_EWOULDBLOCK;
         } else if (v->isTempNonExistentItem()) {
             metadata.cas = v->getCas();
@@ -1836,6 +2063,7 @@ ENGINE_ERROR_CODE EventuallyPersistentStore::getMetaData(
             metadata.flags = v->getFlags();
             metadata.exptime = v->getExptime();
             metadata.revSeqno = v->getRevSeqno();
+            confResMode = v->getConflictResMode();
             return ENGINE_SUCCESS;
         }
     } else {
@@ -1844,18 +2072,29 @@ ENGINE_ERROR_CODE EventuallyPersistentStore::getMetaData(
         // So, add a temporary item corresponding to the key to the hash table
         // and schedule a background fetch for its metadata from the persistent
         // store. The item's state will be updated after the fetch completes.
-        return addTempItemForBgFetch(lh, bucket_num, key, vb, cookie, true);
+        //
+        // Schedule this bgFetch only if the key is predicted to be may-be
+        // existent on disk by the bloomfilter.
+
+        if (vb->maybeKeyExistsInFilter(key)) {
+            return addTempItemForBgFetch(lh, bucket_num, key, vb, cookie, true);
+        } else {
+            return ENGINE_KEY_ENOENT;
+        }
     }
 }
 
-ENGINE_ERROR_CODE EventuallyPersistentStore::setWithMeta(const Item &itm,
-                                                         uint64_t cas,
-                                                         const void *cookie,
-                                                         bool force,
-                                                         bool allowExisting,
-                                                         uint8_t nru,
-                                                         bool genBySeqno,
-                                                         bool isReplication)
+ENGINE_ERROR_CODE EventuallyPersistentStore::setWithMeta(
+                                                     const Item &itm,
+                                                     uint64_t cas,
+                                                     uint64_t *seqno,
+                                                     const void *cookie,
+                                                     bool force,
+                                                     bool allowExisting,
+                                                     uint8_t nru,
+                                                     bool genBySeqno,
+                                                     ExtendedMetaData *emd,
+                                                     bool isReplication)
 {
     RCPtr<VBucket> vb = getVBucket(itm.getVBucketId());
     if (!vb) {
@@ -1876,24 +2115,49 @@ ENGINE_ERROR_CODE EventuallyPersistentStore::setWithMeta(const Item &itm,
         }
     }
 
+    //check for the incoming item's CAS validity
+    if (!Item::isValidCas(itm.getCas())) {
+        return ENGINE_KEY_EEXISTS;
+    }
+
     int bucket_num(0);
     LockHolder lh = vb->ht.getLockedBucket(itm.getKey(), &bucket_num);
     StoredValue *v = vb->ht.unlocked_find(itm.getKey(), bucket_num, true,
                                           false);
 
+    bool maybeKeyExists = true;
     if (!force) {
         if (v)  {
             if (v->isTempInitialItem()) {
-                bgFetch(itm.getKey(), itm.getVBucketId(), -1, cookie, true);
+                bgFetch(itm.getKey(), itm.getVBucketId(), cookie, true);
                 return ENGINE_EWOULDBLOCK;
             }
-            if (!conflictResolver->resolve(v, itm.getMetaData(), false)) {
+
+            enum conflict_resolution_mode confResMode = revision_seqno;
+            if (emd) {
+                confResMode = static_cast<enum conflict_resolution_mode>(
+                                                       emd->getConflictResMode());
+            }
+
+            if (!conflictResolver->resolve(vb, v, itm.getMetaData(), false,
+                                           confResMode)) {
                 ++stats.numOpsSetMetaResolutionFailed;
                 return ENGINE_KEY_EEXISTS;
             }
         } else {
-            return addTempItemForBgFetch(lh, bucket_num, itm.getKey(), vb,
-                                         cookie, true, isReplication);
+            if (vb->maybeKeyExistsInFilter(itm.getKey())) {
+                return addTempItemForBgFetch(lh, bucket_num, itm.getKey(), vb,
+                                             cookie, true, isReplication);
+            } else {
+                maybeKeyExists = false;
+            }
+        }
+    } else {
+        if (eviction_policy == FULL_EVICTION) {
+            // Check Bloomfilter's prediction
+            if (!vb->maybeKeyExistsInFilter(itm.getKey())) {
+                maybeKeyExists = false;
+            }
         }
     }
 
@@ -1902,9 +2166,10 @@ ENGINE_ERROR_CODE EventuallyPersistentStore::setWithMeta(const Item &itm,
          vb->getState() == vbucket_state_pending)) {
         v->unlock();
     }
+
     mutation_type_t mtype = vb->ht.unlocked_set(v, itm, cas, allowExisting,
                                                 true, eviction_policy, nru,
-                                                isReplication);
+                                                maybeKeyExists, isReplication);
 
     ENGINE_ERROR_CODE ret = ENGINE_SUCCESS;
     switch (mtype) {
@@ -1920,7 +2185,16 @@ ENGINE_ERROR_CODE EventuallyPersistentStore::setWithMeta(const Item &itm,
         break;
     case WAS_DIRTY:
     case WAS_CLEAN:
-        queueDirty(vb, v, &lh, false, true, genBySeqno);
+        /* set the conflict resolution mode from the extended meta data *
+         * Given that the mode is already set, we don't need to set the *
+         * conflict resolution mode in queueDirty */
+        if (emd) {
+            v->setConflictResMode(
+                      static_cast<enum conflict_resolution_mode>(
+                                            emd->getConflictResMode()));
+        }
+        vb->setMaxCas(v->getCas());
+        queueDirty(vb, v, &lh, seqno, false, true, genBySeqno, false);
         break;
     case NOT_FOUND:
         ret = ENGINE_KEY_ENOENT;
@@ -1929,14 +2203,20 @@ ENGINE_ERROR_CODE EventuallyPersistentStore::setWithMeta(const Item &itm,
         {            // CAS operation with non-resident item + full eviction.
             if (v) { // temp item is already created. Simply schedule a
                 lh.unlock(); // bg fetch job.
-                bgFetch(itm.getKey(), vb->getId(), -1, cookie, true);
+                bgFetch(itm.getKey(), vb->getId(), cookie, true);
                 return ENGINE_EWOULDBLOCK;
             }
+
             ret = addTempItemForBgFetch(lh, bucket_num, itm.getKey(), vb,
                                         cookie, true, isReplication);
         }
     }
 
+    // Update drift counter for vbucket upon a success only
+    if (ret == ENGINE_SUCCESS && emd) {
+        vb->setDriftCounter(emd->getAdjustedTime());
+    }
+
     return ret;
 }
 
@@ -1976,7 +2256,7 @@ GetValue EventuallyPersistentStore::getAndUpdateTtl(const std::string &key,
         }
 
         if (!v->isResident()) {
-            bgFetch(key, vbucket, v->getBySeqno(), cookie);
+            bgFetch(key, vbucket, cookie);
             return GetValue(NULL, ENGINE_EWOULDBLOCK, v->getBySeqno());
         }
         if (v->isLocked(ep_current_time())) {
@@ -1997,7 +2277,7 @@ GetValue EventuallyPersistentStore::getAndUpdateTtl(const std::string &key,
             if (vb->getState() == vbucket_state_active) {
                 // persist the item in the underlying storage for
                 // mutated exptime but only if VB is active.
-                queueDirty(vb, v, &lh);
+                queueDirty(vb, v, &lh, NULL);
             }
         }
         return rv;
@@ -2006,9 +2286,17 @@ GetValue EventuallyPersistentStore::getAndUpdateTtl(const std::string &key,
             GetValue rv;
             return rv;
         } else {
-            ENGINE_ERROR_CODE ec = addTempItemForBgFetch(lh, bucket_num, key,
-                                                         vb, cookie, false);
-            return GetValue(NULL, ec, -1, true);
+            if (vb->maybeKeyExistsInFilter(key)) {
+                ENGINE_ERROR_CODE ec = addTempItemForBgFetch(lh, bucket_num,
+                                                             key, vb, cookie,
+                                                             false);
+                return GetValue(NULL, ec, -1, true);
+            } else {
+                // As bloomfilter predicted that item surely doesn't exist
+                // on disk, return ENOENT for getAndUpdateTtl().
+                GetValue rv;
+                return rv;
+            }
         }
     }
 }
@@ -2036,7 +2324,6 @@ EventuallyPersistentStore::statsVKey(const std::string &key,
         ExecutorPool* iom = ExecutorPool::get();
         ExTask task = new VKeyStatBGFetchTask(&engine, key, vbucket,
                                            v->getBySeqno(), cookie,
-                                           Priority::VKeyStatBgFetcherPriority,
                                            bgFetchDelay, false);
         iom->schedule(task, READER_TASK_IDX);
         return ENGINE_EWOULDBLOCK;
@@ -2062,7 +2349,6 @@ EventuallyPersistentStore::statsVKey(const std::string &key,
                     ExecutorPool* iom = ExecutorPool::get();
                     ExTask task = new VKeyStatBGFetchTask(&engine, key,
                                                           vbucket, -1, cookie,
-                                           Priority::VKeyStatBgFetcherPriority,
                                                           bgFetchDelay, false);
                     iom->schedule(task, READER_TASK_IDX);
                 }
@@ -2078,7 +2364,7 @@ void EventuallyPersistentStore::completeStatsVKey(const void* cookie,
                                                   uint64_t bySeqNum) {
     RememberingCallback<GetValue> gcb;
 
-    getROUnderlying(vbid)->get(key, bySeqNum, vbid, gcb);
+    getROUnderlying(vbid)->get(key, vbid, gcb);
     gcb.waitForValue();
     cb_assert(gcb.fired);
 
@@ -2099,8 +2385,9 @@ void EventuallyPersistentStore::completeStatsVKey(const void* cookie,
                     // underlying kvstore couldn't fetch requested data
                     // log returned error and notify TMPFAIL to client
                     LOG(EXTENSION_LOG_WARNING,
-                        "Warning: failed background fetch for vb=%d seq=%d "
-                        "key=%s", vbid, v->getBySeqno(), key.c_str());
+                        "Warning: failed background fetch for vb=%d "
+                        "seq=%" PRId64 " key=%s", vbid, v->getBySeqno(),
+                        key.c_str());
                 }
             }
         }
@@ -2152,7 +2439,7 @@ bool EventuallyPersistentStore::getLocked(const std::string &key,
         // If the value is not resident, wait for it...
         if (!v->isResident()) {
             if (cookie) {
-                bgFetch(key, vbucket, v->getBySeqno(), cookie);
+                bgFetch(key, vbucket, cookie);
             }
             GetValue rv(NULL, ENGINE_EWOULDBLOCK, -1, true);
             cb.callback(rv);
@@ -2163,7 +2450,7 @@ bool EventuallyPersistentStore::getLocked(const std::string &key,
         v->lock(currentTime + lockTimeout);
 
         Item *it = v->toItem(false, vbucket);
-        it->setCas();
+        it->setCas(vb->nextHLCCas());
         v->setCas(it->getCas());
 
         GetValue rv(it);
@@ -2175,11 +2462,20 @@ bool EventuallyPersistentStore::getLocked(const std::string &key,
             cb.callback(rv);
             return true;
         } else {
-            ENGINE_ERROR_CODE ec = addTempItemForBgFetch(lh, bucket_num, key,
-                                                         vb, cookie, false);
-            GetValue rv(NULL, ec, -1, true);
-            cb.callback(rv);
-            return false;
+            if (vb->maybeKeyExistsInFilter(key)) {
+                ENGINE_ERROR_CODE ec = addTempItemForBgFetch(lh, bucket_num,
+                                                             key, vb, cookie,
+                                                             false);
+                GetValue rv(NULL, ec, -1, true);
+                cb.callback(rv);
+                return false;
+            } else {
+                // As bloomfilter predicted that item surely doesn't exist
+                // on disk, return ENOENT for getLocked().
+                GetValue rv;
+                cb.callback(rv);
+                return true;
+            }
         }
     }
 }
@@ -2254,7 +2550,7 @@ ENGINE_ERROR_CODE EventuallyPersistentStore::getKeyStats(
         if (eviction_policy == FULL_EVICTION &&
             v->isTempInitialItem() && bgfetch) {
             lh.unlock();
-            bgFetch(key, vbucket, -1, cookie, true);
+            bgFetch(key, vbucket, cookie, true);
             return ENGINE_EWOULDBLOCK;
         }
         kstats.logically_deleted = v->isDeleted();
@@ -2268,10 +2564,13 @@ ENGINE_ERROR_CODE EventuallyPersistentStore::getKeyStats(
         if (eviction_policy == VALUE_ONLY) {
             return ENGINE_KEY_ENOENT;
         } else {
-            if (bgfetch) {
+            if (bgfetch && vb->maybeKeyExistsInFilter(key)) {
                 return addTempItemForBgFetch(lh, bucket_num, key, vb,
                                              cookie, true);
             } else {
+                // If bgFetch were false, or bloomfilter predicted that
+                // item surely doesn't exist on disk, return ENOENT for
+                // getKeyStats().
                 return ENGINE_KEY_ENOENT;
             }
         }
@@ -2309,11 +2608,12 @@ std::string EventuallyPersistentStore::validateKey(const std::string &key,
 }
 
 ENGINE_ERROR_CODE EventuallyPersistentStore::deleteItem(const std::string &key,
-                                                        uint64_tcas,
+                                                        uint64_t *cas,
                                                         uint16_t vbucket,
                                                         const void *cookie,
                                                         bool force,
                                                         ItemMetaData *itemMeta,
+                                                        mutation_descr_t *mutInfo,
                                                         bool tapBackfill)
 {
     RCPtr<VBucket> vb = getVBucket(vbucket);
@@ -2338,30 +2638,54 @@ ENGINE_ERROR_CODE EventuallyPersistentStore::deleteItem(const std::string &key,
         } else { // Full eviction.
             if (!force) {
                 if (!v) { // Item might be evicted from cache.
-                    return addTempItemForBgFetch(lh, bucket_num, key, vb,
-                                                 cookie, true);
+                    if (vb->maybeKeyExistsInFilter(key)) {
+                        return addTempItemForBgFetch(lh, bucket_num, key, vb,
+                                                     cookie, true);
+                    } else {
+                        // As bloomfilter predicted that item surely doesn't
+                        // exist on disk, return ENOENT for deleteItem().
+                        return ENGINE_KEY_ENOENT;
+                    }
                 } else if (v->isTempInitialItem()) {
                     lh.unlock();
-                    bgFetch(key, vbucket, -1, cookie, true);
+                    bgFetch(key, vbucket, cookie, true);
                     return ENGINE_EWOULDBLOCK;
                 } else { // Non-existent or deleted key.
+                    if (v->isTempNonExistentItem() || v->isTempDeletedItem()) {
+                        // Delete a temp non-existent item to ensure that
+                        // if a delete were issued over an item that doesn't
+                        // exist, then we don't preserve a temp item.
+                        vb->ht.unlocked_del(key, bucket_num);
+                    }
                     return ENGINE_KEY_ENOENT;
                 }
             } else {
                 if (!v) { // Item might be evicted from cache.
                     // Create a temp item and delete it below as it is a
-                    // force deletion.
-                    add_type_t rv = vb->ht.unlocked_addTempItem(bucket_num,
-                                                              key,
-                                                              eviction_policy);
-                    if (rv == ADD_NOMEM) {
-                        return ENGINE_ENOMEM;
+                    // force deletion, only if bloomfilter predicts that
+                    // item may exist on disk.
+                    if (vb->maybeKeyExistsInFilter(key)) {
+                        add_type_t rv = vb->ht.unlocked_addTempItem(
+                                                               bucket_num,
+                                                               key,
+                                                               eviction_policy);
+                        if (rv == ADD_NOMEM) {
+                            return ENGINE_ENOMEM;
+                        }
+                        v = vb->ht.unlocked_find(key, bucket_num, true, false);
+                        v->setStoredValueState(StoredValue::state_deleted_key);
+                    } else {
+                        return ENGINE_KEY_ENOENT;
                     }
-                    v = vb->ht.unlocked_find(key, bucket_num, true, false);
-                    v->setStoredValueState(StoredValue::state_deleted_key);
                 } else if (v->isTempInitialItem()) {
                     v->setStoredValueState(StoredValue::state_deleted_key);
                 } else { // Non-existent or deleted key.
+                    if (v->isTempNonExistentItem() || v->isTempDeletedItem()) {
+                        // Delete a temp non-existent item to ensure that
+                        // if a delete were issued over an item that doesn't
+                        // exist, then we don't preserve a temp item.
+                        vb->ht.unlocked_del(key, bucket_num);
+                    }
                     return ENGINE_KEY_ENOENT;
                 }
             }
@@ -2384,6 +2708,7 @@ ENGINE_ERROR_CODE EventuallyPersistentStore::deleteItem(const std::string &key,
     }
     *cas = v ? v->getCas() : 0;
 
+    uint64_t seqno = 0;
     ENGINE_ERROR_CODE ret = ENGINE_SUCCESS;
     switch (delrv) {
     case NOMEM:
@@ -2401,12 +2726,14 @@ ENGINE_ERROR_CODE EventuallyPersistentStore::deleteItem(const std::string &key,
     case NOT_FOUND:
         ret = ENGINE_KEY_ENOENT;
         if (v) {
-            queueDirty(vb, v, &lh, tapBackfill);
+            queueDirty(vb, v, &lh, NULL, tapBackfill);
         }
         break;
     case WAS_DIRTY:
     case WAS_CLEAN:
-        queueDirty(vb, v, &lh, tapBackfill);
+        queueDirty(vb, v, &lh, &seqno, tapBackfill);
+        mutInfo->seqno = seqno;
+        mutInfo->vbucket_uuid = vb->failovers->getLatestUUID();
         break;
     case NEED_BG_FETCH:
         // We already figured out if a bg fetch is requred for a full-evicted
@@ -2417,16 +2744,18 @@ ENGINE_ERROR_CODE EventuallyPersistentStore::deleteItem(const std::string &key,
 }
 
 ENGINE_ERROR_CODE EventuallyPersistentStore::deleteWithMeta(
-                                                        const std::string &key,
-                                                        uint64_t* cas,
-                                                        uint16_t vbucket,
-                                                        const void *cookie,
-                                                        bool force,
-                                                        ItemMetaData *itemMeta,
-                                                        bool tapBackfill,
-                                                        bool genBySeqno,
-                                                        uint64_t bySeqno,
-                                                        bool isReplication)
+                                                     const std::string &key,
+                                                     uint64_t *cas,
+                                                     uint64_t *seqno,
+                                                     uint16_t vbucket,
+                                                     const void *cookie,
+                                                     bool force,
+                                                     ItemMetaData *itemMeta,
+                                                     bool tapBackfill,
+                                                     bool genBySeqno,
+                                                     uint64_t bySeqno,
+                                                     ExtendedMetaData *emd,
+                                                     bool isReplication)
 {
     RCPtr<VBucket> vb = getVBucket(vbucket);
     if (!vb || (vb->getState() == vbucket_state_dead && !force)) {
@@ -2441,28 +2770,53 @@ ENGINE_ERROR_CODE EventuallyPersistentStore::deleteWithMeta(
         }
     }
 
+    //check for the incoming item's CAS validity
+    if (!Item::isValidCas(itemMeta->cas)) {
+        return ENGINE_KEY_EEXISTS;
+    }
+
     int bucket_num(0);
     LockHolder lh = vb->ht.getLockedBucket(key, &bucket_num);
     StoredValue *v = vb->ht.unlocked_find(key, bucket_num, true, false);
     if (!force) { // Need conflict resolution.
         if (v)  {
             if (v->isTempInitialItem()) {
-                bgFetch(key, vbucket, -1, cookie, true);
+                bgFetch(key, vbucket, cookie, true);
                 return ENGINE_EWOULDBLOCK;
             }
-            if (!conflictResolver->resolve(v, *itemMeta, true)) {
+
+            enum conflict_resolution_mode confResMode = revision_seqno;
+            if (emd) {
+                confResMode = static_cast<enum conflict_resolution_mode>(
+                                                       emd->getConflictResMode());
+            }
+
+            if (!conflictResolver->resolve(vb, v, *itemMeta, true, confResMode)) {
                 ++stats.numOpsDelMetaResolutionFailed;
                 return ENGINE_KEY_EEXISTS;
             }
         } else {
             // Item is 1) deleted or not existent in the value eviction case OR
             // 2) deleted or evicted in the full eviction.
-            return addTempItemForBgFetch(lh, bucket_num, key, vb,
-                                         cookie, true, isReplication);
+            if (vb->maybeKeyExistsInFilter(key)) {
+                return addTempItemForBgFetch(lh, bucket_num, key, vb,
+                                             cookie, true, isReplication);
+            } else {
+                // Even though bloomfilter predicted that item doesn't exist
+                // on disk, we must put this delete on disk if the cas is valid.
+                add_type_t rv = vb->ht.unlocked_addTempItem(bucket_num, key,
+                                                            eviction_policy,
+                                                            isReplication);
+                if (rv == ADD_NOMEM) {
+                    return ENGINE_ENOMEM;
+                }
+                v = vb->ht.unlocked_find(key, bucket_num, true, false);
+                v->setStoredValueState(StoredValue::state_deleted_key);
+            }
         }
     } else {
         if (!v) {
-            // Create a temp item and delete it below as it is a force deletion
+            // We should always try to persist a delete here.
             add_type_t rv = vb->ht.unlocked_addTempItem(bucket_num, key,
                                                         eviction_policy,
                                                         isReplication);
@@ -2471,8 +2825,10 @@ ENGINE_ERROR_CODE EventuallyPersistentStore::deleteWithMeta(
             }
             v = vb->ht.unlocked_find(key, bucket_num, true, false);
             v->setStoredValueState(StoredValue::state_deleted_key);
+            v->setCas(*cas);
         } else if (v->isTempInitialItem()) {
             v->setStoredValueState(StoredValue::state_deleted_key);
+            v->setCas(*cas);
         }
     }
 
@@ -2508,14 +2864,29 @@ ENGINE_ERROR_CODE EventuallyPersistentStore::deleteWithMeta(
         if (!genBySeqno) {
             v->setBySeqno(bySeqno);
         }
-        queueDirty(vb, v, &lh, tapBackfill, true, genBySeqno);
+
+        /* set the conflict resolution mode from the extended meta data *
+         * Given that the mode is already set, we don't need to set the *
+         * conflict resolution mode in queueDirty */
+        if (emd) {
+            v->setConflictResMode(
+               static_cast<enum conflict_resolution_mode>(
+                                         emd->getConflictResMode()));
+        }
+        vb->setMaxCas(v->getCas());
+        queueDirty(vb, v, &lh, seqno, tapBackfill, true, genBySeqno, false);
         break;
     case NEED_BG_FETCH:
         lh.unlock();
-        bgFetch(key, vbucket, -1, cookie, true);
+        bgFetch(key, vbucket, cookie, true);
         ret = ENGINE_EWOULDBLOCK;
     }
 
+    // Update drift counter for vbucket upon a success only
+    if (ret == ENGINE_SUCCESS && emd) {
+        vb->setDriftCounter(emd->getAdjustedTime());
+    }
+
     return ret;
 }
 
@@ -2529,16 +2900,15 @@ void EventuallyPersistentStore::reset() {
             vb->ht.clear();
             vb->checkpointManager.clear(vb->getState());
             vb->resetStats();
-            vb->setCurrentSnapshot(0, 0);
+            vb->setPersistedSnapshot(0, 0);
         }
     }
 
-    bool inverse = false;
-    if (diskFlushAll.compare_exchange_strong(inverse, true)) {
-        ++stats.diskQueueSize;
-        // wake up (notify) one flusher is good enough for diskFlushAll
-        vbMap.shards[EP_PRIMARY_SHARD]->getFlusher()->notifyFlushEvent();
-    }
+    ++stats.diskQueueSize;
+    bool inverse = true;
+    flushAllTaskCtx.delayFlushAll.compare_exchange_strong(inverse, false);
+    // Waking up (notifying) one flusher is good enough for diskFlushAll
+    vbMap.shards[EP_PRIMARY_SHARD]->getFlusher()->notifyFlushEvent();
 }
 
 /**
@@ -2580,7 +2950,7 @@ public:
                         ++vbucket->opsCreate;
                         vbucket->incrMetaDataDisk(*queuedItem);
                     } else { // Update in full eviction mode.
-                        --vbucket->ht.numTotalItems;
+                        vbucket->ht.decrNumTotalItems();
                         ++vbucket->opsUpdate;
                     }
                     v->setNewCacheItem(false);
@@ -2659,10 +3029,16 @@ public:
                     // exists on DB file, but not in memory (i.e., full eviction),
                     // because we created the temp item in memory and incremented
                     // the item counter when a deletion is pushed in the queue.
-                    --vbucket->ht.numTotalItems;
+                    vbucket->ht.decrNumTotalItems();
                 }
             }
 
+            /**
+             * Deleted items are to be added to the bloomfilter,
+             * in either eviction policy.
+             */
+            vbucket->addToFilter(queuedItem->getKey());
+
             if (value > 0) {
                 ++stats->totalPersisted;
                 ++vbucket->opsDelete;
@@ -2703,6 +3079,32 @@ private:
     DISALLOW_COPY_AND_ASSIGN(PersistenceCallback);
 };
 
+bool EventuallyPersistentStore::scheduleFlushAllTask(const void* cookie,
+                                                     time_t when) {
+    bool inverse = false;
+    if (diskFlushAll.compare_exchange_strong(inverse, true)) {
+        flushAllTaskCtx.cookie = cookie;
+        flushAllTaskCtx.delayFlushAll.compare_exchange_strong(inverse, true);
+        ExTask task = new FlushAllTask(&engine, static_cast<double>(when));
+        ExecutorPool::get()->schedule(task, NONIO_TASK_IDX);
+        return true;
+    } else {
+        return false;
+    }
+}
+
+void EventuallyPersistentStore::setFlushAllComplete() {
+    // Notify memcached about flushAll task completion, and
+    // set diskFlushall flag to false
+    if (flushAllTaskCtx.cookie) {
+        engine.notifyIOComplete(flushAllTaskCtx.cookie, ENGINE_SUCCESS);
+    }
+    bool inverse = false;
+    flushAllTaskCtx.delayFlushAll.compare_exchange_strong(inverse, true);
+    inverse = true;
+    diskFlushAll.compare_exchange_strong(inverse, false);
+}
+
 void EventuallyPersistentStore::flushOneDeleteAll() {
     for (size_t i = 0; i < vbMap.getSize(); ++i) {
         RCPtr<VBucket> vb = getVBucket(i);
@@ -2712,14 +3114,13 @@ void EventuallyPersistentStore::flushOneDeleteAll() {
         }
     }
 
-    bool inverse = true;
-    diskFlushAll.compare_exchange_strong(inverse, false);
     stats.decrDiskQueueSize(1);
+    setFlushAllComplete();
 }
 
 int EventuallyPersistentStore::flushVBucket(uint16_t vbid) {
     KVShard *shard = vbMap.getShard(vbid);
-    if (diskFlushAll) {
+    if (diskFlushAll && !flushAllTaskCtx.delayFlushAll) {
         if (shard->getId() == EP_PRIMARY_SHARD) {
             flushOneDeleteAll();
         } else {
@@ -2751,14 +3152,11 @@ int EventuallyPersistentStore::flushVBucket(uint16_t vbid) {
             vb->rejectQueue.pop();
         }
 
-        LockHolder slh = vb->getSnapshotLock();
-        uint64_t snapStart;
-        uint64_t snapEnd;
-        vb->getCurrentSnapshot_UNLOCKED(snapStart, snapEnd);
-
+        const std::string cursor(CheckpointManager::pCursorName);
         vb->getBackfillItems(items);
-        vb->checkpointManager.getAllItemsForPersistence(items);
-        slh.unlock();
+
+        snapshot_range_t range;
+        range = vb->checkpointManager.getAllItemsForCursor(cursor, items);
 
         if (!items.empty()) {
             while (!rwUnderlying->begin()) {
@@ -2771,6 +3169,7 @@ int EventuallyPersistentStore::flushVBucket(uint16_t vbid) {
 
             Item *prev = NULL;
             uint64_t maxSeqno = 0;
+            uint64_t maxCas = 0;
             std::list<PersistenceCallback*> pcbs;
             std::vector<queued_item>::iterator it = items.begin();
             for(; it != items.end(); ++it) {
@@ -2784,7 +3183,9 @@ int EventuallyPersistentStore::flushVBucket(uint16_t vbid) {
                     if (cb) {
                         pcbs.push_back(cb);
                     }
+
                     maxSeqno = std::max(maxSeqno, (uint64_t)(*it)->getBySeqno());
+                    maxCas = std::max(maxCas, (uint64_t)(*it)->getCas());
                     ++stats.flusher_todo;
                 } else {
                     stats.decrDiskQueueSize(1);
@@ -2797,11 +3198,12 @@ int EventuallyPersistentStore::flushVBucket(uint16_t vbid) {
             hrtime_t start = gethrtime();
 
             if (vb->getState() == vbucket_state_active) {
-                snapStart = maxSeqno;
-                snapEnd = maxSeqno;
+                range.start = maxSeqno;
+                range.end = maxSeqno;
             }
 
-            while (!rwUnderlying->commit(&cb, snapStart, snapEnd)) {
+            while (!rwUnderlying->commit(&cb, range.start, range.end, maxCas,
+                                         vb->getDriftCounter())) {
                 ++stats.commitFailed;
                 LOG(EXTENSION_LOG_WARNING, "Flusher commit failed!!! Retry in "
                     "1 sec...\n");
@@ -2810,10 +3212,12 @@ int EventuallyPersistentStore::flushVBucket(uint16_t vbid) {
             }
 
             if (vb->rejectQueue.empty()) {
+                vb->setPersistedSnapshot(range.start, range.end);
                 uint64_t highSeqno = rwUnderlying->getLastPersistedSeqno(vbid);
                 if (highSeqno > 0 &&
                     highSeqno != vbMap.getPersistenceSeqno(vbid)) {
                     vbMap.setPersistenceSeqno(vbid, highSeqno);
+                    vb->notifySeqnoPersisted(highSeqno);
                 }
             }
 
@@ -2852,6 +3256,8 @@ int EventuallyPersistentStore::flushVBucket(uint16_t vbid) {
             if (chkid > 0 && chkid != vbMap.getPersistenceCheckpointId(vbid)) {
                 vbMap.setPersistenceCheckpointId(vbid, chkid);
             }
+        } else {
+            return RETRY_FLUSH_VBUCKET;
         }
     }
 
@@ -2911,24 +3317,26 @@ EventuallyPersistentStore::flushOneDelOrSet(const queued_item &qi,
 void EventuallyPersistentStore::queueDirty(RCPtr<VBucket> &vb,
                                            StoredValue* v,
                                            LockHolder *plh,
+                                           uint64_t *seqno,
                                            bool tapBackfill,
                                            bool notifyReplicator,
-                                           bool genBySeqno) {
+                                           bool genBySeqno,
+                                           bool setConflictMode) {
     if (vb) {
+        if (setConflictMode && (v->getConflictResMode() != last_write_wins) &&
+            vb->isTimeSyncEnabled()) {
+            v->setConflictResMode(last_write_wins);
+        }
+
         queued_item qi(v->toItem(false, vb->getId()));
-        bool rv;
-        if (genBySeqno) {
-            LockHolder slh = vb->getSnapshotLock();
-            rv = tapBackfill ? vb->queueBackfillItem(qi, genBySeqno) :
-                               vb->checkpointManager.queueDirty(vb, qi,
-                                                                genBySeqno);
-            v->setBySeqno(qi->getBySeqno());
-            vb->setCurrentSnapshot_UNLOCKED(qi->getBySeqno(), qi->getBySeqno());
-        } else {
-            rv = tapBackfill ? vb->queueBackfillItem(qi, genBySeqno) :
-                               vb->checkpointManager.queueDirty(vb, qi,
-                                                                genBySeqno);
-            v->setBySeqno(qi->getBySeqno());
+
+        bool rv = tapBackfill ? vb->queueBackfillItem(qi, genBySeqno) :
+                                vb->checkpointManager.queueDirty(vb, qi,
+                                                                 genBySeqno);
+        v->setBySeqno(qi->getBySeqno());
+
+        if (seqno) {
+            *seqno = v->getBySeqno();
         }
 
         if (plh) {
@@ -2955,7 +3363,7 @@ std::vector<vbucket_state *> EventuallyPersistentStore::loadVBucketState()
 
 void EventuallyPersistentStore::warmupCompleted() {
     // Run the vbucket state snapshot job once after the warmup
-    scheduleVBSnapshot(Priority::VBucketPersistHighPriority);
+    scheduleVBSnapshot(VBSnapshotTask::Priority::HIGH);
 
     if (engine.getConfiguration().getAlogPath().length() > 0) {
 
@@ -2985,7 +3393,7 @@ void EventuallyPersistentStore::warmupCompleted() {
     // right after warmup. Subsequent snapshot tasks will be scheduled every
     // 60 sec by default.
     ExecutorPool *iom = ExecutorPool::get();
-    ExTask task = new StatSnap(&engine, Priority::StatSnapPriority, 0, false);
+    ExTask task = new StatSnap(&engine, 0, false);
     statsSnapshotTaskId = iom->schedule(task, WRITER_TASK_IDX);
 }
 
@@ -3080,7 +3488,6 @@ void EventuallyPersistentStore::enableAccessScannerTask() {
         accessScanner.sleeptime = alogSleepTime * 60;
         if (accessScanner.sleeptime != 0) {
             ExTask task = new AccessScanner(*this, stats,
-                                            Priority::AccessScannerPriority,
                                             accessScanner.sleeptime);
             accessScanner.task = ExecutorPool::get()->schedule(task,
                                                                AUXIO_TASK_IDX);
@@ -3120,7 +3527,6 @@ void EventuallyPersistentStore::setAccessScannerSleeptime(size_t val) {
         accessScanner.sleeptime = val * 60;
         if (accessScanner.sleeptime != 0) {
             ExTask task = new AccessScanner(*this, stats,
-                                            Priority::AccessScannerPriority,
                                             accessScanner.sleeptime);
             accessScanner.task = ExecutorPool::get()->schedule(task,
                                                                AUXIO_TASK_IDX);
@@ -3141,7 +3547,6 @@ void EventuallyPersistentStore::resetAccessScannerStartTime() {
             ExecutorPool::get()->cancel(accessScanner.task);
             // re-schedule task according to the new task start hour
             ExTask task = new AccessScanner(*this, stats,
-                                            Priority::AccessScannerPriority,
                                             accessScanner.sleeptime);
             accessScanner.task = ExecutorPool::get()->schedule(task,
                                                                AUXIO_TASK_IDX);
@@ -3154,6 +3559,22 @@ void EventuallyPersistentStore::resetAccessScannerStartTime() {
     }
 }
 
+void EventuallyPersistentStore::setAllBloomFilters(bool to) {
+    size_t maxSize = vbMap.getSize();
+    cb_assert(maxSize <= std::numeric_limits<uint16_t>::max());
+    for (size_t i = 0; i < maxSize; i++) {
+        uint16_t vbid = static_cast<uint16_t>(i);
+        RCPtr<VBucket> vb = vbMap.getBucket(vbid);
+        if (vb) {
+            if (to) {
+                vb->setFilterStatus(BFILTER_ENABLED);
+            } else {
+                vb->setFilterStatus(BFILTER_DISABLED);
+            }
+        }
+    }
+}
+
 void EventuallyPersistentStore::visit(VBucketVisitor &visitor)
 {
     size_t maxSize = vbMap.getSize();
@@ -3172,10 +3593,42 @@ void EventuallyPersistentStore::visit(VBucketVisitor &visitor)
     visitor.complete();
 }
 
-VBCBAdaptor::VBCBAdaptor(EventuallyPersistentStore *s,
+EventuallyPersistentStore::Position
+EventuallyPersistentStore::pauseResumeVisit(PauseResumeEPStoreVisitor& visitor,
+                                            Position& start_pos)
+{
+    const size_t maxSize = vbMap.getSize();
+
+    uint16_t vbid = start_pos.vbucket_id;
+    for (; vbid < maxSize; ++vbid) {
+        RCPtr<VBucket> vb = vbMap.getBucket(vbid);
+        if (vb) {
+            bool paused = !visitor.visit(vbid, vb->ht);
+            if (paused) {
+                break;
+            }
+        }
+    }
+
+    return EventuallyPersistentStore::Position(vbid);
+}
+
+EventuallyPersistentStore::Position
+EventuallyPersistentStore::startPosition() const
+{
+    return EventuallyPersistentStore::Position(0);
+}
+
+EventuallyPersistentStore::Position
+EventuallyPersistentStore::endPosition() const
+{
+    return EventuallyPersistentStore::Position(vbMap.getSize());
+}
+
+VBCBAdaptor::VBCBAdaptor(EventuallyPersistentStore *s, TaskId id,
                          shared_ptr<VBucketVisitor> v,
-                         const char *l, const Priority &p, double sleep) :
-    GlobalTask(&s->getEPEngine(), p, 0, false), store(s),
+                         const char *l, double sleep) :
+    GlobalTask(&s->getEPEngine(), id, 0, false), store(s),
     visitor(v), label(l), sleepTime(sleep), currentvb(0)
 {
     const VBucketFilter &vbFilter = visitor->getVBucketFilter();
@@ -3216,12 +3669,10 @@ bool VBCBAdaptor::run(void) {
 VBucketVisitorTask::VBucketVisitorTask(EventuallyPersistentStore *s,
                                        shared_ptr<VBucketVisitor> v,
                                        uint16_t sh, const char *l,
-                                       double sleep, bool shutdown):
-    GlobalTask(&(s->getEPEngine()), Priority::AccessScannerPriority,
-               0, shutdown),
-    store(s), visitor(v), label(l), sleepTime(sleep), currentvb(0),
-    shardID(sh)
-{
+                                       double sleep, bool shutdown)
+    : GlobalTask(&(s->getEPEngine()), TaskId::VBucketVisitorTask, 0, shutdown),
+      store(s), visitor(v), label(l), sleepTime(sleep), currentvb(0),
+      shardID(sh) {
     const VBucketFilter &vbFilter = visitor->getVBucketFilter();
     std::vector<int> vbs = store->vbMap.getShard(shardID)->getVBuckets();
     cb_assert(vbs.size() <= std::numeric_limits<uint16_t>::max());
@@ -3266,7 +3717,7 @@ void EventuallyPersistentStore::resetUnderlyingStats(void)
         shard->getROUnderlying()->resetStats();
     }
 
-    for (size_t i = 0; i < MAX_TYPE_ID; i++) {
+    for (size_t i = 0; i < GlobalTask::allTaskIds.size(); i++) {
         stats.schedulingHisto[i].reset();
         stats.taskRuntimeHisto[i].reset();
     }
@@ -3326,9 +3777,8 @@ EventuallyPersistentStore::rollback(uint16_t vbid,
         if (result.success) {
             RCPtr<VBucket> vb = vbMap.getBucket(vbid);
             vb->failovers->pruneEntries(result.highSeqno);
-            vb->checkpointManager.clear(vb->getState());
-            vb->checkpointManager.setBySeqno(result.highSeqno);
-            vb->setCurrentSnapshot(result.snapStartSeqno, result.snapEndSeqno);
+            vb->checkpointManager.clear(vb, result.highSeqno);
+            vb->setPersistedSnapshot(result.snapStartSeqno, result.snapEndSeqno);
             return ENGINE_SUCCESS;
         }
     }
@@ -3338,3 +3788,13 @@ EventuallyPersistentStore::rollback(uint16_t vbid,
     }
     return ENGINE_NOT_MY_VBUCKET;
 }
+
+void EventuallyPersistentStore::runDefragmenterTask() {
+    defragmenterTask->run();
+}
+
+std::ostream& operator<<(std::ostream& os,
+                         const EventuallyPersistentStore::Position& pos) {
+    os << "vbucket:" << pos.vbucket_id;
+    return os;
+}