Merge remote-tracking branch 'couchbase/3.0.x' into sherlock
[ep-engine.git] / src / ep_engine.cc
index a26236d..146a64a 100644 (file)
@@ -49,8 +49,6 @@
 #include "dcp-producer.h"
 #include "warmup.h"
 
-#include <JSON_checker.h>
-
 static ALLOCATOR_HOOKS_API *hooksApi;
 static SERVER_LOG_API *loggerApi;
 
@@ -175,11 +173,12 @@ extern "C" {
                                            const void* key,
                                            const size_t nkey,
                                            uint64_t* cas,
-                                           uint16_t vbucket)
+                                           uint16_t vbucket,
+                                           mutation_descr_t *mut_info)
     {
         ENGINE_ERROR_CODE err_code = getHandle(handle)->itemDelete(cookie, key,
                                                                    nkey, cas,
-                                                                   vbucket);
+                                                                   vbucket, mut_info);
         releaseHandle(handle);
         return err_code;
     }
@@ -242,7 +241,7 @@ extern "C" {
                                            const uint64_t delta,
                                            const uint64_t initial,
                                            const rel_time_t exptime,
-                                           uint64_t *cas,
+                                           item **itm,
                                            uint8_t datatype,
                                            uint64_t *result,
                                            uint16_t vbucket)
@@ -262,7 +261,7 @@ extern "C" {
                                                                 increment,
                                                                 create, delta,
                                                                 initial,
-                                                                exptime, cas,
+                                                                exptime, itm,
                                                                 datatype,
                                                                 result,
                                                                 vbucket);
@@ -514,6 +513,41 @@ extern "C" {
                 validate(v, 0, std::numeric_limits<int>::max());
                 e->getConfiguration().setMaxNumNonio(v);
                 ExecutorPool::get()->setMaxNonIO(v);
+            } else if (strcmp(keyz, "bfilter_enabled") == 0) {
+                if (strcmp(valz, "true") == 0) {
+                    e->getConfiguration().setBfilterEnabled(true);
+                } else if (strcmp(valz, "false") == 0) {
+                    e->getConfiguration().setBfilterEnabled(false);
+                } else {
+                    throw std::runtime_error("Value expected: true/false.");
+                }
+            } else if (strcmp(keyz, "bfilter_residency_threshold") == 0) {
+                float val = atof(valz);
+                if (val >= 0.0 && val <= 1.0) {
+                    e->getConfiguration().setBfilterResidencyThreshold(val);
+                } else {
+                    throw std::runtime_error("Value out of range [0.0-1.0].");
+                }
+            } else if (strcmp(keyz, "defragmenter_enabled") == 0) {
+                if (strcmp(valz, "true") == 0) {
+                    e->getConfiguration().setDefragmenterEnabled(true);
+                } else {
+                    e->getConfiguration().setDefragmenterEnabled(false);
+                }
+            } else if (strcmp(keyz, "defragmenter_interval") == 0) {
+                checkNumeric(valz);
+                validate(v, 1, std::numeric_limits<int>::max());
+                e->getConfiguration().setDefragmenterInterval(v);
+            } else if (strcmp(keyz, "defragmenter_age_threshold") == 0) {
+                checkNumeric(valz);
+                validate(v, 0, std::numeric_limits<int>::max());
+                e->getConfiguration().setDefragmenterAgeThreshold(v);
+            } else if (strcmp(keyz, "defragmenter_chunk_duration") == 0) {
+                checkNumeric(valz);
+                validate(v, 1, std::numeric_limits<int>::max());
+                e->getConfiguration().setDefragmenterChunkDuration(v);
+            } else if (strcmp(keyz, "defragmenter_run") == 0) {
+                e->runDefragmenterTask();
             } else if (strcmp(keyz, "compaction_write_queue_cap") == 0) {
                 checkNumeric(valz);
                 validate(v, 1, std::numeric_limits<int>::max());
@@ -523,7 +557,38 @@ extern "C" {
                 rv = PROTOCOL_BINARY_RESPONSE_KEY_ENOENT;
             }
         } catch(std::runtime_error& ex) {
-            *msg = strdup(ex.what());
+            *msg = "Value out of range.";
+            rv = PROTOCOL_BINARY_RESPONSE_EINVAL;
+        }
+
+        return rv;
+    }
+
+    static protocol_binary_response_status setDcpParam(
+                                                    EventuallyPersistentEngine *e,
+                                                    const char *keyz,
+                                                    const char *valz,
+                                                    const char **msg,
+                                                    size_t *) {
+        protocol_binary_response_status rv = PROTOCOL_BINARY_RESPONSE_SUCCESS;
+        try {
+
+            if (strcmp(keyz, "dcp_consumer_process_buffered_messages_yield_limit") == 0) {
+                size_t v = atoi(valz);
+                checkNumeric(valz);
+                validate(v, size_t(1), std::numeric_limits<size_t>::max());
+                e->getConfiguration().setDcpConsumerProcessBufferedMessagesYieldLimit(v);
+            } else if (strcmp(keyz, "dcp_consumer_process_buffered_messages_batch_size") == 0) {
+                size_t v = atoi(valz);
+                checkNumeric(valz);
+                validate(v, size_t(1), std::numeric_limits<size_t>::max());
+                e->getConfiguration().setDcpConsumerProcessBufferedMessagesBatchSize(v);
+            } else {
+                *msg = "Unknown config param";
+                rv = PROTOCOL_BINARY_RESPONSE_KEY_ENOENT;
+            }
+        } catch (std::runtime_error& ex) {
+            *msg = "Value out of range.";
             rv = PROTOCOL_BINARY_RESPONSE_EINVAL;
         }
 
@@ -723,7 +788,7 @@ extern "C" {
         const char *valuep = keyp + keylen;
         vallen -= (keylen + extlen);
 
-        char keyz[32];
+        char keyz[128];
         char valz[512];
 
         // Read the key.
@@ -754,6 +819,9 @@ extern "C" {
         case protocol_binary_engine_param_checkpoint:
             rv = setCheckpointParam(e, keyz, valz, msg, msg_size);
             break;
+        case protocol_binary_engine_param_dcp:
+            rv = setDcpParam(e, keyz, valz, msg, msg_size);
+            break;
         default:
             rv = PROTOCOL_BINARY_RESPONSE_UNKNOWN_COMMAND;
         }
@@ -896,7 +964,7 @@ extern "C" {
             res = PROTOCOL_BINARY_RESPONSE_EINVAL;
             break;
         case ENGINE_EWOULDBLOCK:
-            LOG(EXTENSION_LOG_WARNING, "Requst to vbucket %d deletion is in"
+            LOG(EXTENSION_LOG_WARNING, "Request for vbucket %d deletion is in"
                 " EWOULDBLOCK until the database file is removed from disk",
                 vbucket);
             e->storeEngineSpecific(cookie, req);
@@ -987,6 +1055,7 @@ extern "C" {
         compactreq.purge_before_seq =
                                     ntohll(req->message.body.purge_before_seq);
         compactreq.drop_deletes     = req->message.body.drop_deletes;
+        compactreq.bfcb             = NULL;
 
         ENGINE_ERROR_CODE err;
         void* es = e->getEngineSpecific(cookie);
@@ -1094,7 +1163,7 @@ extern "C" {
 
         switch (request->request.opcode) {
         case PROTOCOL_BINARY_CMD_GET_ALL_VB_SEQNOS:
-            return h->getAllVBucketSequenceNumbers(cookie, response);
+            return h->getAllVBucketSequenceNumbers(cookie, request, response);
 
         case PROTOCOL_BINARY_CMD_GET_VBUCKET:
             {
@@ -1153,6 +1222,8 @@ extern "C" {
             break;
         case PROTOCOL_BINARY_CMD_OBSERVE:
             return h->observe(cookie, request, response);
+        case PROTOCOL_BINARY_CMD_OBSERVE_SEQNO:
+            return h->observe_seqno(cookie, request, response);
         case PROTOCOL_BINARY_CMD_DEREGISTER_TAP_CLIENT:
             {
                 rv = h->deregisterTapClient(cookie, request, response);
@@ -1250,17 +1321,32 @@ extern "C" {
                 return rv;
             }
         case PROTOCOL_BINARY_CMD_GET_RANDOM_KEY:
-            if (request->request.extlen != 0 ||
-                request->request.keylen != 0 ||
-                request->request.bodylen != 0) {
-                return ENGINE_EINVAL;
+            {
+                if (request->request.extlen != 0 ||
+                    request->request.keylen != 0 ||
+                    request->request.bodylen != 0) {
+                    return ENGINE_EINVAL;
+                }
+                return h->getRandomKey(cookie, response);
             }
-
-            return h->getRandomKey(cookie, response);
         case CMD_GET_KEYS:
-            return h->getAllKeys(cookie,
-               reinterpret_cast<protocol_binary_request_get_keys*>(request),
-                                                                   response);
+            {
+                return h->getAllKeys(cookie,
+                   reinterpret_cast<protocol_binary_request_get_keys*>
+                                                           (request), response);
+            }
+        case PROTOCOL_BINARY_CMD_GET_ADJUSTED_TIME:
+            {
+                return h->getAdjustedTime(cookie,
+                   reinterpret_cast<protocol_binary_request_get_adjusted_time*>
+                                                           (request), response);
+            }
+        case PROTOCOL_BINARY_CMD_SET_DRIFT_COUNTER_STATE:
+            {
+                return h->setDriftCounterState(cookie,
+                reinterpret_cast<protocol_binary_request_set_drift_counter_state*>
+                                                           (request), response);
+            }
         }
 
         // Send a special response for getl since we don't want to send the key
@@ -1754,14 +1840,31 @@ extern "C" {
         return ENGINE_SUCCESS;
     }
 
-    static bool EvpGetItemInfo(ENGINE_HANDLE *, const void *,
+    static bool EvpGetItemInfo(ENGINE_HANDLE *handle, const void *,
                                const item* itm, item_info *itm_info)
     {
         const Item *it = reinterpret_cast<const Item*>(itm);
+        EventuallyPersistentEngine *engine = getHandle(handle);
         if (itm_info->nvalue < 1) {
             return false;
         }
         itm_info->cas = it->getCas();
+
+        if (engine) {
+            RCPtr<VBucket> vb = engine->getEpStore()->getVBucket(it->getVBucketId());
+
+            if (vb) {
+                itm_info->vbucket_uuid = vb->failovers->getLatestUUID();
+            } else {
+                itm_info->vbucket_uuid = 0;
+            }
+
+            releaseHandle(handle);
+        } else{
+            itm_info->vbucket_uuid = 0;
+        }
+
+        itm_info->seqno = it->getBySeqno();
         itm_info->exptime = it->getExptime();
         itm_info->nbytes = it->getNBytes();
         itm_info->datatype = it->getDataType();
@@ -1833,7 +1936,7 @@ EventuallyPersistentEngine::EventuallyPersistentEngine(
     clusterConfig(), epstore(NULL), workload(NULL),
     workloadPriority(NO_BUCKET_PRIORITY),
     tapThrottle(NULL), getServerApiFunc(get_server_api),
-    dcpConnMap_(NULL), tapConnMap(NULL)tapConfig(NULL), checkpointConfig(NULL),
+    dcpConnMap_(NULL), tapConnMap(NULL) ,tapConfig(NULL), checkpointConfig(NULL),
     trafficEnabled(false), flushAllEnabled(false), startupTime(0)
 {
     interface.interface = 1;
@@ -1857,7 +1960,6 @@ EventuallyPersistentEngine::EventuallyPersistentEngine(
     ENGINE_HANDLE_V1::set_item_info = EvpSetItemInfo;
     ENGINE_HANDLE_V1::get_engine_vb_map = EvpGetClusterConfig;
     ENGINE_HANDLE_V1::get_stats_struct = NULL;
-    ENGINE_HANDLE_V1::errinfo = NULL;
     ENGINE_HANDLE_V1::aggregate_stats = NULL;
 
 
@@ -1887,7 +1989,6 @@ EventuallyPersistentEngine::EventuallyPersistentEngine(
                                              ENGINE_FEATURE_PERSISTENT_STORAGE;
     info.info.features[info.info.num_features++].feature = ENGINE_FEATURE_LRU;
     info.info.features[info.info.num_features++].feature = ENGINE_FEATURE_DATATYPE;
-
 }
 
 ENGINE_ERROR_CODE EventuallyPersistentEngine::reserveCookie(const void *cookie)
@@ -2058,46 +2159,41 @@ void EventuallyPersistentEngine::destroy(bool force) {
     }
 }
 
-class FlushAllTask : public GlobalTask {
-public:
-    FlushAllTask(EventuallyPersistentStore *st, TapConnMap &tcm, double when)
-        : GlobalTask(&st->getEPEngine(), Priority::FlushAllPriority, when,
-                     false), epstore(st), tapConnMap(tcm) { }
-
-    bool run(void) {
-        epstore->reset();
-        tapConnMap.addFlushEvent();
-        return false;
-    }
-
-    std::string getDescription() {
-        return std::string("Performing flush.");
-    }
-
-private:
-    EventuallyPersistentStore *epstore;
-    TapConnMap                &tapConnMap;
-};
-
-ENGINE_ERROR_CODE EventuallyPersistentEngine::flush(const void *, time_t when){
+ENGINE_ERROR_CODE EventuallyPersistentEngine::flush(const void *cookie,
+                                                    time_t when){
     if (!flushAllEnabled) {
         return ENGINE_ENOTSUP;
     }
 
-    if (isDegradedMode()) {
+    if (!isDegradedMode()) {
         return ENGINE_TMPFAIL;
     }
 
-    if (when == 0) {
-        epstore->reset();
-        tapConnMap->addFlushEvent();
+    /*
+     * Supporting only a SYNC operation for bucket flush
+     */
+
+    void* es = getEngineSpecific(cookie);
+    if (es == NULL) {
+
+        // Check if diskFlushAll was false and set it to true
+        // if yes, if the atomic variable weren't false, then
+        // we will assume that a flushAll has been scheduled
+        // already and return TMPFAIL.
+        if (epstore->scheduleFlushAllTask(cookie, when)) {
+            storeEngineSpecific(cookie, this);
+            return ENGINE_EWOULDBLOCK;
+        } else {
+            LOG(EXTENSION_LOG_INFO, "Tried to trigger a bucket flush, but"
+                    "there seems to be a task running already!");
+            return ENGINE_TMPFAIL;
+        }
+
     } else {
-        ExTask flushTask = new FlushAllTask(epstore, *tapConnMap,
-                static_cast<double>(when));
-        ExecutorPool::get()->schedule(flushTask, NONIO_TASK_IDX);
+        storeEngineSpecific(cookie, NULL);
+        LOG(EXTENSION_LOG_WARNING, "Completed bucket flush operation");
+        return ENGINE_SUCCESS;
     }
-
-    return ENGINE_SUCCESS;
 }
 
 ENGINE_ERROR_CODE EventuallyPersistentEngine::store(const void *cookie,
@@ -2202,14 +2298,17 @@ ENGINE_ERROR_CODE EventuallyPersistentEngine::store(const void *cookie,
                 }
 
                 ret = store(cookie, old, cas, OPERATION_CAS, vbucket);
+
+                it->setBySeqno(old->getBySeqno());
                 itemRelease(cookie, i);
             }
         } while (ret == ENGINE_KEY_EEXISTS);
 
-        // Map the error code back to what memcacpable expects
+        // Map the error code back to what memcapable expects
         if (ret == ENGINE_KEY_ENOENT) {
             ret = ENGINE_NOT_STORED;
         }
+
         break;
 
     default:
@@ -3179,6 +3278,9 @@ ENGINE_ERROR_CODE EventuallyPersistentEngine::doEngineStats(const void *cookie,
     add_casted_stat("ep_vb_snapshot_total",
                     epstats.snapshotVbucketHisto.total(), add_stat, cookie);
 
+    add_casted_stat("ep_persist_vbstate_total",
+                    epstats.persistVBStateHisto.total(), add_stat, cookie);
+
     add_casted_stat("ep_vb_total",
                     activeCountVisitor.getVBucketNumber() +
                     replicaCountVisitor.getVBucketNumber() +
@@ -3285,14 +3387,6 @@ ENGINE_ERROR_CODE EventuallyPersistentEngine::doEngineStats(const void *cookie,
     add_casted_stat("ep_num_not_my_vbuckets", epstats.numNotMyVBuckets,
                     add_stat, cookie);
 
-    add_casted_stat("ep_io_num_read", epstats.io_num_read,
-                    add_stat, cookie);
-    add_casted_stat("ep_io_num_write", epstats.io_num_write, add_stat, cookie);
-    add_casted_stat("ep_io_read_bytes", epstats.io_read_bytes,
-                    add_stat, cookie);
-    add_casted_stat("ep_io_write_bytes", epstats.io_write_bytes,
-                     add_stat, cookie);
-
     add_casted_stat("ep_pending_ops", epstats.pendingOps, add_stat, cookie);
     add_casted_stat("ep_pending_ops_total", epstats.pendingOpsTotal,
                     add_stat, cookie);
@@ -3427,6 +3521,12 @@ ENGINE_ERROR_CODE EventuallyPersistentEngine::doEngineStats(const void *cookie,
     add_casted_stat("ep_workload_pattern",
                     workload->stringOfWorkLoadPattern(),
                     add_stat, cookie);
+
+    add_casted_stat("ep_defragmenter_num_visited", epstats.defragNumVisited,
+                    add_stat, cookie);
+    add_casted_stat("ep_defragmenter_num_moved", epstats.defragNumMoved,
+                    add_stat, cookie);
+
     return ENGINE_SUCCESS;
 }
 
@@ -3637,7 +3737,7 @@ public:
 class StatCheckpointTask : public GlobalTask {
 public:
     StatCheckpointTask(EventuallyPersistentEngine *e, const void *c,
-            ADD_STAT a) : GlobalTask(e, Priority::CheckpointStatsPriority,
+            ADD_STAT a) : GlobalTask(e, TaskId::StatCheckpointTask,
                                      0, false),
                           ep(e), cookie(c), add_stat(a) { }
     bool run(void) {
@@ -3969,8 +4069,11 @@ ENGINE_ERROR_CODE EventuallyPersistentEngine::doDcpStats(const void *cookie,
                     add_stat, cookie);
     add_casted_stat("ep_dcp_queue_backfillremaining",
                     aggregator.conn_queueBackfillRemaining, add_stat, cookie);
+    add_casted_stat("ep_dcp_num_running_backfills",
+                    dcpConnMap_->getNumActiveSnoozingBackfills(), add_stat, cookie);
+    add_casted_stat("ep_dcp_max_running_backfills",
+                    dcpConnMap_->getMaxActiveSnoozingBackfills(), add_stat, cookie);
 
-    dcpConnMap_->addStats(add_stat, cookie);
     return ENGINE_SUCCESS;
 }
 
@@ -4108,6 +4211,8 @@ ENGINE_ERROR_CODE EventuallyPersistentEngine::doTimingStats(const void *cookie,
     add_casted_stat("disk_commit", stats.diskCommitHisto, add_stat, cookie);
     add_casted_stat("disk_vbstate_snapshot", stats.snapshotVbucketHisto,
                     add_stat, cookie);
+    add_casted_stat("disk_persist_vbstate", stats.persistVBStateHisto,
+                    add_stat, cookie);
 
     add_casted_stat("item_alloc_sizes", stats.itemAllocSizeHisto,
                     add_stat, cookie);
@@ -4118,9 +4223,9 @@ ENGINE_ERROR_CODE EventuallyPersistentEngine::doSchedulerStats(const void
                                                                 *cookie,
                                                                 ADD_STAT
                                                                 add_stat) {
-    for (size_t i = 0; i < MAX_TYPE_ID; ++i) {
-        add_casted_stat(Priority::getTypeName(static_cast<type_id_t>(i)),
-                        stats.schedulingHisto[i],
+    for (TaskId id : GlobalTask::allTaskIds) {
+        add_casted_stat(GlobalTask::getTaskName(id),
+                        stats.schedulingHisto[static_cast<int>(id)],
                         add_stat, cookie);
     }
 
@@ -4131,9 +4236,9 @@ ENGINE_ERROR_CODE EventuallyPersistentEngine::doRunTimeStats(const void
                                                                 *cookie,
                                                                 ADD_STAT
                                                                 add_stat) {
-    for (size_t i = 0; i < MAX_TYPE_ID; ++i) {
-        add_casted_stat(Priority::getTypeName(static_cast<type_id_t>(i)),
-                        stats.taskRuntimeHisto[i],
+    for (TaskId id : GlobalTask::allTaskIds) {
+        add_casted_stat(GlobalTask::getTaskName(id),
+                        stats.taskRuntimeHisto[static_cast<int>(id)],
                         add_stat, cookie);
     }
 
@@ -4229,9 +4334,8 @@ ENGINE_ERROR_CODE EventuallyPersistentEngine::doSeqnoStats(const void *cookie,
         // An atomic read of vbucket state without acquiring the
         // reader lock for state should suffice here.
         if (vb->getState() != vbucket_state_active) {
-            uint64_t snapshot_start, snapshot_end;
-            vb->getCurrentSnapshot(snapshot_start, snapshot_end);
-            relHighSeqno = snapshot_end;
+            snapshot_info_t info = vb->checkpointManager.getSnapshotInfo();
+            relHighSeqno = info.range.end;
         }
 
         char buffer[32];
@@ -4257,9 +4361,8 @@ ENGINE_ERROR_CODE EventuallyPersistentEngine::doSeqnoStats(const void *cookie,
             // An atomic read of vbucket state without acquiring the
             // reader lock for state should suffice here.
             if (vb->getState() != vbucket_state_active) {
-                uint64_t snapshot_start, snapshot_end;
-                vb->getCurrentSnapshot(snapshot_start, snapshot_end);
-                relHighSeqno = snapshot_end;
+                snapshot_info_t info = vb->checkpointManager.getSnapshotInfo();
+                relHighSeqno = info.range.end;
             }
 
             char buffer[32];
@@ -4344,6 +4447,10 @@ void EventuallyPersistentEngine::addLookupAllKeys(const void *cookie,
     allKeysLookups[cookie] = err;
 }
 
+void EventuallyPersistentEngine::runDefragmenterTask(void) {
+    epstore->runDefragmenterTask();
+}
+
 ENGINE_ERROR_CODE EventuallyPersistentEngine::getStats(const void* cookie,
                                                        const char* stat_key,
                                                        int nkey,
@@ -4599,6 +4706,90 @@ ENGINE_ERROR_CODE EventuallyPersistentEngine::observe(
                         cookie);
 }
 
+ENGINE_ERROR_CODE EventuallyPersistentEngine::observe_seqno(
+                                       const void* cookie,
+                                       protocol_binary_request_header *request,
+                                       ADD_RESPONSE response) {
+    protocol_binary_request_no_extras *req =
+                          (protocol_binary_request_no_extras*)request;
+    const char* data = reinterpret_cast<const char*>(req->bytes) +
+                                                   sizeof(req->bytes);
+    uint16_t vb_id;
+    uint64_t vb_uuid;
+    uint8_t  format_type;
+    uint64_t last_persisted_seqno;
+    uint64_t current_seqno;
+
+    std::stringstream result;
+
+    vb_id = ntohs(req->message.header.request.vbucket);
+    memcpy(&vb_uuid, data, sizeof(uint64_t));
+    vb_uuid = ntohll(vb_uuid);
+
+    LOG(EXTENSION_LOG_DEBUG, "Observing vbucket: %d with uuid: %llu\n",
+                             vb_id, vb_uuid);
+
+    RCPtr<VBucket> vb = epstore->getVBucket(vb_id);
+
+    if (!vb) {
+        LockHolder lh(clusterConfig.lock);
+        return sendResponse(response, NULL, 0, NULL, 0,
+                            clusterConfig.config, clusterConfig.len,
+                            PROTOCOL_BINARY_RAW_BYTES,
+                            PROTOCOL_BINARY_RESPONSE_NOT_MY_VBUCKET, 0,
+                            cookie);
+    }
+
+    //Check if the vb uuid matches with the latest entry
+    failover_entry_t entry = vb->failovers->getLatestEntry();
+
+    if (vb_uuid != entry.vb_uuid) {
+       uint64_t failover_highseqno = 0;
+       uint64_t latest_uuid;
+       bool found = vb->failovers->getLastSeqnoForUUID(vb_uuid, &failover_highseqno);
+       if (!found) {
+           return sendResponse(response, NULL, 0, 0, 0, 0, 0,
+                               PROTOCOL_BINARY_RAW_BYTES,
+                               PROTOCOL_BINARY_RESPONSE_KEY_ENOENT, 0,
+                               cookie);
+       }
+
+       format_type = 1;
+       last_persisted_seqno = htonll(epstore->getVBuckets().getPersistenceSeqno(vb_id));
+       current_seqno = htonll(vb->getHighSeqno());
+       latest_uuid = htonll(entry.vb_uuid);
+       vb_id = htons(vb_id);
+       vb_uuid = htonll(vb_uuid);
+       failover_highseqno = htonll(failover_highseqno);
+
+       result.write((char*) &format_type, sizeof(uint8_t));
+       result.write((char*) &vb_id, sizeof(uint16_t));
+       result.write((char*) &latest_uuid, sizeof(uint64_t));
+       result.write((char*) &last_persisted_seqno, sizeof(uint64_t));
+       result.write((char*) &current_seqno, sizeof(uint64_t));
+       result.write((char*) &vb_uuid, sizeof(uint64_t));
+       result.write((char*) &failover_highseqno, sizeof(uint64_t));
+    } else {
+        format_type = 0;
+        last_persisted_seqno = htonll(epstore->getVBuckets().getPersistenceSeqno(vb_id));
+        current_seqno = htonll(vb->getHighSeqno());
+        vb_id   =  htons(vb_id);
+        vb_uuid =  htonll(vb_uuid);
+
+        result.write((char*) &format_type, sizeof(uint8_t));
+        result.write((char*) &vb_id, sizeof(uint16_t));
+        result.write((char*) &vb_uuid, sizeof(uint64_t));
+        result.write((char*) &last_persisted_seqno, sizeof(uint64_t));
+        result.write((char*) &current_seqno, sizeof(uint64_t));
+    }
+
+    return sendResponse(response, NULL, 0, 0, 0, result.str().data(),
+                        result.str().length(),
+                        PROTOCOL_BINARY_RAW_BYTES,
+                        PROTOCOL_BINARY_RESPONSE_SUCCESS, 0,
+                        cookie);
+}
+
 ENGINE_ERROR_CODE EventuallyPersistentEngine::touch(const void *cookie,
                                        protocol_binary_request_header *request,
                                        ADD_RESPONSE response)
@@ -4714,7 +4905,7 @@ ENGINE_ERROR_CODE EventuallyPersistentEngine::deregisterTapClient(
             if (!vb) {
                 continue;
             }
-            vb->checkpointManager.removeTAPCursor(tap_name);
+            vb->checkpointManager.removeCursor(tap_name);
         }
     }
 
@@ -4924,35 +5115,57 @@ ENGINE_ERROR_CODE EventuallyPersistentEngine::getMeta(const void* cookie,
                                      protocol_binary_request_get_meta *request,
                                                       ADD_RESPONSE response)
 {
-    if (request->message.header.request.extlen != 0 ||
-        request->message.header.request.keylen == 0) {
+    if (request->message.header.request.keylen == 0) {
         return sendResponse(response, NULL, 0, NULL, 0, NULL, 0,
                             PROTOCOL_BINARY_RAW_BYTES,
                             PROTOCOL_BINARY_RESPONSE_EINVAL, 0, cookie);
     }
 
-    std::string key((char *)(request->bytes + sizeof(request->bytes)),
+    uint8_t extlen = request->message.header.request.extlen;
+    std::string key((char *)(request->bytes + sizeof(request->bytes) + extlen),
                     (size_t)ntohs(request->message.header.request.keylen));
     uint16_t vbucket = ntohs(request->message.header.request.vbucket);
+
+
+    bool sendConfResMode = false;
+    uint8_t confResMode;
+
+    if (extlen == 1) {
+        uint8_t reqExtMeta;
+        memcpy(&reqExtMeta, request->bytes + sizeof(request->bytes), extlen);
+        if (reqExtMeta == 0x01) {
+            sendConfResMode = true;
+        }
+    }
+
     ItemMetaData metadata;
     uint32_t deleted;
 
     ENGINE_ERROR_CODE rv = epstore->getMetaData(key, vbucket, cookie,
-                                                metadata, deleted);
-    uint8_t meta[20];
-    deleted = htonl(deleted);
-    uint32_t flags = metadata.flags;
-    uint32_t exp = htonl(metadata.exptime);
-    uint64_t seqno = htonll(metadata.revSeqno);
-
-    memcpy(meta, &deleted, 4);
-    memcpy(meta + 4, &flags, 4);
-    memcpy(meta + 8, &exp, 4);
-    memcpy(meta + 12, &seqno, 8);
+                                                metadata, deleted,
+                                                confResMode);
 
     if (rv == ENGINE_SUCCESS) {
+
+        uint8_t meta[21];
+        uint8_t metalen = 20;
+        deleted = htonl(deleted);
+        uint32_t flags = metadata.flags;
+        uint32_t exp = htonl(metadata.exptime);
+        uint64_t seqno = htonll(metadata.revSeqno);
+
+        memcpy(meta, &deleted, 4);
+        memcpy(meta + 4, &flags, 4);
+        memcpy(meta + 8, &exp, 4);
+        memcpy(meta + 12, &seqno, 8);
+
+        if (sendConfResMode) {
+            *(meta + metalen) = confResMode;
+            metalen++;
+        }
+
         rv = sendResponse(response, NULL, 0, (const void *)meta,
-                          20, NULL, 0,
+                          metalen, NULL, 0,
                           PROTOCOL_BINARY_RAW_BYTES,
                           PROTOCOL_BINARY_RESPONSE_SUCCESS,
                           metadata.cas, cookie);
@@ -4983,8 +5196,10 @@ ENGINE_ERROR_CODE EventuallyPersistentEngine::setWithMeta(const void* cookie,
     // revid_nbytes, flags and exptime is mandatory fields.. and we need a key
     uint8_t extlen = request->message.header.request.extlen;
     uint16_t keylen = ntohs(request->message.header.request.keylen);
-    if ((extlen != 24 && extlen != 28) ||
-         request->message.header.request.keylen == 0) {
+    if ((extlen != 24           // Packet without nmeta or options
+         && extlen != 28        // Packet with options but without nmeta
+         && extlen != 26)       // Packet with nmeta
+        || keylen == 0) {
         return sendResponse(response, NULL, 0, NULL, 0, NULL, 0,
                             PROTOCOL_BINARY_RAW_BYTES,
                             PROTOCOL_BINARY_RESPONSE_EINVAL, 0, cookie);
@@ -5004,29 +5219,55 @@ ENGINE_ERROR_CODE EventuallyPersistentEngine::setWithMeta(const void* cookie,
     uint8_t datatype = request->message.header.request.datatype;
     size_t vallen = bodylen - keylen - extlen;
 
-    if (vallen > maxItemSize) {
-        LOG(EXTENSION_LOG_WARNING,
-            "Item value size %ld for setWithMeta is bigger "
-            "than the max size %ld allowed!!!\n", vallen, maxItemSize);
-        return sendResponse(response, NULL, 0, NULL, 0, NULL, 0,
-                            PROTOCOL_BINARY_RAW_BYTES,
-                            PROTOCOL_BINARY_RESPONSE_E2BIG, 0, cookie);
-    }
-
     uint32_t flags = request->message.body.flags;
     uint32_t expiration = ntohl(request->message.body.expiration);
     uint64_t seqno = ntohll(request->message.body.seqno);
     uint64_t cas = ntohll(request->message.body.cas);
 
     bool force = false;
+    ExtendedMetaData *emd = NULL;
+
     if (extlen == 28) {
         uint32_t options;
         memcpy(&options, request->bytes + sizeof(request->bytes),
                sizeof(options));
-        key += 4;
+        key += 4;       // 4 bytes for options
         if (ntohl(options) & SKIP_CONFLICT_RESOLUTION_FLAG) {
             force = true;
         }
+    } else if (extlen == 26) {
+        uint16_t nmeta;
+        memcpy(&nmeta, request->bytes + sizeof(request->bytes),
+               sizeof(nmeta));
+        key += 2;       // 2 bytes for nmeta
+        nmeta = ntohs(nmeta);
+        if (nmeta > 0) {
+            // Correct the vallen
+            vallen -= nmeta;
+            emd = new ExtendedMetaData(key + keylen + vallen, nmeta);
+
+            if (emd == NULL) {
+                return sendResponse(response, NULL, 0, NULL, 0, NULL, 0,
+                                    PROTOCOL_BINARY_RAW_BYTES,
+                                    PROTOCOL_BINARY_RESPONSE_ENOMEM, 0, cookie);
+            }
+
+            if (emd->getStatus() == ENGINE_EINVAL) {
+                delete emd;
+                return sendResponse(response, NULL, 0, NULL, 0, NULL, 0,
+                                    PROTOCOL_BINARY_RAW_BYTES,
+                                    PROTOCOL_BINARY_RESPONSE_EINVAL, 0, cookie);
+            }
+        }
+    }
+
+    if (vallen > maxItemSize) {
+        LOG(EXTENSION_LOG_WARNING,
+            "Item value size %ld for setWithMeta is bigger "
+            "than the max size %ld allowed!!!\n", vallen, maxItemSize);
+        return sendResponse(response, NULL, 0, NULL, 0, NULL, 0,
+                            PROTOCOL_BINARY_RAW_BYTES,
+                            PROTOCOL_BINARY_RESPONSE_E2BIG, 0, cookie);
     }
 
     uint8_t *dta = key + keylen;
@@ -5042,8 +5283,8 @@ ENGINE_ERROR_CODE EventuallyPersistentEngine::setWithMeta(const void* cookie,
     uint8_t ext_meta[1];
     uint8_t ext_len = EXT_META_LEN;
     *(ext_meta) = datatype;
-    Item *itm = new Item(key, keylen, vallen, flags, expiration, ext_meta, ext_len,
-                         cas, -1, vbucket);
+    Item *itm = new Item(key, keylen, flags, expiration, dta, vallen,
+                         ext_meta, ext_len, cas, -1, vbucket);
 
     if (itm == NULL) {
         return sendResponse(response, NULL, 0, NULL, 0, NULL, 0,
@@ -5060,14 +5301,27 @@ ENGINE_ERROR_CODE EventuallyPersistentEngine::setWithMeta(const void* cookie,
     }
 
     itm->setRevSeqno(seqno);
-    memcpy((char*)itm->getData(), dta, vallen);
+
+    if (emd) {
+        itm->setConflictResMode(
+                 static_cast<enum conflict_resolution_mode>(
+                                            emd->getConflictResMode()));
+    }
 
     bool allowExisting = (opcode == PROTOCOL_BINARY_CMD_SET_WITH_META ||
                           opcode == PROTOCOL_BINARY_CMD_SETQ_WITH_META);
 
+    uint8_t meta[16];
+    uint64_t by_seqno = 0;
+    uint64_t vb_uuid = 0;
+
     ENGINE_ERROR_CODE ret = epstore->setWithMeta(*itm, ntohll(request->
                                                  message.header.request.cas),
-                                                 cookie, force, allowExisting);
+                                                 &by_seqno, cookie, force,
+                                                 allowExisting, 0xff, true,
+                                                 emd);
+
+    delete emd;
 
     if (ret == ENGINE_SUCCESS) {
         ++stats.numOpsSetMeta;
@@ -5115,8 +5369,17 @@ ENGINE_ERROR_CODE EventuallyPersistentEngine::setWithMeta(const void* cookie,
                             rc, cas, cookie);
     }
 
-    return sendResponse(response, NULL, 0, NULL, 0, NULL, 0,
-                        PROTOCOL_BINARY_RAW_BYTES,
+    if (ret == ENGINE_SUCCESS && isMutationExtrasSupported(cookie)) {
+        RCPtr<VBucket> vb = epstore->getVBucket(vbucket);
+        vb_uuid = htonll(vb->failovers->getLatestUUID());
+        by_seqno = htonll(by_seqno);
+        memcpy(meta, &vb_uuid, sizeof(vb_uuid));
+        memcpy(meta + sizeof(vb_uuid), &by_seqno, sizeof(by_seqno));
+        return sendResponse(response, NULL, 0, (const void *)meta, sizeof(meta),
+                            NULL, 0, PROTOCOL_BINARY_RAW_BYTES, rc, cas, cookie);
+    }
+
+    return sendResponse(response, NULL, 0, NULL, 0, NULL, 0, PROTOCOL_BINARY_RAW_BYTES,
                         rc, cas, cookie);
 }
 
@@ -5127,7 +5390,10 @@ ENGINE_ERROR_CODE EventuallyPersistentEngine::deleteWithMeta(
     // revid_nbytes, flags and exptime is mandatory fields.. and we need a key
     uint16_t nkey = ntohs(request->message.header.request.keylen);
     uint8_t extlen = request->message.header.request.extlen;
-    if ((extlen != 24 && extlen != 28) || nkey == 0) {
+    if ((extlen != 24           // Packet without nmeta or options
+         && extlen != 28        // Packet with options but without nmeta
+         && extlen != 26)       // Packet with nmeta
+        || nkey == 0) {
         return sendResponse(response, NULL, 0, NULL, 0, NULL, 0,
                             PROTOCOL_BINARY_RAW_BYTES,
                             PROTOCOL_BINARY_RESPONSE_EINVAL, 0, cookie);
@@ -5152,20 +5418,53 @@ ENGINE_ERROR_CODE EventuallyPersistentEngine::deleteWithMeta(
     uint64_t metacas = ntohll(request->message.body.cas);
 
     bool force = false;
+    ExtendedMetaData *emd = NULL;
+
     if (extlen == 28) {
         uint32_t options;
         memcpy(&options, request->bytes + sizeof(request->bytes),
                sizeof(options));
-        key_ptr += 4;
+        key_ptr += 4;       // 4 bytes for options
         if (ntohl(options) & SKIP_CONFLICT_RESOLUTION_FLAG) {
             force = true;
         }
+    } else if (extlen == 26) {
+        uint16_t nmeta;
+        memcpy(&nmeta, request->bytes + sizeof(request->bytes),
+               sizeof(nmeta));
+        key_ptr += 2;       // 2 bytes for nmeta
+        nmeta = ntohs(nmeta);
+        if (nmeta > 0) {
+            emd = new ExtendedMetaData(key_ptr + nkey, nmeta);
+
+            if (emd == NULL ) {
+                return sendResponse(response, NULL, 0, NULL, 0, NULL, 0,
+                                    PROTOCOL_BINARY_RAW_BYTES,
+                                    PROTOCOL_BINARY_RESPONSE_ENOMEM, 0, cookie);
+            }
+
+            if (emd->getStatus() == ENGINE_EINVAL) {
+                delete emd;
+                return sendResponse(response, NULL, 0, NULL, 0, NULL, 0,
+                                    PROTOCOL_BINARY_RAW_BYTES,
+                                    PROTOCOL_BINARY_RESPONSE_EINVAL, 0, cookie);
+            }
+        }
     }
+
     std::string key(key_ptr, nkey);
 
     ItemMetaData itm_meta(metacas, seqno, flags, expiration);
-    ENGINE_ERROR_CODE ret = epstore->deleteWithMeta(key, &cas, vbucket, cookie,
-                                                    force, &itm_meta);
+
+    uint8_t meta[16];
+    uint64_t by_seqno = 0;
+    uint64_t vb_uuid = 0;
+
+    ENGINE_ERROR_CODE ret = epstore->deleteWithMeta(key, &cas, &by_seqno, vbucket, cookie,
+                                                    force, &itm_meta, false, true, 0, emd);
+
+    delete emd;
+
     if (ret == ENGINE_SUCCESS) {
         stats.numOpsDelMeta++;
     } else if (ret == ENGINE_ENOMEM) {
@@ -5189,8 +5488,18 @@ ENGINE_ERROR_CODE EventuallyPersistentEngine::deleteWithMeta(
                             rc, cas, cookie);
     }
 
+    if (ret == ENGINE_SUCCESS && isMutationExtrasSupported(cookie)) {
+        RCPtr<VBucket> vb = epstore->getVBucket(vbucket);
+        vb_uuid = htonll(vb->failovers->getLatestUUID());
+        by_seqno = htonll(by_seqno);
+        memcpy(meta, &vb_uuid, 8);
+        memcpy(meta + 8, &by_seqno, 8);
+        return sendResponse(response, NULL, 0, (const void *)meta, sizeof(meta),
+                            NULL, 0, PROTOCOL_BINARY_RAW_BYTES, rc, cas, cookie);
+    }
+
     return sendResponse(response, NULL, 0, NULL, 0, NULL, 0,
-                        PROTOCOL_BINARY_RAW_BYTES, rc, 0, cookie);
+                        PROTOCOL_BINARY_RAW_BYTES, rc, cas, cookie);
 }
 
 ENGINE_ERROR_CODE
@@ -5274,7 +5583,7 @@ EventuallyPersistentEngine::handleTrafficControlCmd(const void *cookie,
             status = PROTOCOL_BINARY_RESPONSE_ETMPFAIL;
         } else {
             if (enableTraffic(true)) {
-                msg << "Data traffic to persistent engine is enabled";
+                msg << "Data traffic to persistence engine is enabled";
             } else {
                 msg <<
                       "Data traffic to persistence engine was already enabled";
@@ -5363,7 +5672,7 @@ EventuallyPersistentEngine::doTapVbTakeoverStats(const void *cookie,
     } else {
         if (tapConnMap->isBackfillCompleted(tapName)) {
             chk_items = vb_items > 0 ?
-                vb->checkpointManager.getNumItemsForTAPConnection(tapName) : 0;
+                vb->checkpointManager.getNumItemsForCursor(tapName) : 0;
             total = chk_items;
             add_casted_stat("status", "backfill completed", add_stat, cookie);
         } else {
@@ -5413,7 +5722,6 @@ EventuallyPersistentEngine::returnMeta(const void* cookie,
     size_t vallen = bodylen - keylen - extlen;
     uint64_t seqno;
 
-
     ENGINE_ERROR_CODE ret = ENGINE_EINVAL;
     if (mutate_type == SET_RET_META || mutate_type == ADD_RET_META) {
         uint8_t *dta = key + keylen;
@@ -5429,7 +5737,7 @@ EventuallyPersistentEngine::returnMeta(const void* cookie,
         uint8_t ext_meta[1];
         uint8_t ext_len = EXT_META_LEN;
         *(ext_meta) = datatype;
-        Item *itm = new Item(key, keylen, vallen, flags, exp, ext_meta,
+        Item *itm = new Item(key, keylen, flags, exp, dta, vallen, ext_meta,
                              ext_len, cas, -1, vbucket);
 
         if (!itm) {
@@ -5438,7 +5746,6 @@ EventuallyPersistentEngine::returnMeta(const void* cookie,
                                 PROTOCOL_BINARY_RESPONSE_ENOMEM, 0, cookie);
         }
 
-        memcpy((char*)itm->getData(), dta, vallen);
         if (mutate_type == SET_RET_META) {
             ret = epstore->set(*itm, cookie);
         } else {
@@ -5452,9 +5759,10 @@ EventuallyPersistentEngine::returnMeta(const void* cookie,
         delete itm;
     } else if (mutate_type == DEL_RET_META) {
         ItemMetaData itm_meta;
+        mutation_descr_t mut_info;
         std::string key_str(reinterpret_cast<char*>(key), keylen);
         ret = epstore->deleteItem(key_str, &cas, vbucket, cookie, false,
-                                  &itm_meta);
+                                  &itm_meta, &mut_info);
         if (ret == ENGINE_SUCCESS) {
             ++stats.numOpsDelRetMeta;
         }
@@ -5518,6 +5826,19 @@ EventuallyPersistentEngine::setClusterConfig(const void* cookie,
         lh.unlock();
     }
 
+    // clusterConfig is opaque to ep-engine, but typically there is a rev id
+    // at the start of it. Print the first 100 bytes which hopefully includes
+    // helpful identifying information.
+    const int CONFIG_LIMIT = 100;
+    if (clusterConfig.len > CONFIG_LIMIT) {
+        LOG(EXTENSION_LOG_WARNING, "Updated cluster configuration - first %d "
+                "bytes: '%.*s'...\n", CONFIG_LIMIT, CONFIG_LIMIT,
+                clusterConfig.config);
+    } else {
+        LOG(EXTENSION_LOG_WARNING, "Updated cluster configuration: '%.*s'\n",
+            clusterConfig.len, clusterConfig.config);
+    }
+
     return sendResponse(response, NULL, 0, NULL, 0, NULL, 0,
                         PROTOCOL_BINARY_RAW_BYTES,
                         PROTOCOL_BINARY_RESPONSE_SUCCESS, cas, cookie);
@@ -5541,8 +5862,8 @@ class FetchAllKeysTask : public GlobalTask {
 public:
     FetchAllKeysTask(EventuallyPersistentEngine *e, const void *c,
                      ADD_RESPONSE resp, const std::string &start_key_,
-                     uint16_t vbucket, uint32_t count_, const Priority &p) :
-        GlobalTask(e, p, 0, false), engine(e), cookie(c),
+                     uint16_t vbucket, uint32_t count_) :
+        GlobalTask(e, TaskId::FetchAllKeysTask, 0, false), engine(e), cookie(c),
         response(resp), start_key(start_key_), vbid(vbucket),
         count(count_) { }
 
@@ -5551,20 +5872,29 @@ public:
     }
 
     bool run() {
-        AllKeysCB *cb = new AllKeysCB();
-        ENGINE_ERROR_CODE err =
-              engine->getEpStore()->getROUnderlying(vbid)->getAllKeys(vbid,
-                                                              start_key, count,
-                                                              cb);
-        if (err == ENGINE_SUCCESS) {
-            err =  sendResponse(response, NULL, 0, NULL, 0,
-                                cb->getAllKeysPtr(), cb->getAllKeysLen(),
-                                PROTOCOL_BINARY_RAW_BYTES,
-                                PROTOCOL_BINARY_RESPONSE_SUCCESS, 0, cookie);
+        ENGINE_ERROR_CODE err;
+        if (engine->getEpStore()->getVBuckets().isBucketCreation(vbid)) {
+            // Returning an empty packet with a SUCCESS response as
+            // there aren't any keys during the vbucket file creation.
+            err = sendResponse(response, NULL, 0, NULL, 0, NULL, 0,
+                               PROTOCOL_BINARY_RAW_BYTES,
+                               PROTOCOL_BINARY_RESPONSE_SUCCESS, 0,
+                               cookie);
+        } else {
+            AllKeysCB *cb = new AllKeysCB();
+            err = engine->getEpStore()->getROUnderlying(vbid)->getAllKeys(
+                                                    vbid, start_key, count, cb);
+            if (err == ENGINE_SUCCESS) {
+                err =  sendResponse(response, NULL, 0, NULL, 0,
+                                    cb->getAllKeysPtr(), cb->getAllKeysLen(),
+                                    PROTOCOL_BINARY_RAW_BYTES,
+                                    PROTOCOL_BINARY_RESPONSE_SUCCESS, 0,
+                                    cookie);
+            }
+            delete cb;
         }
         engine->addLookupAllKeys(cookie, err);
         engine->notifyIOComplete(cookie, err);
-        delete cb;
         return false;
     }
 
@@ -5618,8 +5948,7 @@ EventuallyPersistentEngine::getAllKeys(const void* cookie,
     std::string start_key(keyptr, keylen);
 
     ExTask task = new FetchAllKeysTask(this, cookie, response, start_key,
-                                       vbucket, count,
-                                       Priority::BgFetcherPriority);
+                                       vbucket, count);
     ExecutorPool::get()->schedule(task, READER_TASK_IDX);
     return ENGINE_EWOULDBLOCK;
 }
@@ -5646,6 +5975,63 @@ ENGINE_ERROR_CODE EventuallyPersistentEngine::getRandomKey(const void *cookie,
     return ret;
 }
 
+ENGINE_ERROR_CODE EventuallyPersistentEngine::getAdjustedTime(
+                             const void *cookie,
+                             protocol_binary_request_get_adjusted_time *request,
+                             ADD_RESPONSE response) {
+
+    uint16_t vbucket = ntohs(request->message.header.request.vbucket);
+    RCPtr<VBucket> vb = getVBucket(vbucket);
+    if (!vb) {
+        LockHolder lh(clusterConfig.lock);
+        return sendResponse(response, NULL, 0, NULL, 0,
+                            clusterConfig.config,
+                            clusterConfig.len,
+                            PROTOCOL_BINARY_RAW_BYTES,
+                            PROTOCOL_BINARY_RESPONSE_NOT_MY_VBUCKET, 0,
+                            cookie);
+    }
+    // Will return the vbucket's adjusted time, only if
+    // time synchronization for the vbucket is enabled
+    if (vb->isTimeSyncEnabled()) {
+        int64_t adjusted_time = gethrtime() + vb->getDriftCounter();
+        adjusted_time = htonll(adjusted_time);
+        return sendResponse(response, NULL, 0, NULL, 0,
+                            (const void *)&adjusted_time, sizeof(int64_t),
+                            PROTOCOL_BINARY_RAW_BYTES,
+                            PROTOCOL_BINARY_RESPONSE_SUCCESS, 0, cookie);
+    } else {
+        return sendResponse(response, NULL, 0, NULL, 0, NULL, 0,
+                            PROTOCOL_BINARY_RAW_BYTES,
+                            PROTOCOL_BINARY_RESPONSE_NOT_SUPPORTED, 0,
+                            cookie);
+    }
+}
+
+ENGINE_ERROR_CODE EventuallyPersistentEngine::setDriftCounterState(
+                       const void *cookie,
+                       protocol_binary_request_set_drift_counter_state *request,
+                       ADD_RESPONSE response) {
+
+    uint16_t vbucket = ntohs(request->message.header.request.vbucket);
+    RCPtr<VBucket> vb = getVBucket(vbucket);
+    if (!vb) {
+        LockHolder lh(clusterConfig.lock);
+        return sendResponse(response, NULL, 0, NULL, 0,
+                            clusterConfig.config,
+                            clusterConfig.len,
+                            PROTOCOL_BINARY_RAW_BYTES,
+                            PROTOCOL_BINARY_RESPONSE_NOT_MY_VBUCKET, 0,
+                            cookie);
+    }
+    int64_t initialDriftCount = ntohll(request->message.body.initial_drift);
+    uint8_t timeSync = request->message.body.time_sync;
+    vb->setDriftCounterState(initialDriftCount, timeSync);
+    return sendResponse(response, NULL, 0, NULL, 0, NULL, 0,
+                        PROTOCOL_BINARY_RAW_BYTES,
+                        PROTOCOL_BINARY_RESPONSE_SUCCESS, 0 , cookie);
+}
+
 ENGINE_ERROR_CODE EventuallyPersistentEngine::dcpOpen(const void* cookie,
                                                        uint32_t opaque,
                                                        uint32_t seqno,
@@ -5732,11 +6118,42 @@ void EventuallyPersistentEngine::handleDisconnect(const void *cookie) {
     }
 }
 
-ENGINE_ERROR_CODE EventuallyPersistentEngine::getAllVBucketSequenceNumbers(const void *cookie,
-                                                                           ADD_RESPONSE response) {
+ENGINE_ERROR_CODE EventuallyPersistentEngine::getAllVBucketSequenceNumbers(
+                                    const void *cookie,
+                                    protocol_binary_request_header *request,
+                                    ADD_RESPONSE response) {
+    protocol_binary_request_get_all_vb_seqnos *req =
+        reinterpret_cast<protocol_binary_request_get_all_vb_seqnos*>(request);
+
+    // if extlen is non-zero, it limits the result to only include the
+    // vbuckets in the specified vbucket state.
+    size_t bodylen = ntohl(req->message.header.request.bodylen);
+    uint8_t extlen = req->message.header.request.extlen;
+
+    if ((bodylen != extlen) ||
+        ((bodylen != 0) && (bodylen != sizeof(vbucket_state_t)))) {
+        const std::string msg("Incorrect packet format");
+        return sendResponse(response, NULL, 0, NULL, 0, msg.c_str(),
+                            msg.length(), PROTOCOL_BINARY_RAW_BYTES,
+                            PROTOCOL_BINARY_RESPONSE_EINVAL,
+                            0, cookie);
+    }
 
-    std::vector<uint8_t> payload;
+    vbucket_state_t reqState = static_cast<vbucket_state_t>(0);;
+    if (bodylen != 0) {
+        memcpy(&reqState, &req->message.body.state, sizeof(reqState));
+        reqState = static_cast<vbucket_state_t>(ntohl(reqState));
+
+        if (!is_valid_vbucket_state_t(reqState)) {
+            const std::string msg("Invalid vbucket state");
+            return sendResponse(response, NULL, 0, NULL, 0, msg.c_str(),
+                                msg.length(), PROTOCOL_BINARY_RAW_BYTES,
+                                PROTOCOL_BINARY_RESPONSE_EINVAL,
+                                0, cookie);
+        }
+    }
 
+    std::vector<uint8_t> payload;
     std::vector<int> vbuckets = epstore->getVBuckets().getBuckets();
 
     /* Reserve a buffer that's big enough to hold all of them (we might
@@ -5752,24 +6169,29 @@ ENGINE_ERROR_CODE EventuallyPersistentEngine::getAllVBucketSequenceNumbers(const
                             cookie);
     }
 
-    for (std::vector<int>::iterator id = vbuckets.begin();
-         id != vbuckets.end(); ++id) {
-        RCPtr<VBucket> vb = getVBucket(*id);
+    for (auto id : vbuckets) {
+        RCPtr<VBucket> vb = getVBucket(id);
         if (vb) {
-            vbucket_state_t state = vb->getState();
-            if (state == vbucket_state_active ||
-                state == vbucket_state_replica ||
-                state == vbucket_state_pending) {
-                uint16_t vbid = htons(static_cast<uint16_t>(*id));
+            auto state = vb->getState();
+            bool getSeqnoForThisVb = false;
+            if (reqState) {
+                getSeqnoForThisVb = (reqState == state);
+            } else {
+                getSeqnoForThisVb = (state == vbucket_state_active) ||
+                                    (state == vbucket_state_replica) ||
+                                    (state == vbucket_state_pending);
+            }
+            if (getSeqnoForThisVb) {
+                uint16_t vbid = htons(static_cast<uint16_t>(id));
                 uint64_t highSeqno;
                 if (vb->getState() == vbucket_state_active) {
                     highSeqno = htonll(vb->getHighSeqno());
                 } else {
-                    uint64_t snapshot_start, snapshot_end;
-                    vb->getCurrentSnapshot(snapshot_start, snapshot_end);
-                    highSeqno = htonll(snapshot_end);
+                    snapshot_info_t info =
+                                        vb->checkpointManager.getSnapshotInfo();
+                    highSeqno = htonll(info.range.end);
                 }
-                size_t offset = payload.size();
+                auto offset = payload.size();
                 payload.resize(offset + sizeof(vbid) + sizeof(highSeqno));
                 memcpy(payload.data() + offset, &vbid, sizeof(vbid));
                 memcpy(payload.data() + offset + sizeof(vbid), &highSeqno,