Merge remote-tracking branch 'couchbase/3.0.x' into sherlock
[ep-engine.git] / tests / ep_testsuite.cc
index d4c4548..457d36d 100644 (file)
@@ -23,6 +23,7 @@
 #include <stdio.h>
 #include <stdlib.h>
 #include <string.h>
+
 #include <sys/stat.h>
 #ifdef _MSC_VER
 #include <direct.h>
@@ -33,6 +34,7 @@
 
 #include <cstdlib>
 #include <iostream>
+#include <iomanip>
 #include <map>
 #include <set>
 #include <sstream>
@@ -46,6 +48,7 @@
 #include "ep_test_apis.h"
 #include "ep_testsuite.h"
 #include "locks.h"
+#include <libcouchstore/couch_db.h>
 #include "mock/mock_dcp.h"
 #include "mutex.h"
 
 // away ;)
 typedef void (*UNLOCK_COOKIE_T)(const void *cookie);
 
-extern "C" {
+template <typename T>
+static void checknefn(T exp, T got, const char *msg, const char *file, const int linenum) {
+    if (exp == got) {
+        std::stringstream ss;
+        ss << "Expected `" << exp << "' to not equal `" << got << "' - " << msg;
+        abort_msg(ss.str().c_str(), file, linenum);
+    }
+}
+
+#define checkne(a, b, c) checknefn(a, b, c, __FILE__, __LINE__)
 
+extern "C" {
 #define check(expr, msg) \
     static_cast<void>((expr) ? 0 : abort_msg(#expr, msg, __LINE__))
 
@@ -151,6 +164,55 @@ static void check_key_value(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1,
     check(memcmp(info.value[0].iov_base, val, vlen) == 0, "Data mismatch");
 }
 
+// Fetches the CAS of the specified key.
+static uint64_t get_CAS(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1,
+                        const std::string& key) {
+    item *i = NULL;
+    checkeq(ENGINE_SUCCESS,
+            h1->get(h, NULL, &i, key.c_str(), key.size(), /*vBucket*/0),
+            "Failed to get key");
+
+    item_info info;
+    info.nvalue = 1;
+    check(h1->get_item_info(h, NULL, i, &info),
+          "Failed to get item info for key");
+    h1->release(h, NULL, i);
+
+    return info.cas;
+}
+
+static void check_observe_seqno(bool failover, uint8_t format_type, uint16_t vb_id,
+                                uint64_t vb_uuid, uint64_t last_persisted_seqno,
+                                uint64_t current_seqno, uint64_t failover_vbuuid = 0,
+                                uint64_t failover_seqno = 0) {
+    uint8_t  recv_format_type;
+    uint16_t recv_vb_id;
+    uint64_t recv_vb_uuid;
+    uint64_t recv_last_persisted_seqno;
+    uint64_t recv_current_seqno;
+    uint64_t recv_failover_vbuuid;
+    uint64_t recv_failover_seqno;
+
+    memcpy(&recv_format_type, last_body.data(), sizeof(uint8_t));
+    check(recv_format_type == format_type, "Wrong format type in result");
+    memcpy(&recv_vb_id, last_body.data() + 1, sizeof(uint16_t));
+    check(ntohs(recv_vb_id) == vb_id, "Wrong vbucket id in result");
+    memcpy(&recv_vb_uuid, last_body.data() + 3, sizeof(uint64_t));
+    check(ntohll(recv_vb_uuid) == vb_uuid, "Wrong vbucket uuid in result");
+    memcpy(&recv_last_persisted_seqno, last_body.data() + 11, sizeof(uint64_t));
+    check(ntohll(recv_last_persisted_seqno) == last_persisted_seqno,
+          "Wrong persisted seqno in result");
+    memcpy(&recv_current_seqno, last_body.data() + 19, sizeof(uint64_t));
+    check(ntohll(recv_current_seqno) == current_seqno, "Wrong current seqno in result");
+
+    if (failover) {
+        memcpy(&recv_failover_vbuuid, last_body.data() + 27, sizeof(uint64_t));
+        check(ntohll(recv_failover_vbuuid) == failover_vbuuid, "Wrong failover uuid in result");
+        memcpy(&recv_failover_seqno, last_body.data() + 35, sizeof(uint64_t));
+        check(ntohll(recv_failover_seqno) == failover_seqno, "Wrong failover seqno in result");
+    }
+}
+
 static bool test_setup(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1) {
     wait_for_warmup_complete(h, h1);
 
@@ -240,11 +302,11 @@ static enum test_result test_getl(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1) {
     /* try an incr operation followed by a delete, both of which should fail */
     uint64_t cas = 0;
     uint64_t result = 0;
-
+    i = NULL;
     check(h1->arithmetic(h, NULL, key, 2, true, false, 1, 1, 0,
-                         &cas, PROTOCOL_BINARY_RAW_BYTES, &result,
-                         0)  == ENGINE_TMPFAIL, "Incr failed");
-
+                         &i, PROTOCOL_BINARY_RAW_BYTES, &result,
+                         0) == ENGINE_TMPFAIL, "Incr failed");
+    h1->release(h, NULL, i);
 
     check(del(h, h1, key, 0, 0) == ENGINE_TMPFAIL, "Delete failed");
 
@@ -438,6 +500,8 @@ static enum test_result test_get_miss(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1) {
 
 static enum test_result test_set(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1) {
     item *i = NULL;
+    item_info info;
+    uint64_t vb_uuid = 0, high_seqno = 0;
     const int num_sets = 5, num_keys = 4;
 
     std::string key_arr[num_keys] = { "dummy_key",
@@ -448,18 +512,45 @@ static enum test_result test_set(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1) {
 
     for (int k = 0; k < num_keys; k++) {
         for (int j = 0; j < num_sets; j++) {
-            std::string err_string("Error setting " + key_arr[k]);
+            memset(&info, 0, sizeof(info));
+            vb_uuid = get_ull_stat(h, h1, "vb_0:0:id", "failovers");
+            high_seqno = get_ull_stat(h, h1, "vb_0:high_seqno",
+                                      "vbucket-seqno");
+
+            std::string err_str_store("Error setting " + key_arr[k]);
             checkeq(ENGINE_SUCCESS,
                     store(h, h1, NULL, OPERATION_SET, key_arr[k].c_str(),
                           "somevalue", &i),
-                    err_string.c_str());
+                    err_str_store.c_str());
             h1->release(h, NULL, i);
+
+            std::string err_str_get_item_info("Error getting " + key_arr[k]);
+            checkeq(true, get_item_info(h, h1, &info, key_arr[k].c_str()),
+                  err_str_get_item_info.c_str());
+
+            std::string err_str_vb_uuid("Expected valid vbucket uuid for " +
+                                        key_arr[k]);
+            checkeq(vb_uuid, info.vbucket_uuid, err_str_vb_uuid.c_str());
+
+            std::string err_str_seqno("Expected valid sequence number for " +
+                                        key_arr[k]);
+            checkeq(high_seqno + 1, info.seqno, err_str_seqno.c_str());
         }
     }
 
     wait_for_flusher_to_settle(h, h1);
-    checkeq(num_keys, get_int_stat(h, h1, "ep_total_persisted"),
-            "Expected ep_total_persisted equals 4");
+    std::stringstream error1, error2;
+    error1 << "Expected ep_total_persisted >= num_keys (" << num_keys << ")";
+    error2 << "Expected ep_total_persisted <= num_sets*num_keys ("
+           << num_sets*num_keys << ")";
+
+    // The flusher could of ran > 1 times. We can only assert
+    // that we persisted between num_keys and upto num_keys*num_sets
+    check(get_int_stat(h, h1, "ep_total_persisted") >= num_keys,
+        error1.str().c_str());
+    check(get_int_stat(h, h1, "ep_total_persisted") <= num_sets*num_keys,
+        error2.str().c_str());
+
     return SUCCESS;
 }
 
@@ -492,13 +583,15 @@ extern "C" {
 
     static void conc_incr_thread(void *arg) {
         struct handle_pair *hp = static_cast<handle_pair *>(arg);
-        uint64_t cas = 0, result = 0;
+        uint64_t result = 0;
 
         for (int i = 0; i < 10; i++) {
+            item *it = NULL;
             check(hp->h1->arithmetic(hp->h, NULL, "key", 3, true, true, 1, 1, 0,
-                                     &cas, PROTOCOL_BINARY_RAW_BYTES, &result,
+                                     &it, PROTOCOL_BINARY_RAW_BYTES, &result,
                                      0) == ENGINE_SUCCESS,
                                      "Failed arithmetic operation");
+            hp->h1->release(hp->h, NULL, it);
         }
     }
 }
@@ -558,6 +651,78 @@ static enum test_result test_conc_incr(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1) {
     return SUCCESS;
 }
 
+static enum test_result test_conc_incr_new_itm (ENGINE_HANDLE *h,
+                                                ENGINE_HANDLE_V1 *h1) {
+    const int n_threads = 10;
+    cb_thread_t threads[n_threads];
+    struct handle_pair hp = {h, h1};
+
+    for (int i = 0; i < n_threads; i++) {
+        int r = cb_create_thread(&threads[i], conc_incr_thread, &hp, 0);
+        cb_assert(r == 0);
+    }
+
+    for (int i = 0; i < n_threads; i++) {
+        int r = cb_join_thread(threads[i]);
+        cb_assert(r == 0);
+    }
+
+    check_key_value(h, h1, "key", "100", 3);
+
+    return SUCCESS;
+}
+
+struct multi_set_args {
+    ENGINE_HANDLE *h;
+    ENGINE_HANDLE_V1 *h1;
+    std::string prefix;
+    int count;
+};
+
+extern "C" {
+    static void multi_set_thread(void *arg) {
+        struct multi_set_args *msa = static_cast<multi_set_args *>(arg);
+
+        for (int i = 0; i < msa->count; i++) {
+            item *it = NULL;
+            std::stringstream s;
+            s << msa->prefix << i;
+            std::string key(s.str());
+            check(ENGINE_SUCCESS == store(msa->h, msa->h1, NULL, OPERATION_SET,
+                          key.c_str(), "somevalue", &it), "Set failure!");
+            msa->h1->release(msa->h, NULL, it);
+        }
+    }
+}
+
+static enum test_result test_multi_set(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1) {
+
+    cb_thread_t thread1, thread2;
+    struct multi_set_args msa1, msa2;
+    msa1.h = h;
+    msa1.h1 = h1;
+    msa1.prefix = "ONE_";
+    msa1.count = 50000;
+    cb_assert(cb_create_thread(&thread1, multi_set_thread, &msa1, 0) == 0);
+
+    msa2.h = h;
+    msa2.h1 = h1;
+    msa2.prefix = "TWO_";
+    msa2.count = 50000;
+    cb_assert(cb_create_thread(&thread2, multi_set_thread, &msa2, 0) == 0);
+
+    cb_assert(cb_join_thread(thread1) == 0);
+    cb_assert(cb_join_thread(thread2) == 0);
+
+    wait_for_flusher_to_settle(h, h1);
+
+    check(get_int_stat(h, h1, "curr_items") == 100000,
+          "Mismatch in number of items inserted");
+    check(get_int_stat(h, h1, "vb_0:high_seqno", "vbucket-seqno") == 100000,
+          "Unexpected high sequence number");
+
+    return SUCCESS;
+}
 
 static enum test_result test_set_get_hit(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1) {
     item *i = NULL;
@@ -656,9 +821,23 @@ static enum test_result test_cas(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1) {
 
 static enum test_result test_add(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1) {
     item *i = NULL;
+    item_info info;
+    uint64_t vb_uuid = 0;
+    uint32_t high_seqno = 0;
+
+    memset(&info, 0, sizeof(info));
+
+    vb_uuid = get_ull_stat(h, h1, "vb_0:0:id", "failovers");
+    high_seqno = get_ull_stat(h, h1, "vb_0:high_seqno", "vbucket-seqno");
+
     check(store(h, h1, NULL, OPERATION_ADD,"key", "somevalue", &i) == ENGINE_SUCCESS,
           "Failed to add value.");
     h1->release(h, NULL, i);
+
+    check(get_item_info(h, h1, &info, "key"), "Error getting item info");
+    check(vb_uuid == info.vbucket_uuid, "Expected valid vbucket uuid");
+    check(high_seqno + 1 == info.seqno, "Expected valid sequence number");
+
     check(store(h, h1, NULL, OPERATION_ADD,"key", "somevalue", &i) == ENGINE_NOT_STORED,
           "Failed to fail to re-add value.");
     h1->release(h, NULL, i);
@@ -671,6 +850,7 @@ static enum test_result test_add(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1) {
 
     check(store(h, h1, NULL, OPERATION_ADD,"key", "newvalue", &i) == ENGINE_SUCCESS,
           "Failed to add value again.");
+
     h1->release(h, NULL, i);
     check_key_value(h, h1, "key", "newvalue", 8);
     return SUCCESS;
@@ -701,15 +881,32 @@ static enum test_result test_add_add_with_cas(ENGINE_HANDLE *h, ENGINE_HANDLE_V1
 
 static enum test_result test_replace(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1) {
     item *i = NULL;
+    item_info info;
+    uint64_t vb_uuid = 0;
+    uint32_t high_seqno = 0;
+
+    memset(&info, 0, sizeof(info));
+
     check(store(h, h1, NULL, OPERATION_REPLACE,"key", "somevalue", &i) != ENGINE_SUCCESS,
           "Failed to fail to replace non-existing value.");
+
     h1->release(h, NULL, i);
     check(store(h, h1, NULL, OPERATION_SET,"key", "somevalue", &i) == ENGINE_SUCCESS,
           "Failed to set value.");
     h1->release(h, NULL, i);
+
+    vb_uuid = get_ull_stat(h, h1, "vb_0:0:id", "failovers");
+    high_seqno = get_ull_stat(h, h1, "vb_0:high_seqno", "vbucket-seqno");
+
     check(store(h, h1, NULL, OPERATION_REPLACE,"key", "somevalue", &i) == ENGINE_SUCCESS,
           "Failed to replace existing value.");
     h1->release(h, NULL, i);
+
+    check(get_item_info(h, h1, &info, "key"), "Error getting item info");
+
+    check(vb_uuid == info.vbucket_uuid, "Expected valid vbucket uuid");
+    check(high_seqno + 1 == info.seqno, "Expected valid sequence number");
+
     check_key_value(h, h1, "key", "somevalue", 9);
     return SUCCESS;
 }
@@ -743,59 +940,114 @@ static enum test_result test_replace_with_eviction(ENGINE_HANDLE *h,
 }
 
 static enum test_result test_incr_miss(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1) {
-    uint64_t cas = 0, result = 0;
+    uint64_t result = 0;
+    item *i = NULL;
     h1->arithmetic(h, NULL, "key", 3, true, false, 1, 0, 0,
-                   &cas, PROTOCOL_BINARY_RAW_BYTES, &result,
+                   &i, PROTOCOL_BINARY_RAW_BYTES, &result,
                    0);
+    h1->release(h, NULL, i);
     check(ENGINE_KEY_ENOENT == verify_key(h, h1, "key"), "Expected to not find key");
     return SUCCESS;
 }
 
-static enum test_result test_incr_default(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1) {
-    uint64_t cas = 0, result = 0;
+static enum test_result test_incr_full_eviction(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1) {
+    uint64_t result = 0;
+    item *i = NULL;
     check(h1->arithmetic(h, NULL, "key", 3, true, true, 1, 1, 0,
-                         &cas, PROTOCOL_BINARY_RAW_BYTES, &result,
+                   &i, PROTOCOL_BINARY_RAW_BYTES, &result,
+                   0) == ENGINE_SUCCESS,
+          "Failed arithmetic operation");
+    h1->release(h, NULL, i);
+    check(result == 1, "Failed to set initial value in arithmetic operation");
+    return SUCCESS;
+}
+
+static enum test_result test_incr_default(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1) {
+    const void *cookie = testHarness.create_cookie();
+    testHarness.set_datatype_support(cookie, false);
+
+    uint64_t result = 0;
+    item *i = NULL;
+    check(h1->arithmetic(h, cookie, "key", 3, true, true, 1, 1, 0,
+                         &i, PROTOCOL_BINARY_RAW_BYTES, &result,
                          0) == ENGINE_SUCCESS,
           "Failed first arith");
+    h1->release(h, cookie, i);
     check(result == 1, "Failed result verification.");
 
-    check(h1->arithmetic(h, NULL, "key", 3, true, false, 1, 1, 0,
-                         &cas, PROTOCOL_BINARY_RAW_BYTES, &result,
+    // Check datatype of counter
+    check(h1->get(h, cookie, &i, "key", 3, 0) == ENGINE_SUCCESS,
+            "Unable to get stored item");
+    item_info info;
+    info.nvalue = 1;
+    h1->get_item_info(h, cookie, i, &info);
+    h1->release(h, cookie, i);
+    check(info.datatype == PROTOCOL_BINARY_DATATYPE_JSON, "Invalid datatype");
+
+    check(h1->arithmetic(h, cookie, "key", 3, true, false, 1, 1, 0,
+                         &i, PROTOCOL_BINARY_RAW_BYTES, &result,
                          0) == ENGINE_SUCCESS,
           "Failed second arith.");
+    h1->release(h, cookie, i);
     check(result == 2, "Failed second result verification.");
 
-    check(h1->arithmetic(h, NULL, "key", 3, true, true, 1, 1, 0,
-                         &cas, PROTOCOL_BINARY_RAW_BYTES, &result,
+    check(h1->arithmetic(h, cookie, "key", 3, true, true, 1, 1, 0,
+                         &i, PROTOCOL_BINARY_RAW_BYTES, &result,
                          0) == ENGINE_SUCCESS,
           "Failed third arith.");
+    h1->release(h, cookie, i);
     check(result == 3, "Failed third result verification.");
 
     check_key_value(h, h1, "key", "3", 1);
+
+    // Check datatype of counter
+    check(h1->get(h, cookie, &i, "key", 3, 0) == ENGINE_SUCCESS,
+            "Unable to get stored item");
+    info.nvalue = 1;
+    h1->get_item_info(h, cookie, i, &info);
+    h1->release(h, cookie, i);
+    check(info.datatype == PROTOCOL_BINARY_DATATYPE_JSON, "Invalid datatype");
+
+    testHarness.destroy_cookie(cookie);
     return SUCCESS;
 }
 
 static enum test_result test_append(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1) {
     item *i = NULL;
+    item_info info;
+    uint64_t vb_uuid = 0;
+    uint32_t high_seqno = 0;
+
+    memset(&info, 0, sizeof(info));
 
     // MB-11332: append on non-existing key should return NOT_STORED
     check(storeCasVb11(h, h1, NULL, OPERATION_APPEND, "key",
                        "foo\r\n", 5, 82758, &i, 0, 0)
           == ENGINE_NOT_STORED,
           "MB-11332: Failed append.");
+    h1->release(h, NULL, i);
 
     check(storeCasVb11(h, h1, NULL, OPERATION_SET, "key",
                        "\r\n", 2, 82758, &i, 0, 0)
           == ENGINE_SUCCESS,
           "Failed set.");
+
     h1->release(h, NULL, i);
 
+    vb_uuid = get_ull_stat(h, h1, "vb_0:0:id", "failovers");
+    high_seqno = get_ull_stat(h, h1, "vb_0:high_seqno", "vbucket-seqno");
+
     check(storeCasVb11(h, h1, NULL, OPERATION_APPEND, "key",
                        "foo\r\n", 5, 82758, &i, 0, 0)
           == ENGINE_SUCCESS,
           "Failed append.");
     h1->release(h, NULL, i);
 
+    check(get_item_info(h, h1, &info, "key"), "Error in getting item info");
+
+    check(vb_uuid == info.vbucket_uuid, "Expected valid vbucket uuid");
+    check(high_seqno + 1 == info.seqno, "Expected valid sequence number");
+
     check_key_value(h, h1, "key", "\r\nfoo\r\n", 7);
 
     char binaryData1[] = "abcdefg\0gfedcba\r\n";
@@ -833,13 +1085,18 @@ static enum test_result test_append(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1) {
 
 static enum test_result test_prepend(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1) {
     item *i = NULL;
+    item_info info;
+    uint64_t vb_uuid = 0;
+    uint32_t high_seqno = 0;
+
+    memset(&info, 0, sizeof(info));
 
     // MB-11332: prepend on non-existing key should return NOT_STORED
     check(storeCasVb11(h, h1, NULL, OPERATION_PREPEND, "key",
                        "foo\r\n", 5, 82758, &i, 0, 0)
           == ENGINE_NOT_STORED,
           "MB-11332: Failed prepend.");
-
+    h1->release(h, NULL, i);
 
     check(storeCasVb11(h, h1, NULL, OPERATION_SET, "key",
                        "\r\n", 2, 82758, &i, 0, 0)
@@ -847,12 +1104,19 @@ static enum test_result test_prepend(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1) {
           "Failed set.");
     h1->release(h, NULL, i);
 
+    vb_uuid = get_ull_stat(h, h1, "vb_0:0:id", "failovers");
+    high_seqno = get_ull_stat(h, h1, "vb_0:high_seqno", "vbucket-seqno");
+
     check(storeCasVb11(h, h1, NULL, OPERATION_PREPEND, "key",
                        "foo\r\n", 5, 82758, &i, 0, 0)
           == ENGINE_SUCCESS,
-          "Failed append.");
+          "Failed prepend.");
     h1->release(h, NULL, i);
 
+    check(get_item_info(h, h1, &info, "key"), "Error getting item info");
+    check(vb_uuid == info.vbucket_uuid, "Expected valid vbucket uuid");
+    check(high_seqno + 1 == info.seqno, "Expected valid sequence number");
+
     check_key_value(h, h1, "key", "foo\r\n\r\n", 7);
 
     char binaryData1[] = "abcdefg\0gfedcba\r\n";
@@ -1128,32 +1392,54 @@ static enum test_result test_append_prepend_to_json(ENGINE_HANDLE *h,
 }
 
 static enum test_result test_incr(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1) {
-    uint64_t cas = 0, result = 0;
+    const void *cookie = testHarness.create_cookie();
+    testHarness.set_datatype_support(cookie, true);
+
+    uint64_t result = 0;
     item *i = NULL;
-    check(store(h, h1, NULL, OPERATION_ADD,"key", "1", &i) == ENGINE_SUCCESS,
+    const char *key = "key";
+    const char *val = "1";
+    check(store(h, h1, NULL, OPERATION_ADD,key, val, &i,
+                0, 0, 3600,
+                checkUTF8JSON((const unsigned char *)val, 1))
+            == ENGINE_SUCCESS,
           "Failed to add value.");
     h1->release(h, NULL, i);
 
-    check(h1->arithmetic(h, NULL, "key", 3, true, false, 1, 1, 0,
-                         &cas, PROTOCOL_BINARY_RAW_BYTES, &result,
+    check(h1->arithmetic(h, NULL, key, 3, true, false, 1, 1, 0,
+                         &i, PROTOCOL_BINARY_RAW_BYTES, &result,
                          0) == ENGINE_SUCCESS,
           "Failed to incr value.");
+    h1->release(h, NULL, i);
+
+    check_key_value(h, h1, key, "2", 1);
+
+    // Check datatype of counter
+    check(h1->get(h, cookie, &i, key, 3, 0) == ENGINE_SUCCESS,
+            "Unable to get stored item");
+    item_info info;
+    info.nvalue = 1;
+    h1->get_item_info(h, cookie, i, &info);
+    h1->release(h, cookie, i);
+    check(info.datatype == PROTOCOL_BINARY_DATATYPE_JSON, "Invalid datatype");
+
+    testHarness.destroy_cookie(cookie);
 
-    check_key_value(h, h1, "key", "2", 1);
     return SUCCESS;
 }
 
 static enum test_result test_bug2799(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1) {
-    uint64_t cas = 0, result = 0;
+    uint64_t result = 0;
     item *i = NULL;
     check(store(h, h1, NULL, OPERATION_ADD, "key", "1", &i) == ENGINE_SUCCESS,
           "Failed to add value.");
     h1->release(h, NULL, i);
 
     check(h1->arithmetic(h, NULL, "key", 3, true, false, 1, 1, 0,
-                         &cas, PROTOCOL_BINARY_RAW_BYTES, &result,
+                         &i, PROTOCOL_BINARY_RAW_BYTES, &result,
                          0) == ENGINE_SUCCESS,
           "Failed to incr value.");
+    h1->release(h, NULL, i);
 
     check_key_value(h, h1, "key", "2", 1);
 
@@ -1165,14 +1451,27 @@ static enum test_result test_bug2799(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1) {
 
 static enum test_result test_flush(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1) {
     item *i = NULL;
+
+    if (get_int_stat(h, h1, "ep_flushall_enabled") == 0) {
+        check(set_param(h, h1, protocol_binary_engine_param_flush,
+                    "flushall_enabled", "true"),
+                "Set flushall_enabled should have worked");
+    }
+    check(get_int_stat(h, h1, "ep_flushall_enabled") == 1,
+            "flushall wasn't enabled");
+
     // First try to delete something we know to not be there.
     check(del(h, h1, "key", 0, 0) == ENGINE_KEY_ENOENT, "Failed to fail initial delete.");
     check(store(h, h1, NULL, OPERATION_SET, "key", "somevalue", &i) == ENGINE_SUCCESS,
           "Failed set.");
     h1->release(h, NULL, i);
     check_key_value(h, h1, "key", "somevalue", 9);
+
+    set_degraded_mode(h, h1, NULL, true);
     check(h1->flush(h, NULL, 0) == ENGINE_SUCCESS,
           "Failed to flush");
+    set_degraded_mode(h, h1, NULL, false);
+
     check(ENGINE_KEY_ENOENT == verify_key(h, h1, "key"), "Expected missing key");
 
     check(store(h, h1, NULL, OPERATION_SET, "key", "somevalue", &i) == ENGINE_SUCCESS,
@@ -1183,6 +1482,84 @@ static enum test_result test_flush(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1) {
     return SUCCESS;
 }
 
+/**
+ * The following struct: flush_args and function run_flush(),
+ * will be used by the test that follows: test_multiple_flush
+ */
+struct flush_args {
+    ENGINE_HANDLE *h;
+    ENGINE_HANDLE_V1 *h1;
+    ENGINE_ERROR_CODE expect;
+    int when;
+};
+
+extern "C" {
+    static void run_flush_all(void *arguments) {
+        const void *cookie = testHarness.create_cookie();
+        testHarness.set_ewouldblock_handling(cookie, true);
+        struct flush_args *args = (struct flush_args *)arguments;
+
+        check((args->h1)->flush(args->h, cookie, args->when) == args->expect,
+                "Return code is not what is expected");
+
+        testHarness.destroy_cookie(cookie);
+    }
+}
+
+static enum test_result test_multiple_flush(ENGINE_HANDLE *h,
+                                            ENGINE_HANDLE_V1 *h1) {
+
+    if (get_int_stat(h, h1, "ep_flushall_enabled") == 0) {
+        check(set_param(h, h1, protocol_binary_engine_param_flush,
+                    "flushall_enabled", "true"),
+                "Set flushall_enabled should have worked");
+    }
+    check(get_int_stat(h, h1, "ep_flushall_enabled") == 1,
+            "flushall wasn't enabled");
+
+    item *i = NULL;
+    check(store(h, h1, NULL, OPERATION_SET, "key", "somevalue", &i) == ENGINE_SUCCESS,
+          "Failed set.");
+    h1->release(h, NULL, i);
+    wait_for_flusher_to_settle(h, h1);
+    check(get_int_stat(h, h1, "curr_items") == 1,
+          "Expected curr_items equals 1");
+
+    set_degraded_mode(h, h1, NULL, true);
+    cb_thread_t t1, t2;
+    struct flush_args args1,args2;
+    args1.h = h;
+    args1.h1 = h1;
+    args1.expect = ENGINE_SUCCESS;
+    args1.when = 2;
+    check(cb_create_thread(&t1, run_flush_all, &args1, 0) == 0,
+            "cb_create_thread failed!");
+
+    sleep(1);
+
+    args2.h = h;
+    args2.h1 = h1;
+    args2.expect = ENGINE_TMPFAIL;
+    args2.when = 0;
+    check(cb_create_thread(&t2, run_flush_all, &args2, 0) == 0,
+            "cb_create_thread failed!");
+
+    cb_assert(cb_join_thread(t1) == 0);
+    cb_assert(cb_join_thread(t2) == 0);
+
+    set_degraded_mode(h, h1, NULL, false);
+
+    testHarness.reload_engine(&h, &h1,
+                              testHarness.engine_path,
+                              testHarness.get_current_testcase()->cfg,
+                              true, false);
+    wait_for_warmup_complete(h, h1);
+    check(get_int_stat(h, h1, "curr_items") == 0,
+          "Expected curr_items equals 0");
+
+    return SUCCESS;
+}
+
 static enum test_result test_flush_disabled(ENGINE_HANDLE *h,
                                             ENGINE_HANDLE_V1 *h1) {
     item *i = NULL;
@@ -1212,7 +1589,10 @@ static enum test_result test_flush_disabled(ENGINE_HANDLE *h,
                               true, false);
     wait_for_warmup_complete(h, h1);
 
+
+    set_degraded_mode(h, h1, NULL, true);
     check(h1->flush(h, NULL, 0) == ENGINE_SUCCESS, "Flush should be enabled");
+    set_degraded_mode(h, h1, NULL, false);
 
     //expect missing key
     check(ENGINE_KEY_ENOENT == verify_key(h, h1, "key"), "Expected missing key");
@@ -1222,7 +1602,6 @@ static enum test_result test_flush_disabled(ENGINE_HANDLE *h,
 
 static enum test_result test_flush_stats(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1) {
     item *i = NULL;
-    int mem_used = get_int_stat(h, h1, "mem_used");
     int overhead = get_int_stat(h, h1, "ep_overhead");
     int cacheSize = get_int_stat(h, h1, "ep_total_cache_size");
     int nonResident = get_int_stat(h, h1, "ep_num_non_resident");
@@ -1243,27 +1622,21 @@ static enum test_result test_flush_stats(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1)
     check_key_value(h, h1, "key", "somevalue", 9);
     check_key_value(h, h1, "key2", "somevalue", 9);
 
-    int mem_used2 = get_int_stat(h, h1, "mem_used");
     int overhead2 = get_int_stat(h, h1, "ep_overhead");
     int cacheSize2 = get_int_stat(h, h1, "ep_total_cache_size");
 
-    cb_assert(mem_used2 > mem_used);
-    // "mem_used2 - overhead2" (i.e., ep_kv_size) should be greater than the hashtable cache size
-    // due to the checkpoint overhead
-    cb_assert(mem_used2 - overhead2 > cacheSize2);
-
+    set_degraded_mode(h, h1, NULL, true);
     check(h1->flush(h, NULL, 0) == ENGINE_SUCCESS, "Failed to flush");
+    set_degraded_mode(h, h1, NULL, false);
     check(ENGINE_KEY_ENOENT == verify_key(h, h1, "key"), "Expected missing key");
     check(ENGINE_KEY_ENOENT == verify_key(h, h1, "key2"), "Expected missing key");
 
     wait_for_flusher_to_settle(h, h1);
 
-    mem_used2 = get_int_stat(h, h1, "mem_used");
     overhead2 = get_int_stat(h, h1, "ep_overhead");
     cacheSize2 = get_int_stat(h, h1, "ep_total_cache_size");
     int nonResident2 = get_int_stat(h, h1, "ep_num_non_resident");
 
-    cb_assert(mem_used2 == mem_used);
     cb_assert(overhead2 == overhead);
     cb_assert(nonResident2 == nonResident);
     cb_assert(cacheSize2 == cacheSize);
@@ -1288,7 +1661,9 @@ static enum test_result test_flush_multiv(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1
     check_key_value(h, h1, "key", "somevalue", 9);
     check_key_value(h, h1, "key2", "somevalue", 9, 2);
 
+    set_degraded_mode(h, h1, NULL, true);
     check(h1->flush(h, NULL, 0) == ENGINE_SUCCESS, "Failed to flush");
+    set_degraded_mode(h, h1, NULL, false);
 
     vals.clear();
     check(h1->get_stats(h, NULL, NULL, 0, add_stats) == ENGINE_SUCCESS,
@@ -1397,8 +1772,10 @@ static enum test_result test_flush_restart(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h
     check_key_value(h, h1, "key", "somevalue", 9);
 
     // Flush
+    set_degraded_mode(h, h1, NULL, true);
     check(h1->flush(h, NULL, 0) == ENGINE_SUCCESS,
           "Failed to flush");
+    set_degraded_mode(h, h1, NULL, false);
 
     check(store(h, h1, NULL, OPERATION_SET, "key2", "somevalue", &i) == ENGINE_SUCCESS,
           "Failed post-flush set.");
@@ -1444,8 +1821,10 @@ static enum test_result test_flush_multiv_restart(ENGINE_HANDLE *h, ENGINE_HANDL
     check_key_value(h, h1, "key", "somevalue", 9);
 
     // Flush
+    set_degraded_mode(h, h1, NULL, true);
     check(h1->flush(h, NULL, 0) == ENGINE_SUCCESS,
           "Failed to flush");
+    set_degraded_mode(h, h1, NULL, false);
 
     // Restart again, ensure written to disk.
     testHarness.reload_engine(&h, &h1,
@@ -1472,10 +1851,20 @@ static enum test_result test_delete(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1) {
     check_key_value(h, h1, "key", "somevalue", 9);
 
     uint64_t cas = 0;
-    check(h1->remove(h, NULL, "key", 3, &cas, 0) == ENGINE_SUCCESS,
+    uint64_t vb_uuid = 0;
+    mutation_descr_t mut_info;
+    uint32_t high_seqno = 0;
+
+    memset(&mut_info, 0, sizeof(mut_info));
+
+    vb_uuid = get_ull_stat(h, h1, "vb_0:0:id", "failovers");
+    high_seqno = get_ull_stat(h, h1, "vb_0:high_seqno", "vbucket-seqno");
+    check(h1->remove(h, NULL, "key", 3, &cas, 0, &mut_info) == ENGINE_SUCCESS,
           "Failed remove with value.");
     check(orig_cas + 1 == cas, "Cas mismatch on delete");
     check(ENGINE_KEY_ENOENT == verify_key(h, h1, "key"), "Expected missing key");
+    check(vb_uuid == mut_info.vbucket_uuid, "Expected valid vbucket uuid");
+    check(high_seqno + 1 == mut_info.seqno, "Expected valid sequence number");
 
     // Can I time travel to an expired object and delete it?
     checkeq(ENGINE_SUCCESS, store(h, h1, NULL, OPERATION_SET, "key", "somevalue", &i),
@@ -1563,6 +1952,8 @@ static enum test_result test_bug7023(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1) {
             item *i;
             check(store(h, h1, NULL, OPERATION_SET, it->c_str(), it->c_str(), &i)
                   == ENGINE_SUCCESS, "Failed to store a value");
+            h1->release(h, NULL, i);
+
         }
     }
     wait_for_flusher_to_settle(h, h1);
@@ -1837,12 +2228,14 @@ static enum test_result test_vb_get_replica(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *
 }
 
 static enum test_result test_wrong_vb_incr(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1) {
-    uint64_t cas, result;
+    uint64_t result;
+    item *i = NULL;
     int numNotMyVBucket = get_int_stat(h, h1, "ep_num_not_my_vbuckets");
     check(h1->arithmetic(h, NULL, "key", 3, true, false, 1, 1, 0,
-                         &cas, PROTOCOL_BINARY_RAW_BYTES, &result,
+                         &i, PROTOCOL_BINARY_RAW_BYTES, &result,
                          1) == ENGINE_NOT_MY_VBUCKET,
           "Expected not my vbucket.");
+    h1->release(h, NULL, i);
     wait_for_stat_change(h, h1, "ep_num_not_my_vbuckets", numNotMyVBucket);
     return SUCCESS;
 }
@@ -1850,24 +2243,28 @@ static enum test_result test_wrong_vb_incr(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h
 static enum test_result test_vb_incr_pending(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1) {
     const void *cookie = testHarness.create_cookie();
     testHarness.set_ewouldblock_handling(cookie, false);
-    uint64_t cas, result;
+    uint64_t result;
+    item *i = NULL;
     check(set_vbucket_state(h, h1, 1, vbucket_state_pending), "Failed to set vbucket state.");
     check(h1->arithmetic(h, cookie, "key", 3, true, false, 1, 1, 0,
-                         &cas, PROTOCOL_BINARY_RAW_BYTES, &result,
+                         &i, PROTOCOL_BINARY_RAW_BYTES, &result,
                          1) == ENGINE_EWOULDBLOCK,
-          "Expected woodblock.");
+          "Expected wouldblock.");
+    h1->release(h, NULL, i);
     testHarness.destroy_cookie(cookie);
     return SUCCESS;
 }
 
 static enum test_result test_vb_incr_replica(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1) {
-    uint64_t cas, result;
+    uint64_t result;
+    item *i = NULL;
     check(set_vbucket_state(h, h1, 1, vbucket_state_replica), "Failed to set vbucket state.");
     int numNotMyVBucket = get_int_stat(h, h1, "ep_num_not_my_vbuckets");
     check(h1->arithmetic(h, NULL, "key", 3, true, false, 1, 1, 0,
-                         &cas, PROTOCOL_BINARY_RAW_BYTES, &result,
+                         &i, PROTOCOL_BINARY_RAW_BYTES, &result,
                          1) == ENGINE_NOT_MY_VBUCKET,
           "Expected not my bucket.");
+    h1->release(h, NULL, i);
     wait_for_stat_change(h, h1, "ep_num_not_my_vbuckets", numNotMyVBucket);
     return SUCCESS;
 }
@@ -2473,6 +2870,7 @@ static enum test_result test_mb5215(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1) {
                               testHarness.engine_path,
                               testHarness.get_current_testcase()->cfg,
                               true, false);
+
     wait_for_warmup_complete(h, h1);
 
     //verify persisted expiration time
@@ -2538,8 +2936,21 @@ static enum test_result test_whitespace_db(ENGINE_HANDLE *h,
     return SUCCESS;
 }
 
+static enum test_result test_memory_tracking(ENGINE_HANDLE *h,
+                                             ENGINE_HANDLE_V1 *h1) {
+    // Need memory tracker to be able to check our memory usage.
+    std::string tracker = get_str_stat(h, h1, "ep_mem_tracker_enabled");
+    if (tracker == "true") {
+        return SUCCESS;
+    } else {
+        std::cerr << "Memory tracker not enabled" << std::endl;
+        return FAIL;
+    }
+}
+
 static enum test_result test_memory_limit(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1) {
     set_param(h, h1, protocol_binary_engine_param_flush, "mutation_mem_threshold", "95");
+    wait_for_stat_change(h, h1,"ep_db_data_size", 0);
     int used = get_int_stat(h, h1, "mem_used");
     double mem_threshold =
         static_cast<double>(get_int_stat(h, h1, "ep_mutation_mem_threshold")) / 100;
@@ -2548,9 +2959,13 @@ static enum test_result test_memory_limit(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1
           get_int_stat(h, h1, "ep_tmp_oom_errors") == 0, "Expected no OOM errors.");
     cb_assert(used < max);
 
-    char data[8192];
-    memset(data, 'x', sizeof(data));
-    size_t vlen = max - used - 192;
+    char *data = new char[2 * 1024 * 1024];
+    cb_assert(data);
+    memset(data, 'x', 2 * 1024 * 1024);
+
+    // Calculate the length of document to set - we want to ensure we can only
+    // store one document before TEMP_OOM is hit.
+    size_t vlen = (max - used) * 0.95;
     data[vlen] = 0x00;
 
     item *i = NULL;
@@ -2559,19 +2974,26 @@ static enum test_result test_memory_limit(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1
           "store failure");
     check_key_value(h, h1, "key", data, vlen);
     h1->release(h, NULL, i);
+    i = NULL;
 
     // There should be no room for another.
     ENGINE_ERROR_CODE second = store(h, h1, NULL, OPERATION_SET, "key2", data, &i);
     check(second == ENGINE_ENOMEM || second == ENGINE_TMPFAIL,
           "should have failed second set");
-    h1->release(h, NULL, i);
+    if (i) {
+        h1->release(h, NULL, i);
+        i = NULL;
+    }
     check(get_int_stat(h, h1, "ep_oom_errors") == 1 ||
           get_int_stat(h, h1, "ep_tmp_oom_errors") == 1, "Expected an OOM error.");
 
     ENGINE_ERROR_CODE overwrite = store(h, h1, NULL, OPERATION_SET, "key", data, &i);
     check(overwrite == ENGINE_ENOMEM || overwrite == ENGINE_TMPFAIL,
           "should have failed second override");
-    h1->release(h, NULL, i);
+    if (i) {
+        h1->release(h, NULL, i);
+        i = NULL;
+    }
     check(get_int_stat(h, h1, "ep_oom_errors") == 2 ||
           get_int_stat(h, h1, "ep_tmp_oom_errors") == 2, "Expected another OOM error.");
     check_key_value(h, h1, "key", data, vlen);
@@ -2587,6 +3009,7 @@ static enum test_result test_memory_limit(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1
           "should have succeded on the last set");
     check_key_value(h, h1, "key2", "somevalue2", 10);
     h1->release(h, NULL, i);
+    delete []data;
     return SUCCESS;
 }
 
@@ -2850,7 +3273,6 @@ static enum test_result vbucket_destroy(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1,
 static enum test_result test_vbucket_destroy_stats(ENGINE_HANDLE *h,
                                                    ENGINE_HANDLE_V1 *h1) {
 
-    int mem_used = get_int_stat(h, h1, "mem_used");
     int cacheSize = get_int_stat(h, h1, "ep_total_cache_size");
     int overhead = get_int_stat(h, h1, "ep_overhead");
     int nonResident = get_int_stat(h, h1, "ep_num_non_resident");
@@ -2889,7 +3311,6 @@ static enum test_result test_vbucket_destroy_stats(ENGINE_HANDLE *h,
 
     wait_for_stat_change(h, h1, "ep_vbucket_del", vbucketDel);
 
-    wait_for_stat_to_be(h, h1, "mem_used", mem_used);
     wait_for_stat_to_be(h, h1, "ep_total_cache_size", cacheSize);
     wait_for_stat_to_be(h, h1, "ep_overhead", overhead);
     wait_for_stat_to_be(h, h1, "ep_num_non_resident", nonResident);
@@ -3247,15 +3668,95 @@ static enum test_result test_dcp_consumer_open(ENGINE_HANDLE *h, ENGINE_HANDLE_V
     return SUCCESS;
 }
 
-static enum test_result test_dcp_producer_open(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1) {
+static enum test_result test_dcp_consumer_flow_control_buf_sz(ENGINE_HANDLE *h,
+                                                        ENGINE_HANDLE_V1 *h1) {
     const void *cookie1 = testHarness.create_cookie();
     uint32_t opaque = 0;
     uint32_t seqno = 0;
-    uint32_t flags = DCP_OPEN_PRODUCER;
-    const char *name  = "unittest";
+    uint32_t flags = 0;
+    const char *name = "unittest";
     uint16_t nname = strlen(name);
+    char stats_buffer[50];
 
-    check(h1->dcp.open(h, cookie1, opaque, seqno, flags, (void*)name, nname)
+    snprintf(stats_buffer, sizeof(stats_buffer),
+             "eq_dcpq:%s:max_buffer_bytes", name);
+
+    /* Check the min limit */
+    set_param(h, h1, protocol_binary_engine_param_flush, "max_size",
+              "500000000");
+    check(get_int_stat(h, h1, "ep_max_size") == 500000000,
+          "Incorrect new size.");
+
+    check(h1->dcp.open(h, cookie1, opaque, seqno, flags, (void*)name, nname)
+          == ENGINE_SUCCESS,
+          "Failed dcp consumer open connection.");
+
+    check((uint32_t)get_int_stat(h, h1, stats_buffer, "dcp")
+          == 10485760, "Flow Control Buffer Size not equal to min");
+    testHarness.destroy_cookie(cookie1);
+
+    /* Check the size as percentage of the bucket memory */
+    const void *cookie2 = testHarness.create_cookie();
+    set_param(h, h1, protocol_binary_engine_param_flush, "max_size",
+              "2000000000");
+    check(get_int_stat(h, h1, "ep_max_size") == 2000000000,
+          "Incorrect new size.");
+
+    check(h1->dcp.open(h, cookie2, opaque, seqno, flags, (void*)name, nname)
+          == ENGINE_SUCCESS,
+          "Failed dcp consumer open connection.");
+
+    check((uint32_t)get_int_stat(h, h1, stats_buffer, "dcp")
+          == 20000000, "Flow Control Buffer Size not equal to 1% of mem size");
+    testHarness.destroy_cookie(cookie2);
+
+    /* Check the case when mem used by flow control bufs hit the threshold */
+    /* Create around 10 more connections to use more than 10% of the total
+       memory */
+    for (int count = 0; count < 10; count++) {
+        const void *cookie = testHarness.create_cookie();
+        check(h1->dcp.open(h, cookie, opaque, seqno, flags, (void*)name, nname)
+              == ENGINE_SUCCESS,
+              "Failed dcp consumer open connection.");
+        testHarness.destroy_cookie(cookie);
+    }
+    /* By now mem used by flow control bufs would have crossed the threshold */
+    const void *cookie3 = testHarness.create_cookie();
+    check(h1->dcp.open(h, cookie3, opaque, seqno, flags, (void*)name, nname)
+          == ENGINE_SUCCESS,
+          "Failed dcp consumer open connection.");
+
+    check((uint32_t)get_int_stat(h, h1, stats_buffer, "dcp") == 10485760,
+          "Flow Control Buffer Size not equal to min after threshold is hit");
+    testHarness.destroy_cookie(cookie3);
+
+    /* Check the max limit */
+    const void *cookie4 = testHarness.create_cookie();
+    set_param(h, h1, protocol_binary_engine_param_flush, "max_size",
+              "7000000000");
+    check(get_ull_stat(h, h1, "ep_max_size") == 7000000000,
+          "Incorrect new size.");
+
+    check(h1->dcp.open(h, cookie4, opaque, seqno, flags, (void*)name, nname)
+          == ENGINE_SUCCESS,
+          "Failed dcp consumer open connection.");
+
+    check((uint32_t)get_int_stat(h, h1, stats_buffer, "dcp")
+          == 52428800, "Flow Control Buffer Size beyond max");
+    testHarness.destroy_cookie(cookie4);
+
+    return SUCCESS;
+}
+
+static enum test_result test_dcp_producer_open(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1) {
+    const void *cookie1 = testHarness.create_cookie();
+    uint32_t opaque = 0;
+    uint32_t seqno = 0;
+    uint32_t flags = DCP_OPEN_PRODUCER;
+    const char *name  = "unittest";
+    uint16_t nname = strlen(name);
+
+    check(h1->dcp.open(h, cookie1, opaque, seqno, flags, (void*)name, nname)
           == ENGINE_SUCCESS,
           "Failed dcp producer open connection.");
 
@@ -3383,9 +3884,12 @@ static void dcp_stream(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1, const char *name,
                        uint64_t start, uint64_t end, uint64_t vb_uuid,
                        uint64_t snap_start_seqno, uint64_t snap_end_seqno,
                        int exp_mutations, int exp_deletions, int exp_markers,
-                       int extra_takeover_ops, int exp_nru_value,
+                       int extra_takeover_ops,
                        bool exp_disk_snapshot = false,
+                       bool time_sync_enabled = false,
+                       uint8_t exp_conflict_res = 0,
                        bool skipEstimateCheck = false,
+                       uint64_t *total_bytes = NULL,
                        uint64_t flow_control_buf_size = 1024,
                        bool disable_ack = false) {
     uint32_t opaque = 1;
@@ -3416,6 +3920,10 @@ static void dcp_stream(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1, const char *name,
         checkeq(0, status.compare("disabled"), "Flow control enabled!");
     }
 
+    check(h1->dcp.control(h, cookie, ++opaque, "enable_ext_metadata", 19,
+                          "true", 4) == ENGINE_SUCCESS,
+          "Failed to enable xdcr extras");
+
     uint64_t rollback = 0;
     check(h1->dcp.stream_req(h, cookie, flags, opaque, vbucket, start, end,
                              vb_uuid, snap_start_seqno, snap_end_seqno, &rollback,
@@ -3480,6 +3988,7 @@ static void dcp_stream(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1, const char *name,
     int num_set_vbucket_pending = 0;
     int num_set_vbucket_active = 0;
 
+    ExtendedMetaData *emd = NULL;
     bool pending_marker_ack = false;
     uint64_t marker_end = 0;
 
@@ -3488,6 +3997,7 @@ static void dcp_stream(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1, const char *name,
     uint64_t all_bytes = 0;
     uint64_t total_acked_bytes = 0;
     uint64_t ack_limit = flow_control_buf_size/2;
+
     do {
         if (!disable_ack && (bytes_read > ack_limit)) {
             h1->dcp.buffer_acknowledgement(h, cookie, ++opaque, 0, bytes_read);
@@ -3501,7 +4011,6 @@ static void dcp_stream(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1, const char *name,
             switch (dcp_last_op) {
                 case PROTOCOL_BINARY_CMD_DCP_MUTATION:
                     check(last_by_seqno < dcp_last_byseqno, "Expected bigger seqno");
-                    check(dcp_last_nru == exp_nru_value, "Expected different NRU value");
                     last_by_seqno = dcp_last_byseqno;
                     num_mutations++;
                     bytes_read += dcp_last_packet_size;
@@ -3510,6 +4019,18 @@ static void dcp_stream(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1, const char *name,
                         sendDcpAck(h, h1, cookie, PROTOCOL_BINARY_CMD_DCP_SNAPSHOT_MARKER,
                                PROTOCOL_BINARY_RESPONSE_SUCCESS, dcp_last_opaque);
                     }
+                    if (time_sync_enabled) {
+                        check(dcp_last_nmeta == 16,
+                                "Expected extended meta in mutation packet");
+                    } else {
+                        check(dcp_last_nmeta == 5,
+                                "Expected no extended metadata");
+                    }
+
+                    emd = new ExtendedMetaData(dcp_last_meta, dcp_last_nmeta);
+                    check(exp_conflict_res == emd->getConflictResMode(),
+                              "Unexpected conflict resolution mode");
+                    delete emd;
                     break;
                 case PROTOCOL_BINARY_CMD_DCP_DELETION:
                     check(last_by_seqno < dcp_last_byseqno, "Expected bigger seqno");
@@ -3521,6 +4042,18 @@ static void dcp_stream(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1, const char *name,
                         sendDcpAck(h, h1, cookie, PROTOCOL_BINARY_CMD_DCP_SNAPSHOT_MARKER,
                                PROTOCOL_BINARY_RESPONSE_SUCCESS, dcp_last_opaque);
                     }
+                    if (time_sync_enabled) {
+                        check(dcp_last_nmeta == 16,
+                                "Expected adjusted time in mutation packet");
+                    } else {
+                        check(dcp_last_nmeta == 5,
+                                "Expected no extended metadata");
+                    }
+
+                    emd = new ExtendedMetaData(dcp_last_meta, dcp_last_nmeta);
+                    check(exp_conflict_res == emd->getConflictResMode(),
+                              "Unexpected conflict resolution mode");
+                    delete emd;
                     break;
                 case PROTOCOL_BINARY_CMD_DCP_STREAM_END:
                     done = true;
@@ -3583,6 +4116,9 @@ static void dcp_stream(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1, const char *name,
         }
     } while (!done);
 
+    if (total_bytes) {
+        *total_bytes = *total_bytes + all_bytes;
+    }
     check(num_mutations == exp_mutations, "Invalid number of mutations");
     check(num_deletions == exp_deletions, "Invalid number of deletes");
     check(num_snapshot_marker == exp_markers,
@@ -3615,10 +4151,10 @@ static void dcp_stream(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1, const char *name,
 }
 
 static void dcp_stream_req(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1,
-                           uint32_t opaque, uint16_t vbucket, uint64_t start,
+                           uint32_t opaque, uint32_t stream_flag,
+                           uint16_t vbucket, uint64_t start,
                            uint64_t end, uint64_t uuid,
-                           uint64_t snap_start_seqno,
-                           uint64_t snap_end_seqno,
+                           uint64_t snap_start_seqno, uint64_t snap_end_seqno,
                            uint64_t exp_rollback, ENGINE_ERROR_CODE err) {
     const void *cookie = testHarness.create_cookie();
     uint32_t flags = DCP_OPEN_PRODUCER;
@@ -3629,10 +4165,11 @@ static void dcp_stream_req(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1,
           == ENGINE_SUCCESS, "Failed dcp Consumer open connection.");
 
     uint64_t rollback = 0;
-    ENGINE_ERROR_CODE rv = h1->dcp.stream_req(h, cookie, 0, 1, 0, start, end,
-                                              uuid, snap_start_seqno,
-                                              snap_end_seqno,
-                                              &rollback, mock_dcp_add_failover_log);
+    ENGINE_ERROR_CODE rv = h1->dcp.stream_req(h, cookie, stream_flag, 1,
+                                              vbucket, start, end, uuid,
+                                              snap_start_seqno, snap_end_seqno,
+                                              &rollback,
+                                              mock_dcp_add_failover_log);
     check(rv == err, "Unexpected error code");
     if (err == ENGINE_ROLLBACK || err == ENGINE_KEY_ENOENT) {
         check(exp_rollback == rollback, "Rollback didn't match expected value");
@@ -3669,13 +4206,52 @@ static enum test_result test_dcp_producer_stream_req_partial(ENGINE_HANDLE *h,
     const void *cookie = testHarness.create_cookie();
 
     dcp_stream(h, h1, "unittest", cookie, 0, 0, 95, 209, vb_uuid, 95, 95, 105,
-               100, 2, 0, 2);
+               100, 2, 0);
 
     testHarness.destroy_cookie(cookie);
 
     return SUCCESS;
 }
 
+static enum test_result test_dcp_producer_stream_req_partial_with_time_sync(
+                                                             ENGINE_HANDLE *h,
+                                                             ENGINE_HANDLE_V1 *h1) {
+
+    set_drift_counter_state(h, h1, 1000, 0x01);
+
+    int num_items = 200;
+    for (int j = 0; j < num_items; ++j) {
+        item *i = NULL;
+        std::stringstream ss;
+        ss << "key" << j;
+        check(store(h, h1, NULL, OPERATION_SET, ss.str().c_str(), "data", &i)
+              == ENGINE_SUCCESS, "Failed to store a value");
+        h1->release(h, NULL, i);
+    }
+
+    wait_for_flusher_to_settle(h, h1);
+    stop_persistence(h, h1);
+
+    for (int j = 0; j < (num_items / 2); ++j) {
+        std::stringstream ss;
+        ss << "key" << j;
+        check(del(h, h1, ss.str().c_str(), 0, 0) == ENGINE_SUCCESS,
+              "Expected delete to succeed");
+    }
+
+    wait_for_stat_to_be(h, h1, "vb_0:num_checkpoints", 2, "checkpoint");
+
+    uint64_t vb_uuid = get_ull_stat(h, h1, "vb_0:0:id", "failovers");
+
+    const void *cookie = testHarness.create_cookie();
+
+    dcp_stream(h, h1, "unittest", cookie, 0, 0, 95, 209, vb_uuid, 95, 95, 105,
+               100, 2, 0, false, true, 1);
+
+    testHarness.destroy_cookie(cookie);
+
+    return SUCCESS;
+}
 static enum test_result test_dcp_producer_stream_req_full(ENGINE_HANDLE *h,
                                                           ENGINE_HANDLE_V1 *h1) {
     int num_items = 300;
@@ -3701,7 +4277,7 @@ static enum test_result test_dcp_producer_stream_req_full(ENGINE_HANDLE *h,
     const void *cookie = testHarness.create_cookie();
 
     dcp_stream(h, h1, "unittest", cookie, 0, 0, 0, end, vb_uuid, 0, 0,
-               num_items, 0, 1, 0, 2);
+               num_items, 0, 1, 0);
 
     testHarness.destroy_cookie(cookie);
 
@@ -3726,14 +4302,14 @@ static enum test_result test_dcp_producer_stream_req_disk(ENGINE_HANDLE *h,
     }
 
     verify_curr_items(h, h1, num_items, "Wrong amount of items");
-    wait_for_stat_to_be(h, h1, "vb_0:num_checkpoints", 2, "checkpoint");
+    wait_for_stat_to_be_gte(h, h1, "vb_0:num_checkpoints", 2, "checkpoint");
 
     uint64_t vb_uuid = get_ull_stat(h, h1, "vb_0:0:id", "failovers");
 
     const void *cookie = testHarness.create_cookie();
 
     dcp_stream(h, h1,"unittest", cookie, 0, 0, 0, 200, vb_uuid, 0, 0, 200, 0, 1,
-               0, 2);
+               0);
 
     testHarness.destroy_cookie(cookie);
 
@@ -3765,7 +4341,7 @@ static enum test_result test_dcp_producer_stream_req_diskonly(ENGINE_HANDLE *h,
     const void *cookie = testHarness.create_cookie();
 
     dcp_stream(h, h1, "unittest", cookie, 0, flags, 0, -1, vb_uuid, 0, 0, 300,
-               0, 1, 0, 2);
+               0, 1, 0);
 
     testHarness.destroy_cookie(cookie);
 
@@ -3795,7 +4371,43 @@ static enum test_result test_dcp_producer_stream_req_mem(ENGINE_HANDLE *h,
     const void *cookie = testHarness.create_cookie();
 
     dcp_stream(h, h1, "unittest", cookie, 0, 0, 200, 300, vb_uuid, 200, 200,
-               100, 0, 1, 0, 2);
+               100, 0, 1, 0);
+
+    testHarness.destroy_cookie(cookie);
+
+    return SUCCESS;
+}
+
+static enum test_result test_dcp_producer_stream_req_dgm(ENGINE_HANDLE *h,
+                                                         ENGINE_HANDLE_V1 *h1) {
+    int i = 0;  // Item count
+    while(get_int_stat(h, h1, "vb_active_perc_mem_resident") > 50) {
+        item *itm = NULL;
+        std::stringstream ss;
+        ss << "key" << i;
+        ENGINE_ERROR_CODE ret = store(h, h1, NULL, OPERATION_SET,
+                                      ss.str().c_str(), "somevalue", &itm);
+        if (ret == ENGINE_SUCCESS) {
+            i++;
+        }
+        h1->release(h, NULL, itm);
+    }
+
+    wait_for_flusher_to_settle(h, h1);
+    verify_curr_items(h, h1, i, "Wrong number of items");
+    int num_non_resident = get_int_stat(h, h1, "vb_active_num_non_resident");
+    cb_assert(num_non_resident >= ((float)(50/100) * i));
+
+    uint64_t end = get_int_stat(h, h1, "vb_0:high_seqno", "vbucket-seqno");
+    uint64_t vb_uuid = get_ull_stat(h, h1, "vb_0:0:id", "failovers");
+
+    set_param(h, h1, protocol_binary_engine_param_flush, "max_size", "5242880");
+    cb_assert(get_int_stat(h, h1, "vb_active_perc_mem_resident") < 50);
+
+    const void *cookie = testHarness.create_cookie();
+
+    dcp_stream(h, h1,"unittest", cookie, 0, 0, 0, end, vb_uuid, 0, 0, i, 0, 1,
+               0);
 
     testHarness.destroy_cookie(cookie);
 
@@ -3826,7 +4438,7 @@ static enum test_result test_dcp_producer_stream_latest(ENGINE_HANDLE *h,
 
     uint32_t flags = DCP_ADD_STREAM_FLAG_LATEST;
     dcp_stream(h, h1, "unittest", cookie, 0, flags, 200, 205, vb_uuid, 200, 200,
-               100, 0, 1, 0, 2);
+               100, 0, 1, 0);
 
     testHarness.destroy_cookie(cookie);
 
@@ -3879,18 +4491,19 @@ static test_result test_dcp_agg_stats(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1) {
 
     const void *cookie[5];
 
+    uint64_t total_bytes = 0;
     for (int j = 0; j < 5; ++j) {
         char name[12];
         snprintf(name, sizeof(name), "unittest_%d", j);
         cookie[j] = testHarness.create_cookie();
         dcp_stream(h, h1, name, cookie[j], 0, 0, 200, 300, vb_uuid, 200, 200,
-                   100, 0, 1, 0, 2);
+                   100, 0, 1, 0, false, false, 0, false, &total_bytes);
     }
 
     check(get_int_stat(h, h1, "unittest:producer_count", "dcpagg _") == 5,
           "producer count mismatch");
-    check(get_int_stat(h, h1, "unittest:total_bytes", "dcpagg _") == 32860,
-          "aggregate total bytes sent mismatch");
+    check(get_int_stat(h, h1, "unittest:total_bytes", "dcpagg _") ==
+          (int)total_bytes, "aggregate total bytes sent mismatch");
     check(get_int_stat(h, h1, "unittest:items_sent", "dcpagg _") == 500,
           "aggregate total items sent mismatch");
     check(get_int_stat(h, h1, "unittest:items_remaining", "dcpagg _") == 0,
@@ -3927,7 +4540,7 @@ static test_result test_dcp_takeover(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1) {
 
     const void *cookie1 = testHarness.create_cookie();
     dcp_stream(h, h1, "unittest", cookie1, 0, flags, 0, 1000, vb_uuid, 0, 0, 20,
-               0, 2, 10, 2);
+               0, 2, 10);
 
     check(verify_vbucket_state(h, h1, 0, vbucket_state_dead), "Wrong vb state");
 
@@ -4032,6 +4645,7 @@ static uint32_t add_stream_for_consumer(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1,
     dcp_step(h, h1, cookie);
     uint32_t stream_opaque = dcp_last_opaque;
     cb_assert(dcp_last_op == PROTOCOL_BINARY_CMD_DCP_CONTROL);
+    cb_assert(dcp_last_key.compare("connection_buffer_size") == 0);
     cb_assert(dcp_last_opaque != opaque);
 
     if (get_int_stat(h, h1, "ep_dcp_enable_noop") == 1) {
@@ -4050,6 +4664,18 @@ static uint32_t add_stream_for_consumer(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1,
         cb_assert(dcp_last_opaque != opaque);
     }
 
+    dcp_step(h, h1, cookie);
+    stream_opaque = dcp_last_opaque;
+    cb_assert(dcp_last_op == PROTOCOL_BINARY_CMD_DCP_CONTROL);
+    cb_assert(dcp_last_key.compare("set_priority") == 0);
+    cb_assert(dcp_last_opaque != opaque);
+
+    dcp_step(h, h1, cookie);
+    stream_opaque = dcp_last_opaque;
+    cb_assert(dcp_last_op == PROTOCOL_BINARY_CMD_DCP_CONTROL);
+    cb_assert(dcp_last_key.compare("enable_ext_metadata") == 0);
+    cb_assert(dcp_last_opaque != opaque);
+
     check(h1->dcp.add_stream(h, cookie, opaque, vbucket, flags)
           == ENGINE_SUCCESS, "Add stream request failed");
 
@@ -4171,7 +4797,8 @@ static enum test_result test_dcp_reconnect(ENGINE_HANDLE *h,
     add_stream_for_consumer(h, h1, cookie, opaque++, 0, 0,
                             PROTOCOL_BINARY_RESPONSE_SUCCESS);
 
-    uint32_t stream_opaque = 2;
+    uint32_t stream_opaque =
+        get_int_stat(h, h1, "eq_dcpq:unittest:stream_0_opaque", "dcp");
     check(h1->dcp.snapshot_marker(h, cookie, stream_opaque, 0, 0, 10, 2)
         == ENGINE_SUCCESS, "Failed to send snapshot marker");
 
@@ -4293,6 +4920,73 @@ static enum test_result test_dcp_consumer_takeover(ENGINE_HANDLE *h,
     return SUCCESS;
 }
 
+static enum test_result test_failover_scenario_with_dcp(ENGINE_HANDLE *h,
+                                                        ENGINE_HANDLE_V1 *h1) {
+
+    int num_items = 50;
+    for (int j = 0; j < num_items; ++j) {
+        item *i = NULL;
+        std::stringstream ss;
+        ss << "key" << j;
+        check(store(h, h1, NULL, OPERATION_SET, ss.str().c_str(), "data", &i)
+              == ENGINE_SUCCESS, "Failed to store a value");
+        h1->release(h, NULL, i);
+        if (j % 10 == 0) {
+            wait_for_flusher_to_settle(h, h1);
+            createCheckpoint(h, h1);
+        }
+    }
+
+    createCheckpoint(h, h1);
+    wait_for_flusher_to_settle(h, h1);
+
+    const void *cookie = testHarness.create_cookie();
+    uint32_t opaque = 0xFFFF0000;
+    uint32_t flags = 0;
+    const char *name = "unittest";
+    uint16_t nname = strlen(name);
+
+    check(set_vbucket_state(h, h1, 0, vbucket_state_replica),
+          "Failed to set vbucket state.");
+
+    // Open consumer connection
+    check(h1->dcp.open(h, cookie, opaque, 0, flags, (void*)name, nname)
+          == ENGINE_SUCCESS, "Failed dcp Consumer open connection.");
+
+    add_stream_for_consumer(h, h1, cookie, opaque++, 0,
+                            DCP_ADD_STREAM_FLAG_TAKEOVER,
+                            PROTOCOL_BINARY_RESPONSE_SUCCESS);
+
+    uint32_t stream_opaque =
+        get_int_stat(h, h1, "eq_dcpq:unittest:stream_0_opaque", "dcp");
+
+    check(h1->dcp.snapshot_marker(h, cookie, stream_opaque, 0, 200, 300, 300)
+            == ENGINE_SUCCESS, "Failed to send snapshot marker");
+
+    wait_for_stat_to_be(h, h1, "eq_dcpq:unittest:stream_0_buffer_items", 0, "dcp");
+
+    check(h1->dcp.close_stream(h, cookie, stream_opaque, 0) == ENGINE_SUCCESS,
+            "Expected success");
+
+    // Simulating a failover scenario, where the replica vbucket will
+    // be marked as active.
+    check(set_vbucket_state(h, h1, 0, vbucket_state_active),
+            "Failed to set vbucket state.");
+
+    item *i = NULL;
+    check(store(h, h1, NULL, OPERATION_SET, "key", "somevalue", &i) ==
+            ENGINE_SUCCESS, "Error in SET operation.");
+
+    h1->release(h, NULL, i);
+
+    wait_for_flusher_to_settle(h, h1);
+    check(get_int_stat(h, h1, "ep_diskqueue_items") == 0,
+            "Unexpected diskqueue");
+
+    testHarness.destroy_cookie(cookie);
+    return SUCCESS;
+}
+
 static enum test_result test_dcp_add_stream(ENGINE_HANDLE *h,
                                             ENGINE_HANDLE_V1 *h1) {
     const void *cookie = testHarness.create_cookie();
@@ -4427,9 +5121,8 @@ static enum test_result test_chk_manager_rollback(ENGINE_HANDLE *h,
                               testHarness.get_current_testcase()->cfg,
                               true, false);
 
-    stop_persistence(h, h1);
-
     wait_for_warmup_complete(h, h1);
+    stop_persistence(h, h1);
 
     for (int j = 0; j < num_items / 2; ++j) {
         item *i = NULL;
@@ -4455,6 +5148,12 @@ static enum test_result test_chk_manager_rollback(ENGINE_HANDLE *h,
     check(h1->dcp.open(h, cookie, opaque, 0, flags, (void*)name, nname)
           == ENGINE_SUCCESS, "Failed dcp Consumer open connection.");
 
+    dcp_step(h, h1, cookie);
+    cb_assert(dcp_last_op == PROTOCOL_BINARY_CMD_DCP_CONTROL);
+
+    dcp_step(h, h1, cookie);
+    cb_assert(dcp_last_op == PROTOCOL_BINARY_CMD_DCP_CONTROL);
+
     check(h1->dcp.add_stream(h, cookie, ++opaque, vbid, 0)
           == ENGINE_SUCCESS, "Add stream request failed");
 
@@ -4554,6 +5253,12 @@ static enum test_result test_fullrollback_for_consumer(ENGINE_HANDLE *h,
     dcp_step(h, h1, cookie);
     cb_assert(dcp_last_op == PROTOCOL_BINARY_CMD_DCP_CONTROL);
 
+    dcp_step(h, h1, cookie);
+    cb_assert(dcp_last_op == PROTOCOL_BINARY_CMD_DCP_CONTROL);
+
+    dcp_step(h, h1, cookie);
+    cb_assert(dcp_last_op == PROTOCOL_BINARY_CMD_DCP_CONTROL);
+
     check(h1->dcp.add_stream(h, cookie, opaque, 0, 0)
             == ENGINE_SUCCESS, "Add stream request failed");
 
@@ -4676,6 +5381,12 @@ static enum test_result test_partialrollback_for_consumer(ENGINE_HANDLE *h,
     dcp_step(h, h1, cookie);
     cb_assert(dcp_last_op == PROTOCOL_BINARY_CMD_DCP_CONTROL);
 
+    dcp_step(h, h1, cookie);
+    cb_assert(dcp_last_op == PROTOCOL_BINARY_CMD_DCP_CONTROL);
+
+    dcp_step(h, h1, cookie);
+    cb_assert(dcp_last_op == PROTOCOL_BINARY_CMD_DCP_CONTROL);
+
     check(h1->dcp.add_stream(h, cookie, opaque, 0, 0)
             == ENGINE_SUCCESS, "Add stream request failed");
 
@@ -4809,7 +5520,7 @@ static enum test_result test_dcp_producer_flow_control(ENGINE_HANDLE *h,
     std::string name("unittest");
     const void *cookie = testHarness.create_cookie();
     dcp_stream(h, h1, name.c_str(), cookie, 0, 0, 0, num_items, vb_uuid, 0,
-               0, num_items, 0, 1, 0, 2, false, false,
+               0, num_items, 0, 1, 0, false, false, 0, false, NULL,
                0 /* do not enable flow control */,
                true /* do not ack */);
 
@@ -4818,7 +5529,7 @@ static enum test_result test_dcp_producer_flow_control(ENGINE_HANDLE *h,
     std::string name1("unittest1");
     const void *cookie1 = testHarness.create_cookie();
     dcp_stream(h, h1, name1.c_str(), cookie1, 0, 0, 0, num_items, vb_uuid,
-               0, 0, 1, 0, 1, 0, 2, false, false,
+               0, 0, 1, 0, 1, 0, false, false, 0, false, NULL,
                100 /* flow control buf to low value */,
                true /* do not ack */);
 
@@ -5100,7 +5811,7 @@ static enum test_result test_dcp_consumer_mutate(ENGINE_HANDLE *h, ENGINE_HANDLE
     char *data = static_cast<char *>(malloc(dataLen));
     memset(data, 'x', dataLen);
 
-    uint8_t cas = 0;
+    uint8_t cas = 0x1;
     uint16_t vbucket = 0;
     uint8_t datatype = 1;
     uint64_t bySeqno = 10;
@@ -5108,6 +5819,9 @@ static enum test_result test_dcp_consumer_mutate(ENGINE_HANDLE *h, ENGINE_HANDLE
     uint32_t exprtime = 0;
     uint32_t lockTime = 0;
 
+    check(h1->dcp.snapshot_marker(h, cookie, opaque, 0, 10, 10, 1)
+        == ENGINE_SUCCESS, "Failed to send snapshot marker");
+
     // Ensure that we don't accept invalid opaque values
     check(h1->dcp.mutation(h, cookie, opaque + 1, "key", 3, data, dataLen, cas,
                            vbucket, flags, datatype,
@@ -5127,12 +5841,12 @@ static enum test_result test_dcp_consumer_mutate(ENGINE_HANDLE *h, ENGINE_HANDLE
                            lockTime, NULL, 0, 0) == ENGINE_SUCCESS,
           "Failed dcp mutate.");
 
-    check(set_vbucket_state(h, h1, 0, vbucket_state_active),
-          "Failed to set vbucket state.");
-
     wait_for_stat_to_be(h, h1, "eq_dcpq:unittest:stream_0_buffer_items", 0,
                         "dcp");
 
+    check(set_vbucket_state(h, h1, 0, vbucket_state_active),
+          "Failed to set vbucket state.");
+
     check_key_value(h, h1, "key", data, dataLen);
 
     testHarness.destroy_cookie(cookie);
@@ -5141,29 +5855,21 @@ static enum test_result test_dcp_consumer_mutate(ENGINE_HANDLE *h, ENGINE_HANDLE
     return SUCCESS;
 }
 
-static enum test_result test_dcp_consumer_delete(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1) {
-    // Store an item
-    item *i = NULL;
-    check(store(h, h1, NULL, OPERATION_ADD,"key", "value", &i) == ENGINE_SUCCESS,
-          "Failed to fail to store an item.");
-    h1->release(h, NULL, i);
-    verify_curr_items(h, h1, 1, "one item stored");
-
-    wait_for_flusher_to_settle(h, h1);
+static enum test_result test_dcp_consumer_mutate_with_time_sync(
+                                                        ENGINE_HANDLE *h,
+                                                        ENGINE_HANDLE_V1 *h1) {
 
     check(set_vbucket_state(h, h1, 0, vbucket_state_replica),
           "Failed to set vbucket state.");
 
+    set_drift_counter_state(h, h1, 1000, 0x01);
+
     const void *cookie = testHarness.create_cookie();
-    uint32_t opaque = 0;
-    uint8_t cas = 0;
-    uint16_t vbucket = 0;
+    uint32_t opaque = 0xFFFF0000;
+    uint32_t seqno = 0;
     uint32_t flags = 0;
-    uint64_t bySeqno = 10;
-    uint64_t revSeqno = 0;
     const char *name = "unittest";
     uint16_t nname = strlen(name);
-    uint32_t seqno = 0;
 
     // Open an DCP connection
     check(h1->dcp.open(h, cookie, opaque, seqno, flags, (void*)name, nname)
@@ -5176,25 +5882,213 @@ static enum test_result test_dcp_consumer_delete(ENGINE_HANDLE *h, ENGINE_HANDLE
     opaque = add_stream_for_consumer(h, h1, cookie, opaque, 0, 0,
                                      PROTOCOL_BINARY_RESPONSE_SUCCESS);
 
-    // verify that we don't accept invalid opaque id's
-    check(h1->dcp.deletion(h, cookie, opaque + 1, "key", 3, cas, vbucket,
-                           bySeqno, revSeqno, NULL, 0) == ENGINE_KEY_ENOENT,
-          "Failed to detect invalid DCP opaque value.");
+    uint32_t dataLen = 100;
+    char *data = static_cast<char *>(malloc(dataLen));
+    memset(data, 'x', dataLen);
 
-    // Consume an DCP deletion
-    check(h1->dcp.deletion(h, cookie, opaque, "key", 3, cas, vbucket,
-                           bySeqno, revSeqno, NULL, 0) == ENGINE_SUCCESS,
-          "Failed dcp delete.");
+    uint8_t cas = 0x1;
+    uint16_t vbucket = 0;
+    uint8_t datatype = 1;
+    uint64_t bySeqno = 10;
+    uint64_t revSeqno = 0;
+    uint32_t exprtime = 0;
+    uint32_t lockTime = 0;
 
-    wait_for_stat_to_be(h, h1, "eq_dcpq:unittest:stream_0_buffer_items", 0,
-                        "dcp");
+    check(h1->dcp.snapshot_marker(h, cookie, opaque, 0, 10, 10, 1)
+        == ENGINE_SUCCESS, "Failed to send snapshot marker");
 
-    wait_for_stat_change(h, h1, "curr_items", 1);
-    verify_curr_items(h, h1, 0, "one item deleted");
-    testHarness.destroy_cookie(cookie);
+    // Consume a DCP mutation with extended meta
+    int64_t adjusted_time1 = gethrtime() * 2;
+    ExtendedMetaData *emd = new ExtendedMetaData(adjusted_time1, false);
+    cb_assert(emd && emd->getStatus() == ENGINE_SUCCESS);
+    std::pair<const char*, uint16_t> meta = emd->getExtMeta();
+    check(h1->dcp.mutation(h, cookie, opaque, "key", 3, data, dataLen, cas,
+                           vbucket, flags, datatype,
+                           bySeqno, revSeqno, exprtime,
+                           lockTime, meta.first, meta.second, 0)
+            == ENGINE_SUCCESS,
+            "Failed dcp mutate.");
+    delete emd;
 
-    return SUCCESS;
-}
+    wait_for_stat_to_be(h, h1, "eq_dcpq:unittest:stream_0_buffer_items", 0, "dcp");
+
+    check(h1->dcp.close_stream(h, cookie, opaque, 0) == ENGINE_SUCCESS,
+            "Expected success");
+
+    check(set_vbucket_state(h, h1, 0, vbucket_state_active),
+          "Failed to set vbucket state.");
+
+    wait_for_stat_to_be(h, h1, "eq_dcpq:unittest:stream_0_buffer_items", 0,
+                        "dcp");
+
+    check_key_value(h, h1, "key", data, dataLen);
+
+    testHarness.destroy_cookie(cookie);
+    free(data);
+
+    protocol_binary_request_header *request;
+    int64_t adjusted_time2;
+    request = createPacket(PROTOCOL_BINARY_CMD_GET_ADJUSTED_TIME, 0, 0, NULL, 0,
+                           NULL, 0, NULL, 0);
+    h1->unknown_command(h, NULL, request, add_response);
+    check(last_status == PROTOCOL_BINARY_RESPONSE_SUCCESS,
+            "Expected Success");
+    checkeq(sizeof(int64_t), last_body.size(),
+            "Bodylen didn't match expected value");
+    memcpy(&adjusted_time2, last_body.data(), last_body.size());
+    adjusted_time2 = ntohll(adjusted_time2);
+
+    /**
+     * Check that adjusted_time2 is marginally greater than
+     * adjusted_time1.
+     */
+    check(adjusted_time2 >= adjusted_time1,
+            "Adjusted time after mutation: Not what is expected");
+
+    return SUCCESS;
+}
+
+
+static enum test_result test_dcp_consumer_delete(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1) {
+    // Store an item
+    item *i = NULL;
+    check(store(h, h1, NULL, OPERATION_ADD,"key", "value", &i) == ENGINE_SUCCESS,
+          "Failed to fail to store an item.");
+    h1->release(h, NULL, i);
+    verify_curr_items(h, h1, 1, "one item stored");
+
+    wait_for_flusher_to_settle(h, h1);
+
+    check(set_vbucket_state(h, h1, 0, vbucket_state_replica),
+          "Failed to set vbucket state.");
+
+    const void *cookie = testHarness.create_cookie();
+    uint32_t opaque = 0;
+    uint8_t cas = 0;
+    uint16_t vbucket = 0;
+    uint32_t flags = 0;
+    uint64_t bySeqno = 10;
+    uint64_t revSeqno = 0;
+    const char *name = "unittest";
+    uint16_t nname = strlen(name);
+    uint32_t seqno = 0;
+
+    // Open an DCP connection
+    check(h1->dcp.open(h, cookie, opaque, seqno, flags, (void*)name, nname)
+          == ENGINE_SUCCESS,
+          "Failed dcp producer open connection.");
+
+    std::string type = get_str_stat(h, h1, "eq_dcpq:unittest:type", "dcp");
+    check(type.compare("consumer") == 0, "Consumer not found");
+
+    opaque = add_stream_for_consumer(h, h1, cookie, opaque, 0, 0,
+                                     PROTOCOL_BINARY_RESPONSE_SUCCESS);
+
+    check(h1->dcp.snapshot_marker(h, cookie, opaque, 0, 10, 10, 1)
+        == ENGINE_SUCCESS, "Failed to send snapshot marker");
+
+    // verify that we don't accept invalid opaque id's
+    check(h1->dcp.deletion(h, cookie, opaque + 1, "key", 3, cas, vbucket,
+                           bySeqno, revSeqno, NULL, 0) == ENGINE_KEY_ENOENT,
+          "Failed to detect invalid DCP opaque value.");
+
+    // Consume an DCP deletion
+    check(h1->dcp.deletion(h, cookie, opaque, "key", 3, cas, vbucket,
+                           bySeqno, revSeqno, NULL, 0) == ENGINE_SUCCESS,
+          "Failed dcp delete.");
+
+    wait_for_stat_to_be(h, h1, "eq_dcpq:unittest:stream_0_buffer_items", 0,
+                        "dcp");
+
+    wait_for_stat_change(h, h1, "curr_items", 1);
+    verify_curr_items(h, h1, 0, "one item deleted");
+    testHarness.destroy_cookie(cookie);
+
+    return SUCCESS;
+}
+
+static enum test_result test_dcp_consumer_delete_with_time_sync(
+                                                        ENGINE_HANDLE *h,
+                                                        ENGINE_HANDLE_V1 *h1) {
+
+    set_drift_counter_state(h, h1, 1000, 0x01);
+
+    // Store an item
+    item *i = NULL;
+    check(store(h, h1, NULL, OPERATION_ADD,"key", "value", &i) == ENGINE_SUCCESS,
+          "Failed to fail to store an item.");
+    h1->release(h, NULL, i);
+    verify_curr_items(h, h1, 1, "one item stored");
+
+    wait_for_flusher_to_settle(h, h1);
+
+    check(set_vbucket_state(h, h1, 0, vbucket_state_replica),
+          "Failed to set vbucket state.");
+
+    const void *cookie = testHarness.create_cookie();
+    uint32_t opaque = 0;
+    uint8_t cas = 0x1;
+    uint16_t vbucket = 0;
+    uint32_t flags = 0;
+    uint64_t bySeqno = 10;
+    uint64_t revSeqno = 0;
+    const char *name = "unittest";
+    uint16_t nname = strlen(name);
+    uint32_t seqno = 0;
+
+    // Open an DCP connection
+    check(h1->dcp.open(h, cookie, opaque, seqno, flags, (void*)name, nname)
+          == ENGINE_SUCCESS,
+          "Failed dcp producer open connection.");
+
+    std::string type = get_str_stat(h, h1, "eq_dcpq:unittest:type", "dcp");
+    check(type.compare("consumer") == 0, "Consumer not found");
+
+    opaque = add_stream_for_consumer(h, h1, cookie, opaque, 0, 0,
+                                     PROTOCOL_BINARY_RESPONSE_SUCCESS);
+
+    check(h1->dcp.snapshot_marker(h, cookie, opaque, 0, 10, 10, 1)
+        == ENGINE_SUCCESS, "Failed to send snapshot marker");
+
+    // Consume an DCP deletion
+    int64_t adjusted_time1 = gethrtime() * 2;
+    ExtendedMetaData *emd = new ExtendedMetaData(adjusted_time1, false);
+    cb_assert(emd && emd->getStatus() == ENGINE_SUCCESS);
+    std::pair<const char*, uint16_t> meta = emd->getExtMeta();
+    check(h1->dcp.deletion(h, cookie, opaque, "key", 3, cas, vbucket,
+                           bySeqno, revSeqno, meta.first, meta.second)
+            == ENGINE_SUCCESS,
+            "Failed dcp delete.");
+    delete emd;
+
+    wait_for_stat_to_be(h, h1, "eq_dcpq:unittest:stream_0_buffer_items", 0,
+                        "dcp");
+
+    wait_for_stat_change(h, h1, "curr_items", 1);
+    verify_curr_items(h, h1, 0, "one item deleted");
+    testHarness.destroy_cookie(cookie);
+
+    protocol_binary_request_header *request;
+    int64_t adjusted_time2;
+    request = createPacket(PROTOCOL_BINARY_CMD_GET_ADJUSTED_TIME, 0, 0, NULL, 0,
+                           NULL, 0, NULL, 0);
+    h1->unknown_command(h, NULL, request, add_response);
+    check(last_status == PROTOCOL_BINARY_RESPONSE_SUCCESS,
+            "Expected Success");
+    checkeq(sizeof(int64_t), last_body.size(),
+            "Bodylen didn't match expected value");
+    memcpy(&adjusted_time2, last_body.data(), last_body.size());
+    adjusted_time2 = ntohll(adjusted_time2);
+
+    /**
+     * Check that adjusted_time2 is marginally greater than
+     * adjusted_time1.
+     */
+    check(adjusted_time2 >= adjusted_time1,
+            "Adjusted time after deletion: Not what is expected");
+
+    return SUCCESS;
+}
 
 static enum test_result test_dcp_consumer_noop(ENGINE_HANDLE *h,
                                                ENGINE_HANDLE_V1 *h1) {
@@ -5240,7 +6134,7 @@ static void dcp_stream_to_replica(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1,
                                   uint64_t start, uint64_t end,
                                   uint64_t snap_start_seqno,
                                   uint64_t snap_end_seqno,
-                                  uint8_t cas = 0, uint8_t datatype = 1,
+                                  uint8_t cas = 0x1, uint8_t datatype = 1,
                                   uint32_t exprtime = 0, uint32_t lockTime = 0,
                                   uint64_t revSeqno = 0)
 {
@@ -5251,7 +6145,7 @@ static void dcp_stream_to_replica(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1,
             "Failed to send marker!");
     const std::string data("data");
     /* Send DCP mutations */
-    for (int i = start; i <= end; i++) {
+    for (uint64_t i = start; i <= end; i++) {
         std::stringstream key;
         key << "key" << i;
         checkeq(ENGINE_SUCCESS, h1->dcp.mutation(h, cookie, opaque,
@@ -5302,7 +6196,7 @@ static enum test_result test_dcp_replica_stream_backfill(ENGINE_HANDLE *h,
 
     const void *cookie1 = testHarness.create_cookie();
     dcp_stream(h, h1, "unittest1", cookie1, 0, 0, 0, num_items, vb_uuid, 0, 0,
-               num_items, 0, 1, 0, 2);
+               num_items, 0, 1, 0);
 
     testHarness.destroy_cookie(cookie1);
     testHarness.destroy_cookie(cookie);
@@ -5346,7 +6240,7 @@ static enum test_result test_dcp_replica_stream_in_memory(ENGINE_HANDLE *h,
 
     const void *cookie1 = testHarness.create_cookie();
     dcp_stream(h, h1, "unittest1", cookie1, 0, 0, 0, num_items, vb_uuid, 0, 0,
-               num_items, 0, 1, 0, 2);
+               num_items, 0, 1, 0);
 
     testHarness.destroy_cookie(cookie1);
     testHarness.destroy_cookie(cookie);
@@ -5405,7 +6299,7 @@ static enum test_result test_dcp_replica_stream_all(ENGINE_HANDLE *h,
     uint64_t vb_uuid = get_ull_stat(h, h1, "vb_0:0:id", "failovers");
     const void *cookie1 = testHarness.create_cookie();
     dcp_stream(h, h1, "unittest1", cookie1, 0, 0, 0, end, vb_uuid, 0, 0, 300, 0,
-               1, 0, 2);
+               1, 0);
 
     testHarness.destroy_cookie(cookie1);
     testHarness.destroy_cookie(cookie);
@@ -5420,7 +6314,7 @@ static enum test_result test_tap_rcvr_mutate(ENGINE_HANDLE *h, ENGINE_HANDLE_V1
         char *data = static_cast<char *>(malloc(i));
         memset(data, 'x', i);
         check(h1->tap_notify(h, NULL, eng_specific, sizeof(eng_specific),
-                             1, 0, TAP_MUTATION, 1, "key", 3, 828, 0, 0,
+                             1, 0, TAP_MUTATION, 1, "key", 3, 828, 0, 1,
                              PROTOCOL_BINARY_RAW_BYTES,
                              data, i, 0) == ENGINE_SUCCESS,
               "Failed tap notify.");
@@ -5438,12 +6332,12 @@ static enum test_result test_tap_rcvr_checkpoint(ENGINE_HANDLE *h, ENGINE_HANDLE
     for (int i = 1; i < 10; ++i) {
         data = '0' + i;
         check(h1->tap_notify(h, NULL, eng_specific, sizeof(eng_specific),
-                             1, 0, TAP_CHECKPOINT_START, 1, "", 0, 828, 0, 0,
+                             1, 0, TAP_CHECKPOINT_START, 1, "", 0, 828, 0, 1,
                              PROTOCOL_BINARY_RAW_BYTES,
                              &data, 1, 1) == ENGINE_SUCCESS,
               "Failed tap notify.");
         check(h1->tap_notify(h, NULL, eng_specific, sizeof(eng_specific),
-                             1, 0, TAP_CHECKPOINT_END, 1, "", 0, 828, 0, 0,
+                             1, 0, TAP_CHECKPOINT_END, 1, "", 0, 828, 0, 1,
                              PROTOCOL_BINARY_RAW_BYTES,
                              &data, 1, 1) == ENGINE_SUCCESS,
               "Failed tap notify.");
@@ -5459,7 +6353,7 @@ static enum test_result test_tap_rcvr_set_vbstate(ENGINE_HANDLE *h, ENGINE_HANDL
     // Get the vbucket UUID before vbucket takeover.
     uint64_t vb_uuid = get_ull_stat(h, h1, "vb_1:0:id", "failovers");
     check(h1->tap_notify(h, NULL, eng_specific, sizeof(vb_state),
-                         1, 0, TAP_VBUCKET_SET, 1, "", 0, 828, 0, 0,
+                         1, 0, TAP_VBUCKET_SET, 1, "", 0, 828, 0, 1,
                          PROTOCOL_BINARY_RAW_BYTES,
                          "", 0, 1) == ENGINE_SUCCESS,
           "Failed tap notify.");
@@ -5473,7 +6367,7 @@ static enum test_result test_tap_rcvr_mutate_dead(ENGINE_HANDLE *h, ENGINE_HANDL
     char eng_specific[9];
     memset(eng_specific, 0, sizeof(eng_specific));
     check(h1->tap_notify(h, NULL, eng_specific, sizeof(eng_specific),
-                         1, 0, TAP_MUTATION, 1, "key", 3, 828, 0, 0,
+                         1, 0, TAP_MUTATION, 1, "key", 3, 828, 0, 1,
                          PROTOCOL_BINARY_RAW_BYTES,
                          "data", 4, 1) == ENGINE_NOT_MY_VBUCKET,
           "Expected not my vbucket.");
@@ -5485,7 +6379,7 @@ static enum test_result test_tap_rcvr_mutate_pending(ENGINE_HANDLE *h, ENGINE_HA
     char eng_specific[9];
     memset(eng_specific, 0, sizeof(eng_specific));
     check(h1->tap_notify(h, NULL, eng_specific, sizeof(eng_specific),
-                         1, 0, TAP_MUTATION, 1, "key", 3, 828, 0, 0,
+                         1, 0, TAP_MUTATION, 1, "key", 3, 828, 0, 1,
                          PROTOCOL_BINARY_RAW_BYTES,
                          "data", 4, 1) == ENGINE_SUCCESS,
           "Expected expected success.");
@@ -5497,7 +6391,7 @@ static enum test_result test_tap_rcvr_mutate_replica(ENGINE_HANDLE *h, ENGINE_HA
     char eng_specific[9];
     memset(eng_specific, 0, sizeof(eng_specific));
     check(h1->tap_notify(h, NULL, eng_specific, sizeof(eng_specific),
-                         1, 0, TAP_MUTATION, 1, "key", 3, 828, 0, 0,
+                         1, 0, TAP_MUTATION, 1, "key", 3, 828, 0, 1,
                          PROTOCOL_BINARY_RAW_BYTES,
                          "data", 4, 1) == ENGINE_SUCCESS,
           "Expected expected success.");
@@ -5522,7 +6416,7 @@ static enum test_result test_tap_rcvr_mutate_replica_locked(ENGINE_HANDLE *h,
     char eng_specific[9];
     memset(eng_specific, 0, sizeof(eng_specific));
     check(h1->tap_notify(h, NULL, eng_specific, sizeof(eng_specific),
-                         1, 0, TAP_MUTATION, 1, "key", 3, 828, 0, 0,
+                         1, 0, TAP_MUTATION, 1, "key", 3, 828, 0, 1,
                          PROTOCOL_BINARY_RAW_BYTES,
                          "data", 4, 0) == ENGINE_SUCCESS,
           "Expected expected success.");
@@ -5533,7 +6427,7 @@ static enum test_result test_tap_rcvr_delete(ENGINE_HANDLE *h, ENGINE_HANDLE_V1
     char* eng_specific[8];
     memset(eng_specific, 0, sizeof(eng_specific));
     check(h1->tap_notify(h, NULL, eng_specific, sizeof(eng_specific),
-                         1, 0, TAP_DELETION, 0, "key", 3, 0, 0, 0,
+                         1, 0, TAP_DELETION, 0, "key", 3, 0, 0, 1,
                          PROTOCOL_BINARY_RAW_BYTES,
                          0, 0, 0) == ENGINE_SUCCESS,
           "Failed tap notify.");
@@ -5544,7 +6438,7 @@ static enum test_result test_tap_rcvr_delete_dead(ENGINE_HANDLE *h, ENGINE_HANDL
     char* eng_specific[8];
     memset(eng_specific, 0, sizeof(eng_specific));
     check(h1->tap_notify(h, NULL, eng_specific, sizeof(eng_specific),
-                         1, 0, TAP_DELETION, 1, "key", 3, 0, 0, 0,
+                         1, 0, TAP_DELETION, 1, "key", 3, 0, 0, 1,
                          PROTOCOL_BINARY_RAW_BYTES,
                          NULL, 0, 1) == ENGINE_NOT_MY_VBUCKET,
           "Expected not my vbucket.");
@@ -5557,7 +6451,7 @@ static enum test_result test_tap_rcvr_delete_pending(ENGINE_HANDLE *h, ENGINE_HA
     char* eng_specific[8];
     memset(eng_specific, 0, sizeof(eng_specific));
     check(h1->tap_notify(h, NULL, eng_specific, sizeof(eng_specific),
-                         1, 0, TAP_DELETION, 1, "key", 3, 0, 0, 0,
+                         1, 0, TAP_DELETION, 1, "key", 3, 0, 0, 1,
                          PROTOCOL_BINARY_RAW_BYTES,
                          NULL, 0, 1) == ENGINE_SUCCESS,
           "Expected expected success.");
@@ -5570,7 +6464,7 @@ static enum test_result test_tap_rcvr_delete_replica(ENGINE_HANDLE *h, ENGINE_HA
     char* eng_specific[8];
     memset(eng_specific, 0, sizeof(eng_specific));
     check(h1->tap_notify(h, NULL, eng_specific, sizeof(eng_specific),
-                         1, 0, TAP_DELETION, 1, "key", 3, 0, 0, 0,
+                         1, 0, TAP_DELETION, 1, "key", 3, 0, 0, 1,
                          PROTOCOL_BINARY_RAW_BYTES,
                          NULL, 0, 1) == ENGINE_SUCCESS,
           "Expected expected success.");
@@ -5593,7 +6487,7 @@ static enum test_result test_tap_rcvr_delete_replica_locked(ENGINE_HANDLE *h,
     char* eng_specific[8];
     memset(eng_specific, 0, sizeof(eng_specific));
     check(h1->tap_notify(h, NULL, eng_specific, sizeof(eng_specific),
-                         1, 0, TAP_DELETION, 1, "key", 3, 0, 0, 0,
+                         1, 0, TAP_DELETION, 1, "key", 3, 0, 0, 1,
                          PROTOCOL_BINARY_RAW_BYTES,
                          NULL, 0, 0) == ENGINE_SUCCESS,
           "Expected expected success.");
@@ -5801,9 +6695,11 @@ static enum test_result test_tap_sends_deleted(ENGINE_HANDLE *h, ENGINE_HANDLE_V
         case TAP_DISCONNECT:
             break;
         case TAP_MUTATION:
+            h1->release(h, NULL, it);
             num_mutations++;
             break;
         case TAP_DELETION:
+            h1->release(h, NULL, it);
             num_deletes++;
             break;
         default:
@@ -5865,8 +6761,8 @@ static enum test_result test_sent_from_vb(ENGINE_HANDLE *h,
         case TAP_DISCONNECT:
             break;
         case TAP_MUTATION:
-            break;
         case TAP_DELETION:
+            h1->release(h, NULL, it);
             break;
         default:
             std::cerr << "Unexpected event:  " << event << std::endl;
@@ -6418,6 +7314,7 @@ static enum test_result test_tap_implicit_ack_stream(ENGINE_HANDLE *h, ENGINE_HA
         } else {
             if (event == TAP_MUTATION) {
                 ++mutations;
+                h1->release(h, cookie, it);
             }
             if (seqno == static_cast<uint32_t>(4294967294UL)) {
                 testHarness.unlock_cookie(cookie);
@@ -6449,6 +7346,7 @@ static enum test_result test_tap_implicit_ack_stream(ENGINE_HANDLE *h, ENGINE_HA
         } else {
             if (event == TAP_MUTATION) {
                 ++mutations;
+                h1->release(h, cookie, it);
             }
             if (seqno == 1) {
                 testHarness.unlock_cookie(cookie);
@@ -6481,6 +7379,7 @@ static enum test_result test_tap_implicit_ack_stream(ENGINE_HANDLE *h, ENGINE_HA
         } else {
             if (event == TAP_MUTATION) {
                 ++mutations;
+                h1->release(h, cookie, it);
             } else if (event == TAP_DISCONNECT) {
                 done = true;
             }
@@ -6711,53 +7610,44 @@ static enum test_result test_mem_stats(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1) {
 
 static enum test_result test_io_stats(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1) {
     h1->reset_stats(h, NULL);
-    check(get_int_stat(h, h1, "ep_io_num_read") == 0,
+
+    check(get_int_stat(h, h1, "rw_0:io_num_read", "kvstore") == 0,
           "Expected reset stats to set io_num_read to zero");
-    check(get_int_stat(h, h1, "ep_io_num_write") == 0,
+    check(get_int_stat(h, h1, "rw_0:io_num_write", "kvstore") == 0,
           "Expected reset stats to set io_num_write to zero");
-    check(get_int_stat(h, h1, "ep_io_read_bytes") == 0,
+    check(get_int_stat(h, h1, "rw_0:io_read_bytes", "kvstore") == 0,
           "Expected reset stats to set io_read_bytes to zero");
-    check(get_int_stat(h, h1, "ep_io_write_bytes") == 0,
+    check(get_int_stat(h, h1, "rw_0:io_write_bytes", "kvstore") == 0,
           "Expected reset stats to set io_write_bytes to zero");
     wait_for_persisted_value(h, h1, "a", "b\r\n");
-    check(get_int_stat(h, h1, "ep_io_num_read") == 0 &&
-          get_int_stat(h, h1, "ep_io_read_bytes") == 0,
+    check(get_int_stat(h, h1, "rw_0:io_num_read", "kvstore") == 0 &&
+          get_int_stat(h, h1, "rw_0:io_read_bytes", "kvstore") == 0,
           "Expected storing one value to not change the read counter");
 
-    check(get_int_stat(h, h1, "ep_io_num_write") == 1 &&
-          get_int_stat(h, h1, "ep_io_write_bytes") == 22,
+    check(get_int_stat(h, h1, "rw_0:io_num_write", "kvstore") == 1 &&
+          get_int_stat(h, h1, "rw_0:io_write_bytes", "kvstore") == 23,
           "Expected storing the key to update the write counter");
     evict_key(h, h1, "a", 0, "Ejected.");
 
     check_key_value(h, h1, "a", "b\r\n", 3, 0);
 
-    check(get_int_stat(h, h1, "ep_io_num_read") == 1 &&
-          get_int_stat(h, h1, "ep_io_read_bytes") == 4,
+    check(get_int_stat(h, h1, "ro_0:io_num_read", "kvstore") == 1 &&
+          get_int_stat(h, h1, "ro_0:io_read_bytes", "kvstore") == 4,
           "Expected reading the value back in to update the read counter");
-    check(get_int_stat(h, h1, "ep_io_num_write") == 1 &&
-          get_int_stat(h, h1, "ep_io_write_bytes") == 22,
+    check(get_int_stat(h, h1, "rw_0:io_num_write", "kvstore") == 1 &&
+          get_int_stat(h, h1, "rw_0:io_write_bytes", "kvstore") == 23,
           "Expected reading the value back in to not update the write counter");
 
-    h1->reset_stats(h, NULL);
-    check(get_int_stat(h, h1, "ep_io_num_read") == 0,
-          "Expected reset stats to set io_num_read to zero");
-    check(get_int_stat(h, h1, "ep_io_num_write") == 0,
-          "Expected reset stats to set io_num_write to zero");
-    check(get_int_stat(h, h1, "ep_io_read_bytes") == 0,
-          "Expected reset stats to set io_read_bytes to zero");
-    check(get_int_stat(h, h1, "ep_io_write_bytes") == 0,
-          "Expected reset stats to set io_write_bytes to zero");
-
-
     return SUCCESS;
 }
 
 static enum test_result test_vb_file_stats(ENGINE_HANDLE *h,
                                         ENGINE_HANDLE_V1 *h1) {
     wait_for_flusher_to_settle(h, h1);
+    wait_for_stat_change(h, h1, "ep_db_data_size", 0);
+
     int old_data_size = get_int_stat(h, h1, "ep_db_data_size");
     int old_file_size = get_int_stat(h, h1, "ep_db_file_size");
-    check(old_data_size != 0, "Expected a non-zero value for ep_db_data_size");
     check(old_file_size != 0, "Expected a non-zero value for ep_db_file_size");
 
     // Write a value and test ...
@@ -6774,6 +7664,38 @@ static enum test_result test_vb_file_stats(ENGINE_HANDLE *h,
     return SUCCESS;
 }
 
+static enum test_result test_vb_file_stats_after_warmup(ENGINE_HANDLE *h,
+                                                        ENGINE_HANDLE_V1 *h1) {
+
+    item *it = NULL;
+    for (int i = 0; i < 100; ++i) {
+        std::stringstream key;
+        key << "key-" << i;
+        check(ENGINE_SUCCESS ==
+              store(h, h1, NULL, OPERATION_SET, key.str().c_str(), "somevalue", &it),
+              "Error setting.");
+        h1->release(h, NULL, it);
+    }
+    wait_for_flusher_to_settle(h, h1);
+
+    int fileSize = get_int_stat(h, h1, "vb_0:db_file_size", "vbucket-details 0");
+    int spaceUsed = get_int_stat(h, h1, "vb_0:db_data_size", "vbucket-details 0");
+
+    // Restart the engine.
+    testHarness.reload_engine(&h, &h1,
+                              testHarness.engine_path,
+                              testHarness.get_current_testcase()->cfg,
+                              true, false);
+    wait_for_warmup_complete(h, h1);
+
+    int newFileSize = get_int_stat(h, h1, "vb_0:db_file_size", "vbucket-details 0");
+    int newSpaceUsed = get_int_stat(h, h1, "vb_0:db_data_size", "vbucket-details 0");
+
+    check((float)newFileSize >= 0.9 * fileSize, "Unexpected fileSize for vbucket");
+    check((float)newSpaceUsed >= 0.9 * spaceUsed, "Unexpected spaceUsed for vbucket");
+
+    return SUCCESS;
+}
 
 static enum test_result test_bg_stats(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1) {
     h1->reset_stats(h, NULL);
@@ -7008,101 +7930,343 @@ static enum test_result test_warmup_conf(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1)
     return SUCCESS;
 }
 
-static enum test_result test_datatype(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1) {
-    item *itm = NULL;
-    char key[15] = "{\"foo\":\"bar\"}";
-    uint8_t datatype = PROTOCOL_BINARY_DATATYPE_JSON;
-    uint64_t cas = 0;
-
-    ENGINE_ERROR_CODE rv = h1->allocate(h, NULL, &itm, key,
-                                        strlen(key), 1, 0, 0,
-                                        datatype);
-    check(rv == ENGINE_SUCCESS, "Allocation failed.");
-    rv = h1->store(h, NULL, itm, &cas, OPERATION_SET, 0);
-    h1->release(h, NULL, itm);
-
-    check(h1->get(h, NULL, &itm, key, strlen(key), 0) == ENGINE_SUCCESS,
-            "Unable to get stored item");
+static enum test_result test_bloomfilter_conf(ENGINE_HANDLE *h,
+                                              ENGINE_HANDLE_V1 *h1) {
 
-    item_info info;
-    info.nvalue = 1;
-    h1->get_item_info(h, NULL, itm, &info);
-    h1->release(h, NULL, itm);
-    check(info.datatype == 0x01, "Invalid datatype");
+    if (get_int_stat(h, h1, "ep_bfilter_enabled") == 0) {
+        check(set_param(h, h1, protocol_binary_engine_param_flush,
+                    "bfilter_enabled", "true"),
+                "Set bloomfilter_enabled should have worked");
+    }
+    check(get_int_stat(h, h1, "ep_bfilter_enabled") == 1,
+            "Bloom filter wasn't enabled");
 
-    const char* key1 = "foo";
-    const char* val1 = "{\"foo1\":\"bar1\"}";
-    ItemMetaData itm_meta;
-    itm_meta.revSeqno = 10;
-    itm_meta.cas = info.cas;
-    itm_meta.exptime = info.exptime;
-    itm_meta.flags = info.flags;
-    set_with_meta(h, h1, key1, strlen(key1), val1, strlen(val1), 0, &itm_meta,
-                  last_cas, false, info.datatype);
+    check(get_float_stat(h, h1, "ep_bfilter_residency_threshold") == (float)0.1,
+          "Incorrect initial bfilter_residency_threshold.");
 
-    check(h1->get(h, NULL, &itm, key1, strlen(key1), 0) == ENGINE_SUCCESS,
-            "Unable to get stored item");
+    check(set_param(h, h1, protocol_binary_engine_param_flush,
+          "bfilter_enabled", "false"),
+          "Set bloomfilter_enabled should have worked.");
+    check(set_param(h, h1, protocol_binary_engine_param_flush,
+          "bfilter_residency_threshold", "0.15"),
+          "Set bfilter_residency_threshold should have worked.");
 
-    h1->get_item_info(h, NULL, itm, &info);
-    h1->release(h, NULL, itm);
-    check(info.datatype == 0x01, "Invalid datatype, when setWithMeta");
+    check(get_int_stat(h, h1, "ep_bfilter_enabled") == 0,
+          "Bloom filter should have been disabled.");
+    check(get_float_stat(h, h1, "ep_bfilter_residency_threshold") == (float)0.15,
+          "Incorrect bfilter_residency_threshold.");
 
     return SUCCESS;
 }
 
-static enum test_result test_datatype_with_unknown_command(ENGINE_HANDLE *h,
-                                                           ENGINE_HANDLE_V1 *h1) {
-    item *itm = NULL;
-    const char* key = "foo";
-    const char* val = "{\"foo\":\"bar\"}";
-    uint8_t datatype = PROTOCOL_BINARY_DATATYPE_JSON;
-
-    ItemMetaData itm_meta;
-    itm_meta.revSeqno = 10;
-    itm_meta.cas = 0;
-    itm_meta.exptime = 0;
-    itm_meta.flags = 0;
+static enum test_result test_bloomfilters(ENGINE_HANDLE *h,
+                                          ENGINE_HANDLE_V1 *h1) {
 
-    //SET_WITH_META
-    set_with_meta(h, h1, key, strlen(key), val, strlen(val), 0, &itm_meta,
-                  0, false, datatype);
+    if (get_int_stat(h, h1, "ep_bfilter_enabled") == 0) {
+        check(set_param(h, h1, protocol_binary_engine_param_flush,
+                    "bfilter_enabled", "true"),
+                "Set bloomfilter_enabled should have worked");
+    }
+    check(get_int_stat(h, h1, "ep_bfilter_enabled") == 1,
+            "Bloom filter wasn't enabled");
 
-    check(h1->get(h, NULL, &itm, key, strlen(key), 0) == ENGINE_SUCCESS,
-            "Unable to get stored item");
+    int num_read_attempts = get_int_stat(h, h1, "ep_bg_num_samples");
 
-    item_info info;
-    info.nvalue = 1;
-    h1->get_item_info(h, NULL, itm, &info);
-    check(info.datatype == 0x01, "Invalid datatype, when setWithMeta");
+    // Run compaction to start using the bloomfilter
+    useconds_t sleepTime = 128;
+    compact_db(h, h1, 0, 1, 1, 0);
+    while (get_int_stat(h, h1, "ep_pending_compactions") != 0) {
+        decayingSleep(&sleepTime);
+    }
 
-    //SET_RETURN_META
-    set_ret_meta(h, h1, "foo1", 4, val, strlen(val), 0, 0, 0, 0, datatype);
-    check(last_status == PROTOCOL_BINARY_RESPONSE_SUCCESS,
-          "Expected set returing meta to succeed");
-    check(last_datatype == 0x01, "Invalid datatype, when set_return_meta");
+    int i;
+    item *it = NULL;
 
-    return SUCCESS;
-}
+    // Insert 10 items.
+    for (i = 0; i < 10; ++i) {
+        std::stringstream key;
+        key << "key-" << i;
+        check(ENGINE_SUCCESS ==
+              store(h, h1, NULL, OPERATION_SET, key.str().c_str(),
+                    "somevalue", &it),
+                    "Error setting.");
+        h1->release(h, NULL, it);
+    }
+    wait_for_flusher_to_settle(h, h1);
 
-static enum test_result test_session_cas_validation(ENGINE_HANDLE *h,
-                                                    ENGINE_HANDLE_V1 *h1) {
-    //Testing PROTOCOL_BINARY_CMD_SET_VBUCKET..
-    char ext[4];
-    protocol_binary_request_header *pkt;
-    vbucket_state_t state = vbucket_state_active;
-    uint32_t val = static_cast<uint32_t>(state);
-    val = htonl(val);
-    memcpy(ext, (char*)&val, sizeof(val));
+    // Evict all 10 items.
+    for (i = 0; i < 10; ++i) {
+        std::stringstream key;
+        key << "key-" << i;
+        evict_key(h, h1, key.str().c_str(), 0, "Ejected.");
+    }
+    wait_for_flusher_to_settle(h, h1);
 
-    uint64_t cas = 0x0101010101010101;
-    pkt = createPacket(PROTOCOL_BINARY_CMD_SET_VBUCKET, 0, cas, ext, 4);
-    check(h1->unknown_command(h, NULL, pkt, add_response) == ENGINE_SUCCESS,
-            "SET_VBUCKET command failed");
-    free(pkt);
-    cb_assert(last_status == PROTOCOL_BINARY_RESPONSE_KEY_EEXISTS);
+    // Ensure 10 items are non-resident.
+    cb_assert(10 == get_int_stat(h, h1, "ep_num_non_resident"));
 
-    cas = 0x0102030405060708;
-    pkt = createPacket(PROTOCOL_BINARY_CMD_SET_VBUCKET, 0, cas, ext, 4);
+    // Issue delete on first 5 items.
+    for (i = 0; i < 5; ++i) {
+        std::stringstream key;
+        key << "key-" << i;
+        check(del(h, h1, key.str().c_str(), 0, 0) == ENGINE_SUCCESS,
+              "Failed remove with value.");
+    }
+    wait_for_flusher_to_settle(h, h1);
+
+    // Ensure that there are 5 non-resident items
+    cb_assert(5 == get_int_stat(h, h1, "ep_num_non_resident"));
+    cb_assert(5 == get_int_stat(h, h1, "curr_items"));
+
+    check(h1->get_stats(h, NULL, NULL, 0, add_stats) == ENGINE_SUCCESS,
+          "Failed to get stats.");
+    std::string eviction_policy = vals.find("ep_item_eviction_policy")->second;
+
+    if (eviction_policy == "value_only") {  // VALUE-ONLY EVICTION MODE
+        check(get_int_stat(h, h1, "ep_bg_num_samples") == num_read_attempts,
+                "Expected bgFetch attempts to remain unchanged");
+
+        for (i = 0; i < 5; ++i) {
+            std::stringstream key;
+            key << "key-" << i;
+            check(get_meta(h, h1, key.str().c_str()), "Get meta failed");
+        }
+
+        // GetMeta would cause bgFetches as bloomfilter contains
+        // the deleted items.
+        check(get_int_stat(h, h1, "ep_bg_num_samples") == num_read_attempts + 5,
+                "Expected bgFetch attempts to increase by five");
+
+        // Run compaction, with drop_deletes
+        compact_db(h, h1, 0, 15, 15, 1);
+        while (get_int_stat(h, h1, "ep_pending_compactions") != 0) {
+            decayingSleep(&sleepTime);
+        }
+
+        for (i = 0; i < 5; ++i) {
+            std::stringstream key;
+            key << "key-" << i;
+            check(get_meta(h, h1, key.str().c_str()), "Get meta failed");
+        }
+        check(get_int_stat(h, h1, "ep_bg_num_samples") == num_read_attempts + 5,
+                "Expected bgFetch attempts to stay as before");
+
+    } else {                                // FULL EVICTION MODE
+        // Because of issuing deletes on non-resident items
+        check(get_int_stat(h, h1, "ep_bg_num_samples") == num_read_attempts + 5,
+                "Expected bgFetch attempts to increase by five, after deletes");
+
+        // Run compaction, with drop_deletes, to exclude deleted items
+        // from bloomfilter.
+        compact_db(h, h1, 0, 15, 15, 1);
+        while (get_int_stat(h, h1, "ep_pending_compactions") != 0) {
+            decayingSleep(&sleepTime);
+        }
+
+        for (i = 0; i < 5; i++) {
+            std::stringstream key;
+            key << "key-" << i;
+            check(h1->get(h, NULL, &it, key.str().c_str(), key.str().length(), 0)
+                  == ENGINE_KEY_ENOENT,
+                  "Unable to get stored item");
+        }
+        // + 6 because last delete is not purged by the compactor
+        check(get_int_stat(h, h1, "ep_bg_num_samples") == num_read_attempts + 6,
+                "Expected bgFetch attempts to stay as before");
+    }
+
+    return SUCCESS;
+}
+
+static enum test_result test_bloomfilters_with_store_apis(ENGINE_HANDLE *h,
+                                                          ENGINE_HANDLE_V1 *h1) {
+    if (get_int_stat(h, h1, "ep_bfilter_enabled") == 0) {
+        check(set_param(h, h1, protocol_binary_engine_param_flush,
+                    "bfilter_enabled", "true"),
+                "Set bloomfilter_enabled should have worked");
+    }
+    check(get_int_stat(h, h1, "ep_bfilter_enabled") == 1,
+            "Bloom filter wasn't enabled");
+
+    int num_read_attempts = get_int_stat(h, h1, "ep_bg_num_samples");
+
+    // Run compaction to start using the bloomfilter
+    useconds_t sleepTime = 128;
+    compact_db(h, h1, 0, 1, 1, 0);
+    while (get_int_stat(h, h1, "ep_pending_compactions") != 0) {
+        decayingSleep(&sleepTime);
+    }
+
+    for (int i = 0; i < 1000; i++) {
+        std::stringstream key;
+        key << "key-" << i;
+        check(get_meta(h, h1, key.str().c_str()) == false,
+                "Get meta should fail.");
+    }
+
+    check(get_int_stat(h, h1, "ep_bg_num_samples") == num_read_attempts + 0,
+            "Expected no bgFetch attempts");
+
+    check(h1->get_stats(h, NULL, NULL, 0, add_stats) == ENGINE_SUCCESS,
+          "Failed to get stats.");
+    std::string eviction_policy = vals.find("ep_item_eviction_policy")->second;
+
+    if (eviction_policy == "full_eviction") {  // FULL EVICTION MODE
+        // Set with Meta
+        int j;
+        for (j = 0; j < 10; j++) {
+            uint64_t cas_for_set = last_cas;
+            // init some random metadata
+            ItemMetaData itm_meta;
+            itm_meta.revSeqno = 10;
+            itm_meta.cas = 0xdeadbeef;
+            itm_meta.exptime = time(NULL) + 300;
+            itm_meta.flags = 0xdeadbeef;
+
+            std::stringstream key;
+            key << "swm-" << j;
+            set_with_meta(h, h1, key.str().c_str(), key.str().length(),
+                          "somevalue", 9, 0, &itm_meta, cas_for_set);
+        }
+
+        check(get_int_stat(h, h1, "ep_bg_num_samples") == num_read_attempts + 0,
+                "Expected no bgFetch attempts");
+
+        item *itm = NULL;
+        // Add
+        for (j = 0; j < 10; j++) {
+            std::stringstream key;
+            key << "add-" << j;
+
+            check(store(h, h1, NULL, OPERATION_ADD, key.str().c_str(),
+                        "newvalue", &itm) == ENGINE_SUCCESS,
+                    "Failed to add value again.");
+            h1->release(h, NULL, itm);
+        }
+
+        check(get_int_stat(h, h1, "ep_bg_num_samples") == num_read_attempts + 0,
+                "Expected no bgFetch attempts");
+
+        // Delete
+        for (j = 0; j < 10; j++) {
+            std::stringstream key;
+            key << "del-" << j;
+            check(del(h, h1, key.str().c_str(), 0, 0) == ENGINE_KEY_ENOENT,
+                    "Failed remove with value.");
+        }
+
+        check(get_int_stat(h, h1, "ep_bg_num_samples") == num_read_attempts + 0,
+                "Expected no bgFetch attempts");
+
+    }
+
+    return SUCCESS;
+}
+
+static enum test_result test_datatype(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1) {
+    const void *cookie = testHarness.create_cookie();
+    testHarness.set_datatype_support(cookie, true);
+
+    item *itm = NULL;
+    char key[15] = "{\"foo\":\"bar\"}";
+    uint8_t datatype = PROTOCOL_BINARY_DATATYPE_JSON;
+    uint64_t cas = 0;
+
+    ENGINE_ERROR_CODE rv = h1->allocate(h, cookie, &itm, key,
+                                        strlen(key), 1, 0, 0,
+                                        datatype);
+    check(rv == ENGINE_SUCCESS, "Allocation failed.");
+    rv = h1->store(h, cookie, itm, &cas, OPERATION_SET, 0);
+    h1->release(h, cookie, itm);
+
+    check(h1->get(h, cookie, &itm, key, strlen(key), 0) == ENGINE_SUCCESS,
+            "Unable to get stored item");
+
+    item_info info;
+    info.nvalue = 1;
+    h1->get_item_info(h, cookie, itm, &info);
+    h1->release(h, cookie, itm);
+    check(info.datatype == 0x01, "Invalid datatype");
+
+    const char* key1 = "foo";
+    const char* val1 = "{\"foo1\":\"bar1\"}";
+    ItemMetaData itm_meta;
+    itm_meta.revSeqno = 10;
+    itm_meta.cas = info.cas;
+    itm_meta.exptime = info.exptime;
+    itm_meta.flags = info.flags;
+    set_with_meta(h, h1, key1, strlen(key1), val1, strlen(val1), 0, &itm_meta,
+                  last_cas, false, info.datatype, false, 0, 0, cookie);
+
+    check(h1->get(h, cookie, &itm, key1, strlen(key1), 0) == ENGINE_SUCCESS,
+            "Unable to get stored item");
+
+    h1->get_item_info(h, cookie, itm, &info);
+    h1->release(h, cookie, itm);
+    check(info.datatype == 0x01, "Invalid datatype, when setWithMeta");
+
+    testHarness.destroy_cookie(cookie);
+    return SUCCESS;
+}
+
+static enum test_result test_datatype_with_unknown_command(ENGINE_HANDLE *h,
+                                                           ENGINE_HANDLE_V1 *h1) {
+    const void *cookie = testHarness.create_cookie();
+    testHarness.set_datatype_support(cookie, true);
+    item *itm = NULL;
+    const char* key = "foo";
+    const char* val = "{\"foo\":\"bar\"}";
+    uint8_t datatype = PROTOCOL_BINARY_DATATYPE_JSON;
+
+    ItemMetaData itm_meta;
+    itm_meta.revSeqno = 10;
+    itm_meta.cas = 0x1;
+    itm_meta.exptime = 0;
+    itm_meta.flags = 0;
+
+    //SET_WITH_META
+    set_with_meta(h, h1, key, strlen(key), val, strlen(val), 0, &itm_meta,
+                  0, false, datatype, false, 0, 0, cookie);
+
+    check(h1->get(h, cookie, &itm, key, strlen(key), 0) == ENGINE_SUCCESS,
+            "Unable to get stored item");
+
+    item_info info;
+    info.nvalue = 1;
+    h1->get_item_info(h, cookie, itm, &info);
+    h1->release(h, NULL, itm);
+    check(info.datatype == 0x01, "Invalid datatype, when setWithMeta");
+
+    //SET_RETURN_META
+    set_ret_meta(h, h1, "foo1", 4, val, strlen(val), 0, 0, 0, 0, datatype,
+                 cookie);
+    check(last_status == PROTOCOL_BINARY_RESPONSE_SUCCESS,
+          "Expected set returing meta to succeed");
+    check(last_datatype == 0x01, "Invalid datatype, when set_return_meta");
+
+    testHarness.destroy_cookie(cookie);
+    return SUCCESS;
+}
+
+static enum test_result test_session_cas_validation(ENGINE_HANDLE *h,
+                                                    ENGINE_HANDLE_V1 *h1) {
+    //Testing PROTOCOL_BINARY_CMD_SET_VBUCKET..
+    char ext[4];
+    protocol_binary_request_header *pkt;
+    vbucket_state_t state = vbucket_state_active;
+    uint32_t val = static_cast<uint32_t>(state);
+    val = htonl(val);
+    memcpy(ext, (char*)&val, sizeof(val));
+
+    uint64_t cas = 0x0101010101010101;
+    pkt = createPacket(PROTOCOL_BINARY_CMD_SET_VBUCKET, 0, cas, ext, 4);
+    check(h1->unknown_command(h, NULL, pkt, add_response) == ENGINE_SUCCESS,
+            "SET_VBUCKET command failed");
+    free(pkt);
+    cb_assert(last_status == PROTOCOL_BINARY_RESPONSE_KEY_EEXISTS);
+
+    cas = 0x0102030405060708;
+    pkt = createPacket(PROTOCOL_BINARY_CMD_SET_VBUCKET, 0, cas, ext, 4);
     check(h1->unknown_command(h, NULL, pkt, add_response) == ENGINE_SUCCESS,
             "SET_VBUCKET command failed");
     free(pkt);
@@ -7560,6 +8724,36 @@ static enum test_result test_all_keys_api(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1
     return SUCCESS;
 }
 
+static enum test_result test_all_keys_api_during_bucket_creation(
+                                ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1) {
+
+    uint8_t extlen = 4;
+    uint32_t count = htonl(5);
+    char *ext = new char[extlen];
+    memcpy(ext, (char*)&count, sizeof(count));
+    uint16_t keylen = 6;
+
+    protocol_binary_request_header *pkt1 =
+        createPacket(CMD_GET_KEYS, 1, 0, ext, extlen,
+                     "key_10", keylen, NULL, 0, 0x00);
+    delete[] ext;
+
+    stop_persistence(h, h1);
+    check(set_vbucket_state(h, h1, 1, vbucket_state_active),
+          "Failed set vbucket 1 state.");
+
+    ENGINE_ERROR_CODE err = h1->unknown_command(h, NULL, pkt1,
+                                                add_response);
+    start_persistence(h, h1);
+
+    check(err == ENGINE_SUCCESS,
+          "Unexpected return code from all_keys_api");
+    check(last_status == PROTOCOL_BINARY_RESPONSE_SUCCESS,
+            "Unexpected response status");
+
+    return SUCCESS;
+}
+
 static enum test_result test_curr_items(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1) {
     item *i = NULL;
 
@@ -7588,8 +8782,10 @@ static enum test_result test_curr_items(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1)
     verify_curr_items(h, h1, 2, "one item deleted - persisted");
 
     // Verify flush case (remove the two remaining from above)
+    set_degraded_mode(h, h1, NULL, true);
     check(h1->flush(h, NULL, 0) == ENGINE_SUCCESS,
           "Failed to flush");
+    set_degraded_mode(h, h1, NULL, false);
     verify_curr_items(h, h1, 0, "flush");
 
     // Verify dead vbucket case.
@@ -7728,7 +8924,6 @@ static enum test_result test_mb5172(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1) {
 
 static enum test_result test_mb3169(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1) {
     item *i = NULL;
-    uint64_t cas(0);
     uint64_t result(0);
     check(store(h, h1, NULL, OPERATION_SET, "set", "value", &i, 0, 0)
           == ENGINE_SUCCESS, "Failed to store a value");
@@ -7762,8 +8957,9 @@ static enum test_result test_mb3169(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1) {
           "Expected mutation to mark item resident");
 
     check(h1->arithmetic(h, NULL, "incr", 4, true, false, 1, 1, 0,
-                         &cas, PROTOCOL_BINARY_RAW_BYTES, &result,
+                         &i, PROTOCOL_BINARY_RAW_BYTES, &result,
                          0)  == ENGINE_SUCCESS, "Incr failed");
+    h1->release(h, NULL, i);
 
     check(get_int_stat(h, h1, "ep_num_non_resident") == 2,
           "Expected incr to mark item resident");
@@ -7891,8 +9087,6 @@ static enum test_result test_disk_gt_ram_golden(ENGINE_HANDLE *h,
 
     check(get_int_stat(h, h1, "ep_overhead") == overhead,
           "Fell below initial overhead.");
-    check(get_int_stat(h, h1, "mem_used") == overhead,
-          "mem_used (ep_kv_size + ep_overhead) should be greater than ep_overhead");
 
     return SUCCESS;
 }
@@ -7922,23 +9116,23 @@ static enum test_result test_disk_gt_ram_paged_rm(ENGINE_HANDLE *h,
 
     check(get_int_stat(h, h1, "ep_overhead") == overhead,
           "Fell below initial overhead.");
-    check(get_int_stat(h, h1, "mem_used") == overhead,
-          "mem_used (ep_kv_size + ep_overhead) should be greater than ep_overhead");
 
     return SUCCESS;
 }
 
 static enum test_result test_disk_gt_ram_incr(ENGINE_HANDLE *h,
                                               ENGINE_HANDLE_V1 *h1) {
-    uint64_t cas = 0, result = 0;
+    uint64_t result = 0;
+    item *i = NULL;
     wait_for_persisted_value(h, h1, "k1", "13");
 
     evict_key(h, h1, "k1");
 
     check(h1->arithmetic(h, NULL, "k1", 2, true, false, 1, 1, 0,
-                         &cas, PROTOCOL_BINARY_RAW_BYTES, &result,
+                         &i, PROTOCOL_BINARY_RAW_BYTES, &result,
                          0) == ENGINE_SUCCESS,
           "Failed to incr value.");
+    h1->release(h, NULL, i);
 
     check_key_value(h, h1, "k1", "14", 2);
 
@@ -8010,11 +9204,13 @@ extern "C" {
 
         usleep(2600); // Exacerbate race condition.
 
-        uint64_t cas = 0, result = 0;
+        uint64_t result = 0;
+        item *i = NULL;
         check(td->h1->arithmetic(td->h, NULL, "k1", 2, true, false, 1, 1, 0,
-                                 &cas, PROTOCOL_BINARY_RAW_BYTES, &result,
+                                 &i, PROTOCOL_BINARY_RAW_BYTES, &result,
                                  0) == ENGINE_SUCCESS,
               "Failed to incr value.");
+        td->h1->release(td->h, NULL, i);
 
         delete td;
     }
@@ -8157,7 +9353,6 @@ static enum test_result test_validate_engine_handle(ENGINE_HANDLE *h, ENGINE_HAN
     check(h1->unknown_command != NULL, "unknown_command member should be initialized to a non-NULL value");
     check(h1->tap_notify != NULL, "tap_notify member should be initialized to a non-NULL value");
     check(h1->get_tap_iterator != NULL, "get_tap_iterator member should be initialized to a non-NULL value");
-    check(h1->errinfo == NULL, "errinfo member should be initialized to NULL");
 
     return SUCCESS;
 }
@@ -8322,6 +9517,7 @@ static enum test_result test_dcp_persistence_seqno(ENGINE_HANDLE *h,
 static enum test_result test_dcp_last_items_purged(ENGINE_HANDLE *h,
                                                    ENGINE_HANDLE_V1 *h1) {
     item_info info;
+    mutation_descr_t mut_info;
     uint64_t vb_uuid = 0;
     uint64_t cas = 0;
     uint32_t high_seqno = 0;
@@ -8338,10 +9534,13 @@ static enum test_result test_dcp_last_items_purged(ENGINE_HANDLE *h,
                     0, 0, 0), "Error setting.");
     }
 
+    memset(&mut_info, 0, sizeof(mut_info));
+
     /* Delete last 2 items */
     for (int count = 1; count < num_items; count++){
-        check(h1->remove(h, NULL, key[count], strlen(key[count]), &cas, 0)
-              == ENGINE_SUCCESS, "Failed remove with value.");
+        check(h1->remove(h, NULL, key[count], strlen(key[count]), &cas, 0,
+                         &mut_info) == ENGINE_SUCCESS,
+              "Failed remove with value.");
         cas = 0;
     }
 
@@ -8365,7 +9564,7 @@ static enum test_result test_dcp_last_items_purged(ENGINE_HANDLE *h,
     const void *cookie = testHarness.create_cookie();
     high_seqno = get_ull_stat(h, h1, "vb_0:high_seqno", "vbucket-seqno");
     dcp_stream(h, h1, "unittest", cookie, 0, 0, 0, high_seqno, vb_uuid, 0, 0,
-               1, 1, 1, 0, 2, false, true);
+               1, 1, 1, 0, false, false, 0, true);
 
     testHarness.destroy_cookie(cookie);
     return SUCCESS;
@@ -8374,6 +9573,7 @@ static enum test_result test_dcp_last_items_purged(ENGINE_HANDLE *h,
 static enum test_result test_dcp_rollback_after_purge(ENGINE_HANDLE *h,
                                                       ENGINE_HANDLE_V1 *h1) {
     item_info info;
+    mutation_descr_t mut_info;
     uint64_t vb_uuid = 0;
     uint64_t cas = 0;
     uint32_t high_seqno = 0;
@@ -8397,14 +9597,16 @@ static enum test_result test_dcp_rollback_after_purge(ENGINE_HANDLE *h,
     /* Create a DCP stream to send 3 items to the replica */
     const void *cookie = testHarness.create_cookie();
     dcp_stream(h, h1, "unittest", cookie, 0, 0, 0, high_seqno, vb_uuid, 0, 0,
-               3, 0, 1, 0, 2, false, true);
+               3, 0, 1, 0, false, false, 0, true);
 
     testHarness.destroy_cookie(cookie);
 
+    memset(&mut_info, 0, sizeof(mut_info));
     /* Delete last 2 items */
     for (int count = 1; count < num_items; count++){
-        check(h1->remove(h, NULL, key[count], strlen(key[count]), &cas, 0)
-              == ENGINE_SUCCESS, "Failed remove with value.");
+        check(h1->remove(h, NULL, key[count], strlen(key[count]), &cas, 0,
+                         &mut_info) == ENGINE_SUCCESS,
+              "Failed remove with value.");
         cas = 0;
     }
     high_seqno = get_ull_stat(h, h1, "vb_0:high_seqno", "vbucket-seqno");
@@ -8423,12 +9625,12 @@ static enum test_result test_dcp_rollback_after_purge(ENGINE_HANDLE *h,
     wait_for_stat_to_be(h, h1, "vb_0:num_checkpoints", 1, "checkpoint");
 
     /* DCP stream, expect a rollback to seq 0 */
-    dcp_stream_req(h, h1, 1, 0, 3, high_seqno, vb_uuid,
+    dcp_stream_req(h, h1, 1, 0, 0, 3, high_seqno, vb_uuid,
                    3, high_seqno, 0, ENGINE_ROLLBACK);
 
     /* Do not expect rollback when you already have all items in the snapshot
        (that is, start == snap_end_seqno)*/
-    dcp_stream_req(h, h1, 1, 0, high_seqno, high_seqno + 10, vb_uuid,
+    dcp_stream_req(h, h1, 1, 0, 0, high_seqno, high_seqno + 10, vb_uuid,
                    0, high_seqno, 0, ENGINE_SUCCESS);
 
     return SUCCESS;
@@ -8436,7 +9638,6 @@ static enum test_result test_dcp_rollback_after_purge(ENGINE_HANDLE *h,
 
 static enum test_result test_dcp_erroneous_mutations(ENGINE_HANDLE *h,
                                                      ENGINE_HANDLE_V1 *h1) {
-
     check(set_vbucket_state(h, h1, 0, vbucket_state_replica),
           "Failed to set vbucket state");
     wait_for_flusher_to_settle(h, h1);
@@ -8460,10 +9661,9 @@ static enum test_result test_dcp_erroneous_mutations(ENGINE_HANDLE *h,
             ENGINE_SUCCESS,
             "Failed to send snapshot marker!");
     for (int i = 5; i <= 10; i++) {
-        std::stringstream key;
-        key << "key" << i;
-        checkeq(h1->dcp.mutation(h, cookie, stream_opaque, key.str().c_str(),
-                                 key.str().length(), "value", 5, i * 3, 0, 0, 0,
+        std::string key("key" + std::to_string(i));
+        checkeq(h1->dcp.mutation(h, cookie, stream_opaque, key.c_str(),
+                                 key.length(), "value", 5, i * 3, 0, 0, 0,
                                  i, 0, 0, 0, "", 0, INITIAL_NRU_VALUE),
                 ENGINE_SUCCESS,
                 "Unexpected return code for mutation!");
@@ -8533,10 +9733,9 @@ static enum test_result test_dcp_erroneous_marker(ENGINE_HANDLE *h,
             ENGINE_SUCCESS,
             "Failed to send snapshot marker!");
     for (int i = 1; i <= 10; i++) {
-        std::stringstream key;
-        key << "key" << i;
-        checkeq(h1->dcp.mutation(h, cookie1, stream_opaque, key.str().c_str(),
-                                 key.str().length(), "value", 5, i * 3, 0, 0, 0,
+        std::string key("key" + std::to_string(i));
+        checkeq(h1->dcp.mutation(h, cookie1, stream_opaque, key.c_str(),
+                                 key.length(), "value", 5, i * 3, 0, 0, 0,
                                  i, 0, 0, 0, "", 0, INITIAL_NRU_VALUE),
                 ENGINE_SUCCESS,
                 "Unexpected return code for mutation!");
@@ -8575,10 +9774,9 @@ static enum test_result test_dcp_erroneous_marker(ENGINE_HANDLE *h,
             ENGINE_SUCCESS,
             "Failed to send snapshot marker!");
     for (int i = 5; i <= 15; i++) {
-        std::stringstream key;
-        key << "key" << i;
+        std::string key("key_" + std::to_string(i));
         ENGINE_ERROR_CODE err = h1->dcp.mutation(h, cookie2, stream_opaque,
-                                                 key.str().c_str(), key.str().length(),
+                                                 key.c_str(), key.length(),
                                                  "val", 3, i * 3, 0, 0, 0, i,
                                                  0, 0, 0, "", 0, INITIAL_NRU_VALUE);
         if (i <= 10) {
@@ -8597,7 +9795,7 @@ static enum test_result test_dcp_erroneous_marker(ENGINE_HANDLE *h,
 }
 
 static enum test_result test_dcp_invalid_mutation_deletion(ENGINE_HANDLE* h,
-                                                           ENGINE_HANDLE_V1* h1) {
+                                                              ENGINE_HANDLE_V1* h1) {
 
     check(set_vbucket_state(h, h1, 0, vbucket_state_replica),
           "Failed to set vbucket state");
@@ -8662,10 +9860,9 @@ static enum test_result test_dcp_invalid_snapshot_marker(ENGINE_HANDLE* h,
             ENGINE_SUCCESS,
             "Failed to send snapshot marker!");
     for (int i = 1; i <= 10; i++) {
-        std::stringstream key;
-        key << "key" << i;
-        checkeq(h1->dcp.mutation(h, cookie, stream_opaque, key.str().c_str(),
-                                 key.str().length(), "value", 5, i * 3, 0, 0, 0,
+        std::string key("key" + std::to_string(i));
+        checkeq(h1->dcp.mutation(h, cookie, stream_opaque, key.c_str(),
+                                 key.length(), "value", 5, i * 3, 0, 0, 0,
                                  i, 0, 0, 0, "", 0, INITIAL_NRU_VALUE),
                 ENGINE_SUCCESS,
                 "Unexpected return code for mutation!");
@@ -8719,10 +9916,6 @@ static enum test_result test_dcp_early_termination(ENGINE_HANDLE* h,
     }
     wait_for_flusher_to_settle(h, h1);
 
-    // Account for tasks already in the auxIO futureQ
-    int auxIOFutureQ = get_int_stat(h, h1, "ep_workload:LowPrioQ_AuxIO:InQsize",
-                                    "workload");
-
     const void *cookie = testHarness.create_cookie();
     uint32_t opaque = 1;
     check(h1->dcp.open(h, cookie, ++opaque, 0, DCP_OPEN_PRODUCER,
@@ -8749,14 +9942,139 @@ static enum test_result test_dcp_early_termination(ENGINE_HANDLE* h,
     // Destroy the connection
     testHarness.destroy_cookie(cookie);
 
-    // Let all AUXIO (backfills) finish
-    // - FutureQ of auxIO should contain less than or equal to what
-    // was in it before DCP
-    // - ReadyQ of auxIO to contain 0 tasks
-    wait_for_stat_to_be_lte(h, h1, "ep_workload:LowPrioQ_AuxIO:InQsize",
-                            auxIOFutureQ, "workload");
-    wait_for_stat_to_be(h, h1, "ep_workload:LowPrioQ_AuxIO:OutQsize",
-                        0, "workload");
+    // Let all backfills finish
+    wait_for_stat_to_be(h, h1, "ep_dcp_num_running_backfills", 0, "dcp");
+
+    return SUCCESS;
+}
+
+// Check that an incoming DCP mutation which has an invalid CAS is fixed up
+// by the engine.
+static enum test_result test_mb17517_cas_minus_1_dcp(ENGINE_HANDLE *h,
+                                                     ENGINE_HANDLE_V1 *h1) {
+    // Attempt to insert a item with CAS of -1 via DCP.
+    const void *cookie = testHarness.create_cookie();
+    uint32_t opaque = 0xFFFF0000;
+    uint32_t flags = 0;
+    std::string name = "test_mb17517_cas_minus_1";
+
+    // Switch vb 0 to replica (to accept DCP mutaitons).
+    check(set_vbucket_state(h, h1, 0, vbucket_state_replica),
+          "Failed to set vbucket state to replica.");
+
+    // Open consumer connection
+    checkeq(h1->dcp.open(h, cookie, opaque, 0, flags, (void*)name.c_str(),
+                         name.size()),
+            ENGINE_SUCCESS, "Failed DCP Consumer open connection.");
+
+    add_stream_for_consumer(h, h1, cookie, opaque++, 0, 0,
+                            PROTOCOL_BINARY_RESPONSE_SUCCESS);
+
+    uint32_t stream_opaque = get_int_stat(h, h1,
+                                          ("eq_dcpq:" + name + ":stream_0_opaque").c_str(),
+                                          "dcp");
+
+    h1->dcp.snapshot_marker(h, cookie,  stream_opaque, /*vbid*/0,
+                            /*start*/0, /*end*/3, /*flags*/2);
+
+    // Create two items via a DCP mutation.
+    const std::string prefix{"bad_CAS_DCP"};
+    std::string value{"value"};
+    for (unsigned int ii = 0; ii < 2; ii++) {
+        std::string key{prefix + std::to_string(ii)};
+        checkeq(ENGINE_SUCCESS,
+                h1->dcp.mutation(h, cookie, stream_opaque, key.c_str(), key.size(),
+                                 value.c_str(), value.size(), /*cas*/-1,
+                                 /*vbucket*/0,
+                                 /*flags*/0, PROTOCOL_BINARY_RAW_BYTES,
+                                 /*by_seqno*/ii + 1, /*rev_seqno*/1,
+                                 /*expiration*/0, /*lock_time*/0,
+                                 /*meta*/nullptr, /*nmeta*/0, INITIAL_NRU_VALUE),
+                                 "Expected DCP mutation with CAS:-1 to succeed");
+    }
+
+    // Ensure we have processed the mutations.
+    wait_for_stat_to_be(h, h1, "vb_replica_curr_items", 2);
+
+    // Delete one of them (to allow us to test DCP deletion).
+    std::string delete_key{prefix + "0"};
+    checkeq(ENGINE_SUCCESS,
+            h1->dcp.deletion(h, cookie, stream_opaque, delete_key.c_str(),
+                             delete_key.size(), /*cas*/-1, /*vbucket*/0,
+                             /*by_seqno*/3, /*rev_seqno*/2,
+                             /*meta*/nullptr, /*nmeta*/0),
+                             "Expected DCP deletion with CAS:-1 to succeed");
+
+    // Ensure we have processed the deletion.
+    wait_for_stat_to_be(h, h1, "vb_replica_curr_items", 1);
+
+    // Flip vBucket to active so we can access the documents in it.
+    check(set_vbucket_state(h, h1, 0, vbucket_state_active),
+          "Failed to set vbucket state to active.");
+
+    // Check that a valid CAS was regenerated for the (non-deleted) mutation.
+    std::string key{prefix + "1"};
+    auto cas = get_CAS(h, h1, key);
+    checkne(~uint64_t(0), cas, "CAS via get() is still -1");
+
+    testHarness.destroy_cookie(cookie);
+
+    return SUCCESS;
+}
+
+// Check that an incoming TAP mutation which has an invalid CAS is fixed up
+// by the engine.
+static enum test_result test_mb17517_cas_minus_1_tap(ENGINE_HANDLE *h,
+                                                     ENGINE_HANDLE_V1 *h1) {
+    const uint16_t vbucket = 0;
+    // Need a replica vBucket to send mutations into.
+    check(set_vbucket_state(h, h1, vbucket, vbucket_state_replica),
+          "Failed to set vbucket state.");
+
+    char eng_specific[9];
+    memset(eng_specific, 0, sizeof(eng_specific));
+
+    // Create two items via TAP.
+    std::string prefix{"bad_CAS_TAP"};
+    std::string value{"value"};
+    for (unsigned int ii = 0; ii < 2; ii++) {
+        std::string key{prefix + std::to_string(ii)};
+        checkeq(ENGINE_SUCCESS,
+                h1->tap_notify(h, NULL, eng_specific, sizeof(eng_specific),
+                               /*TTL*/1, /*tap_flags*/0, TAP_MUTATION,
+                               /*tap_seqno*/ii + 1,
+                               key.c_str(), key.size(), /*flags*/0, /*exptime*/0,
+                               /*CAS*/-1, PROTOCOL_BINARY_RAW_BYTES,
+                           value.c_str(), value.size(), vbucket),
+            "Expected tap_notify to succeed.");
+    }
+
+    // Ensure we have processed the mutations.
+    wait_for_stat_to_be(h, h1, "vb_replica_curr_items", 2);
+
+    // Delete one of the items.
+    std::string delete_key{prefix + "0"};
+    checkeq(ENGINE_SUCCESS,
+            h1->tap_notify(h, NULL, eng_specific, sizeof(eng_specific),
+                           /*TTL*/1, /*tap_flags*/0, TAP_DELETION,
+                           /*tap_seqno*/2, delete_key.c_str(),
+                           delete_key.size(), /*flags*/0, /*exptime*/0,
+                           /*CAS*/-1, PROTOCOL_BINARY_RAW_BYTES,
+                       value.c_str(), value.size(), vbucket),
+        "Expected tap_notify to succeed.");
+
+    // Ensure we have processed the deletion.
+    wait_for_stat_to_be(h, h1, "vb_replica_curr_items", 1);
+
+    // Flip vBucket to active so we can access the documents in it.
+    check(set_vbucket_state(h, h1, 0, vbucket_state_active),
+          "Failed to set vbucket state to active.");
+
+    // Check that a valid CAS was regenerated for the (non-deleted) mutation.
+    std::string key{prefix + "1"};
+    auto cas = get_CAS(h, h1, key);
+    checkne(~uint64_t(0), cas, "CAS via get() is still -1");
+
     return SUCCESS;
 }
 
@@ -8891,6 +10209,8 @@ static enum test_result test_get_meta(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1)
     check(last_meta.cas == it->getCas(), "Expected cas to match");
     check(last_meta.exptime == it->getExptime(), "Expected exptime to match");
     check(last_meta.flags == it->getFlags(), "Expected flags to match");
+    check(last_conflict_resolution_mode == static_cast<uint8_t>(-1),
+            "Expected to not receive the conflict resolution mode");
     // check the stat again
     temp = get_int_stat(h, h1, "ep_num_ops_get_meta");
     check(temp == 1, "Expect one getMeta op");
@@ -8899,19 +10219,68 @@ static enum test_result test_get_meta(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1)
     return SUCCESS;
 }
 
-static enum test_result test_get_meta_deleted(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1)
+static enum test_result test_get_meta_with_extras(ENGINE_HANDLE *h,
+                                                  ENGINE_HANDLE_V1 *h1)
 {
-    char const *key = "k1";
+    const char *key1 = "test_getm_one";
     item *i = NULL;
-
-    check(store(h, h1, NULL, OPERATION_SET, key, "somevalue", &i) == ENGINE_SUCCESS,
-          "Failed set.");
-    h1->release(h, NULL, i);
-    check(store(h, h1, NULL, OPERATION_SET, key, "somevalue", &i) == ENGINE_SUCCESS,
+    check(store(h, h1, NULL, OPERATION_SET, key1, "somevalue", &i) ==
+            ENGINE_SUCCESS,
           "Failed set.");
+    Item *it1 = reinterpret_cast<Item*>(i);
+    // check the stat
+    size_t temp = get_int_stat(h, h1, "ep_num_ops_get_meta");
+    check(temp == 0, "Expect zero getMeta ops");
 
-    Item *it = reinterpret_cast<Item*>(i);
-    wait_for_flusher_to_settle(h, h1);
+    check(get_meta(h, h1, key1, true), "Expected to get meta");
+    check(last_status == PROTOCOL_BINARY_RESPONSE_SUCCESS, "Expected success");
+    check(last_meta.revSeqno == it1->getRevSeqno(), "Expected seqno to match");
+    check(last_meta.cas == it1->getCas(), "Expected cas to match");
+    check(last_meta.exptime == it1->getExptime(), "Expected exptime to match");
+    check(last_meta.flags == it1->getFlags(), "Expected flags to match");
+    check(last_conflict_resolution_mode == 0,
+            "Expected to receive the conflict resolution mode revid_based");
+    // check the stat again
+    temp = get_int_stat(h, h1, "ep_num_ops_get_meta");
+    check(temp == 1, "Expect one getMeta op");
+    h1->release(h, NULL, i);
+
+    // Enable time synchronization
+    set_drift_counter_state(h, h1, 1000, 0x01);
+
+    const char *key2 = "test_getm_two";
+    check(store(h, h1, NULL, OPERATION_SET, key2, "somevalue", &i) ==
+            ENGINE_SUCCESS,
+          "Failed set.");
+    Item *it2 = reinterpret_cast<Item*>(i);
+    check(get_meta(h, h1, key2, true), "Expected to get meta");
+    check(last_status == PROTOCOL_BINARY_RESPONSE_SUCCESS, "Expected success");
+    check(last_meta.revSeqno == it2->getRevSeqno(), "Expected seqno to match");
+    check(last_meta.cas == it2->getCas(), "Expected cas to match");
+    check(last_meta.exptime == it2->getExptime(), "Expected exptime to match");
+    check(last_meta.flags == it2->getFlags(), "Expected flags to match");
+    check(last_conflict_resolution_mode == 1,
+            "Expected to receive the conflict resolution mode lww_based");
+    temp = get_int_stat(h, h1, "ep_num_ops_get_meta");
+    check(temp == 2, "Expect one getMeta op");
+    h1->release(h, NULL, i);
+
+    return SUCCESS;
+}
+
+static enum test_result test_get_meta_deleted(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1)
+{
+    char const *key = "k1";
+    item *i = NULL;
+
+    check(store(h, h1, NULL, OPERATION_SET, key, "somevalue", &i) == ENGINE_SUCCESS,
+          "Failed set.");
+    h1->release(h, NULL, i);
+    check(store(h, h1, NULL, OPERATION_SET, key, "somevalue", &i) == ENGINE_SUCCESS,
+          "Failed set.");
+
+    Item *it = reinterpret_cast<Item*>(i);
+    wait_for_flusher_to_settle(h, h1);
 
     check(del(h, h1, key, it->getCas(), 0) == ENGINE_SUCCESS, "Delete failed");
     wait_for_flusher_to_settle(h, h1);
@@ -9138,9 +10507,13 @@ static enum test_result test_add_with_meta(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h
 
 static enum test_result test_delete_with_meta(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1) {
 
-    const char *key = "delete_with_meta_key";
-    const size_t keylen = strlen(key);
+    const char *key1 = "delete_with_meta_key1";
+    const char *key2 = "delete_with_meta_key2";
+    const char *key3 = "delete_with_meta_key3";
+    const size_t keylen = strlen(key1);
     ItemMetaData itemMeta;
+    uint64_t vb_uuid;
+    uint32_t high_seqno;
     // check the stat
     size_t temp = get_int_stat(h, h1, "ep_num_ops_del_meta");
     check(temp == 0, "Expect zero setMeta ops");
@@ -9153,17 +10526,51 @@ static enum test_result test_delete_with_meta(ENGINE_HANDLE *h, ENGINE_HANDLE_V1
 
     // store an item
     item *i = NULL;
-    check(store(h, h1, NULL, OPERATION_SET, key,
+    check(store(h, h1, NULL, OPERATION_SET, key1,
                 "somevalue", &i) == ENGINE_SUCCESS, "Failed set.");
 
+    check(store(h, h1, NULL, OPERATION_SET, key2,
+                "somevalue2", &i) == ENGINE_SUCCESS, "Failed set.");
+
+    check(store(h, h1, NULL, OPERATION_SET, key3,
+                "somevalue3", &i) == ENGINE_SUCCESS, "Failed set.");
+
+    vb_uuid = get_ull_stat(h, h1, "vb_0:0:id", "failovers");
+    high_seqno = get_ull_stat(h, h1, "vb_0:high_seqno", "vbucket-seqno");
+
+    const void *cookie = testHarness.create_cookie();
+
     // delete an item with meta data
-    del_with_meta(h, h1, key, keylen, 0, &itemMeta);
+    del_with_meta(h, h1, key1, keylen, 0, &itemMeta, 0, false, false,
+                  0, 0, cookie);
+
+    check(last_uuid == vb_uuid, "Expected valid vbucket uuid");
+    check(last_seqno == high_seqno + 1, "Expected valid sequence number");
     check(last_status == PROTOCOL_BINARY_RESPONSE_SUCCESS, "Expected success");
     // check the stat
     temp = get_int_stat(h, h1, "ep_num_ops_del_meta");
     check(temp == 1, "Expect more setMeta ops");
 
+    testHarness.set_mutation_extras_handling(cookie, false);
+
+    // delete an item with meta data
+    del_with_meta(h, h1, key2, keylen, 0, &itemMeta, 0, false, false,
+                  0, 0, cookie);
+
+    check(last_uuid == vb_uuid, "Expected same vbucket uuid");
+    check(last_seqno == high_seqno + 1, "Expected same sequence number");
+    check(last_status == PROTOCOL_BINARY_RESPONSE_SUCCESS, "Expected success");
+
+    // delete an item with meta data
+    del_with_meta(h, h1, key3, keylen, 0, &itemMeta);
+
+    check(last_uuid == vb_uuid, "Expected valid vbucket uuid");
+    check(last_seqno == high_seqno + 3, "Expected valid sequence number");
+    check(last_status == PROTOCOL_BINARY_RESPONSE_SUCCESS, "Expected success");
+
     h1->release(h, NULL, i);
+
+    testHarness.destroy_cookie(cookie);
     return SUCCESS;
 }
 
@@ -9188,7 +10595,6 @@ static enum test_result test_delete_with_meta_deleted(ENGINE_HANDLE *h,
     wait_for_stat_to_be(h, h1, "curr_items", 0);
 
     // get metadata of deleted key
-
     check(get_meta(h, h1, key), "Expected to get meta");
     check(last_status == PROTOCOL_BINARY_RESPONSE_SUCCESS, "Expected success");
     check(last_deleted_flag, "Expected deleted flag to be set");
@@ -9297,6 +10703,58 @@ static enum test_result test_delete_with_meta_nonexistent(ENGINE_HANDLE *h,
     return SUCCESS;
 }
 
+static enum test_result test_delete_with_meta_nonexistent_no_temp(ENGINE_HANDLE *h,
+                                                                  ENGINE_HANDLE_V1 *h1) {
+    const char *key1 = "delete_with_meta_no_temp_key1";
+    const size_t keylen1 = strlen(key1);
+    ItemMetaData itm_meta1;
+
+    // Run compaction to start using the bloomfilter
+    useconds_t sleepTime = 128;
+    compact_db(h, h1, 0, 1, 1, 0);
+    while (get_int_stat(h, h1, "ep_pending_compactions") != 0) {
+        decayingSleep(&sleepTime);
+    }
+
+    // put some random metadata and delete the item with new meta data
+    itm_meta1.revSeqno = 10;
+    itm_meta1.cas = 0xdeadbeef;
+    itm_meta1.exptime = 1735689600; // expires in 2025
+    itm_meta1.flags = 0xdeadbeef;
+
+    // do delete with meta with the correct cas value.
+    // skipConflictResolution false
+    del_with_meta(h, h1, key1, keylen1, 0, &itm_meta1, 0, false);
+    check(last_status == PROTOCOL_BINARY_RESPONSE_SUCCESS, "Expected success");
+    wait_for_flusher_to_settle(h, h1);
+
+    check(get_int_stat(h, h1, "ep_num_ops_del_meta") == 1, "Expect one op");
+    wait_for_stat_to_be(h, h1, "curr_items", 0);
+    check(get_int_stat(h, h1, "curr_temp_items") == 0, "Expected zero temp_items");
+
+    // do delete with meta with the correct cas value.
+    // skipConflictResolution true
+    const char *key2 = "delete_with_meta_no_temp_key2";
+    const size_t keylen2 = strlen(key2);
+    ItemMetaData itm_meta2;
+
+    // put some random metadata and delete the item with new meta data
+    itm_meta2.revSeqno = 10;
+    itm_meta2.cas = 0xdeadbeef;
+    itm_meta2.exptime = 1735689600; // expires in 2025
+    itm_meta2.flags = 0xdeadbeef;
+
+    del_with_meta(h, h1, key2, keylen2, 0, &itm_meta2, 0, true);
+    check(last_status == PROTOCOL_BINARY_RESPONSE_SUCCESS, "Expected success");
+    wait_for_flusher_to_settle(h, h1);
+
+    check(get_int_stat(h, h1, "ep_num_ops_del_meta") == 2, "Expect one op");
+    wait_for_stat_to_be(h, h1, "curr_items", 0);
+    check(get_int_stat(h, h1, "curr_temp_items") == 0, "Expected zero temp_items");
+
+    return SUCCESS;
+}
+
 static enum test_result test_delete_with_meta_race_with_set(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1)
 {
     char const *key1 = "key1";
@@ -9371,6 +10829,7 @@ static enum test_result test_delete_with_meta_race_with_delete(ENGINE_HANDLE *h,
     uint16_t keylen2 = (uint16_t)strlen(key2);
     item *i = NULL;
     ItemMetaData itm_meta;
+    itm_meta.cas = 0x1;
     // check the stat
     size_t temp = get_int_stat(h, h1, "ep_num_ops_del_meta");
     check(temp == 0, "Expect zero ops");
@@ -9449,6 +10908,8 @@ static enum test_result test_set_with_meta(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h
     const char* val = "somevalue";
     const char* newVal = "someothervalue";
     size_t newValLen = strlen(newVal);
+    uint64_t vb_uuid;
+    uint32_t high_seqno;
 
     // check the stat
     check(get_int_stat(h, h1, "ep_num_ops_set_meta") == 0, "Expect zero ops");
@@ -9491,9 +10952,18 @@ static enum test_result test_set_with_meta(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h
     // check the stat
     check(get_int_stat(h, h1, "ep_num_ops_set_meta") == 0, "Failed op does not count");
 
+    vb_uuid = get_ull_stat(h, h1, "vb_0:0:id", "failovers");
+    high_seqno = get_ull_stat(h, h1, "vb_0:high_seqno", "vbucket-seqno");
+
+    const void *cookie = testHarness.create_cookie();
+
     // do set with meta with the correct cas value. should pass.
-    set_with_meta(h, h1, key, keylen, newVal, newValLen, 0, &itm_meta, cas_for_set);
+    set_with_meta(h, h1, key, keylen, newVal, newValLen, 0, &itm_meta, cas_for_set,
+                  false, 0, false, 0, 0, cookie);
     check(last_status == PROTOCOL_BINARY_RESPONSE_SUCCESS, "Expected success");
+    check(last_uuid == vb_uuid, "Expected valid vbucket uuid");
+    check(last_seqno == high_seqno + 1, "Expected valid sequence number");
+
     // check the stat
     check(get_int_stat(h, h1, "ep_num_ops_set_meta") == 1, "Expect some ops");
     check(get_int_stat(h, h1, "curr_items") == 1, "Expect one item");
@@ -9506,11 +10976,29 @@ static enum test_result test_set_with_meta(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h
     check(last_meta.cas == 0xdeadbeef, "Expected cas to match");
     check(last_meta.flags == 0xdeadbeef, "Expected flags to match");
 
+    //disable getting vb uuid and seqno as extras
+    testHarness.set_mutation_extras_handling(cookie, false);
+    itm_meta.revSeqno++;
+    cas_for_set = last_meta.cas;
+    set_with_meta(h, h1, key, keylen, newVal, newValLen, 0, &itm_meta, cas_for_set,
+                  false, 0, false, 0, 0, cookie);
+    check(last_status == PROTOCOL_BINARY_RESPONSE_SUCCESS, "Expected success");
+    check(last_uuid == vb_uuid, "Expected same vbucket uuid");
+    check(last_seqno == high_seqno + 1, "Expected same sequence number");
+
+    itm_meta.revSeqno++;
+    cas_for_set = last_meta.cas;
+    set_with_meta(h, h1, key, keylen, newVal, newValLen, 0, &itm_meta, cas_for_set);
+    check(last_status == PROTOCOL_BINARY_RESPONSE_SUCCESS, "Expected success");
+    check(last_uuid == vb_uuid, "Expected valid vbucket uuid");
+    check(last_seqno == high_seqno + 3, "Expected valid sequence number");
+
     // Make sure the item expiration was processed correctly
     testHarness.time_travel(301);
     check(h1->get(h, NULL, &i, key, keylen, 0) == ENGINE_KEY_ENOENT, "Failed to get value.");
 
     h1->release(h, NULL, i);
+    testHarness.destroy_cookie(cookie);
     return SUCCESS;
 }
 
@@ -9634,7 +11122,7 @@ static enum test_result test_set_with_meta_nonexistent(ENGINE_HANDLE *h, ENGINE_
     check(!get_meta(h, h1, key), "Expected get meta to return false");
     check(last_status == PROTOCOL_BINARY_RESPONSE_KEY_ENOENT, "Expected enoent");
     check(get_int_stat(h, h1, "curr_items") == 0, "Expected zero curr_items");
-    check(get_int_stat(h, h1, "curr_temp_items") == 1, "Expected single temp_items");
+    check(get_int_stat(h, h1, "curr_temp_items") == 1, "Expected no temp_items");
 
     // this is the cas to be used with a subsequent set with meta
     uint64_t cas_for_set = last_cas;
@@ -9967,6 +11455,67 @@ static enum test_result test_set_meta_conflict_resolution(ENGINE_HANDLE *h,
     return SUCCESS;
 }
 
+static enum test_result test_set_meta_lww_conflict_resolution(ENGINE_HANDLE *h,
+                                                              ENGINE_HANDLE_V1 *h1) {
+    // put some random metadata
+    ItemMetaData itemMeta;
+    itemMeta.revSeqno = 10;
+    itemMeta.cas = 0xdeadbeef;
+    itemMeta.exptime = 0;
+    itemMeta.flags = 0xdeadbeef;
+
+    //Set initial drift and enable time synchronization
+    set_drift_counter_state(h, h1, 0, 0x01);
+
+    check(get_int_stat(h, h1, "ep_num_ops_set_meta") == 0,
+          "Expect zero setMeta ops");
+
+    set_with_meta(h, h1, "key", 3, NULL, 0, 0, &itemMeta, 0, false,
+                  PROTOCOL_BINARY_RAW_BYTES, true, gethrtime(), 1);
+    check(last_status == PROTOCOL_BINARY_RESPONSE_SUCCESS, "Expected success");
+    check(get_int_stat(h, h1, "ep_bg_meta_fetched") == 1,
+          "Expected one bg meta fetch");
+
+    // Check all meta data is the same
+    set_with_meta(h, h1, "key", 3, NULL, 0, 0, &itemMeta, 0, false,
+                  PROTOCOL_BINARY_RAW_BYTES, true, gethrtime(), 1);
+    check(last_status == PROTOCOL_BINARY_RESPONSE_KEY_EEXISTS, "Expected exists");
+    check(get_int_stat(h, h1, "ep_num_ops_set_meta_res_fail") == 1,
+          "Expected set meta conflict resolution failure");
+
+    // Check that an older cas fails
+    itemMeta.cas = 0xdeadbeee;
+    set_with_meta(h, h1, "key", 3, NULL, 0, 0, &itemMeta, 0, false,
+                  PROTOCOL_BINARY_RAW_BYTES, true, gethrtime(), 1);
+    check(last_status == PROTOCOL_BINARY_RESPONSE_KEY_EEXISTS, "Expected exists");
+    check(get_int_stat(h, h1, "ep_num_ops_set_meta_res_fail") == 2,
+          "Expected set meta conflict resolution failure");
+
+    // Check that a higher cas passes
+    itemMeta.cas = 0xdeadbeff;
+    set_with_meta(h, h1, "key", 3, NULL, 0, 0, &itemMeta, 0, false,
+                  PROTOCOL_BINARY_RAW_BYTES, true, gethrtime(), 1);
+    check(last_status == PROTOCOL_BINARY_RESPONSE_SUCCESS, "Expected success");
+
+    // Check that a higher cas, lower rev seqno and conflict resolution
+    // with revision seqno will fail
+    itemMeta.cas = 0xdeadbfff;
+    itemMeta.revSeqno = 9;
+    set_with_meta(h, h1, "key", 3, NULL, 0, 0, &itemMeta, 0, false,
+                 PROTOCOL_BINARY_RAW_BYTES, true, gethrtime(), 0);
+    check(last_status == PROTOCOL_BINARY_RESPONSE_KEY_EEXISTS, "Expected exists");
+    check(get_int_stat(h, h1, "ep_num_ops_set_meta_res_fail") == 3,
+          "Expected set meta conflict resolution failure");
+
+    // Check that a lower cas, higher rev seqno and conflict resolution
+    // with revision seqno will pass
+    itemMeta.revSeqno = 11;
+    set_with_meta(h, h1, "key", 3, NULL, 0, 0, &itemMeta, 0, false,
+                 PROTOCOL_BINARY_RAW_BYTES, true, gethrtime(), 0);
+    check(last_status == PROTOCOL_BINARY_RESPONSE_SUCCESS, "Expected success");
+    return SUCCESS;
+}
+
 static enum test_result test_del_meta_conflict_resolution(ENGINE_HANDLE *h,
                                                           ENGINE_HANDLE_V1 *h1) {
 
@@ -9992,129 +11541,478 @@ static enum test_result test_del_meta_conflict_resolution(ENGINE_HANDLE *h,
     del_with_meta(h, h1, "key", 3, 0, &itemMeta);
     check(last_status == PROTOCOL_BINARY_RESPONSE_KEY_EEXISTS, "Expected exists");
     check(get_int_stat(h, h1, "ep_num_ops_del_meta_res_fail") == 1,
-          "Expected set meta conflict resolution failure");
+          "Expected delete meta conflict resolution failure");
 
     // Check has older flags fails
     itemMeta.flags = 0xdeadbeee;
     del_with_meta(h, h1, "key", 3, 0, &itemMeta);
     check(last_status == PROTOCOL_BINARY_RESPONSE_KEY_EEXISTS, "Expected exists");
     check(get_int_stat(h, h1, "ep_num_ops_del_meta_res_fail") == 2,
-          "Expected set meta conflict resolution failure");
+          "Expected delete meta conflict resolution failure");
 
     // Check that smaller exptime loses
     itemMeta.exptime = 0;
     del_with_meta(h, h1, "key", 3, 0, &itemMeta);
     check(last_status == PROTOCOL_BINARY_RESPONSE_KEY_EEXISTS, "Expected exists");
     check(get_int_stat(h, h1, "ep_num_ops_del_meta_res_fail") == 3,
-          "Expected set meta conflict resolution failure");
+          "Expected delete meta conflict resolution failure");
 
     // Check testing with old seqno
     itemMeta.revSeqno--;
     del_with_meta(h, h1, "key", 3, 0, &itemMeta);
     check(last_status == PROTOCOL_BINARY_RESPONSE_KEY_EEXISTS, "Expected exists");
     check(get_int_stat(h, h1, "ep_num_ops_del_meta_res_fail") == 4,
-          "Expected set meta conflict resolution failure");
+          "Expected delete meta conflict resolution failure");
 
     itemMeta.revSeqno += 10;
     del_with_meta(h, h1, "key", 3, 0, &itemMeta);
     check(last_status == PROTOCOL_BINARY_RESPONSE_SUCCESS, "Expected success");
     check(get_int_stat(h, h1, "ep_num_ops_del_meta_res_fail") == 4,
-          "Expected set meta conflict resolution failure");
+          "Expected delete meta conflict resolution failure");
 
     return SUCCESS;
 }
 
-// ------------------------------ end of XDCR unit tests -----------------------//
+static enum test_result test_del_meta_lww_conflict_resolution(ENGINE_HANDLE *h,
+                                                              ENGINE_HANDLE_V1 *h1) {
 
-static enum test_result test_observe_no_data(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1) {
-    std::map<std::string, uint16_t> obskeys;
-    observe(h, h1, obskeys);
-    check(last_status == PROTOCOL_BINARY_RESPONSE_SUCCESS, "Expected success");
-    return SUCCESS;
-}
+    item *i = NULL;
+    item_info info;
 
-static enum test_result test_observe_single_key(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1) {
-    stop_persistence(h, h1);
+    set_drift_counter_state(h, h1, 0, 0x01);
 
-    // Set an item
-    item *it = NULL;
-    uint64_t cas1;
-    check(h1->allocate(h, NULL, &it, "key", 3, 100, 0, 0,
-          PROTOCOL_BINARY_RAW_BYTES)== ENGINE_SUCCESS, "Allocation failed.");
-    check(h1->store(h, NULL, it, &cas1, OPERATION_SET, 0)== ENGINE_SUCCESS,
-          "Set should work.");
-    h1->release(h, NULL, it);
+    check(store(h, h1, NULL, OPERATION_SET, "key", "somevalue", &i) == ENGINE_SUCCESS,
+          "Failed set.");
 
-    // Do an observe
-    std::map<std::string, uint16_t> obskeys;
-    obskeys["key"] = 0;
-    observe(h, h1, obskeys);
+    info.nvalue = 1;
+    h1->get_item_info(h, NULL, i, &info);
+    wait_for_flusher_to_settle(h, h1);
+    h1->release(h, NULL, i);
+
+    // put some random metadata
+    ItemMetaData itemMeta;
+    itemMeta.revSeqno = 10;
+    itemMeta.cas = info.cas + 1;
+    itemMeta.exptime = 0;
+    itemMeta.flags = 0xdeadbeef;
+
+    del_with_meta(h, h1, "key", 3, 0, &itemMeta, 0, false, true, gethrtime(), 1);
     check(last_status == PROTOCOL_BINARY_RESPONSE_SUCCESS, "Expected success");
+    wait_for_flusher_to_settle(h, h1);
+    wait_for_stat_to_be(h, h1, "curr_items", 0);
 
-    // Check that the key is not persisted
-    uint16_t vb;
-    uint16_t keylen;
-    char key[3];
-    uint8_t persisted;
-    uint64_t cas;
+    // Check all meta data is the same
+    del_with_meta(h, h1, "key", 3, 0, &itemMeta, 0, false, true, gethrtime(), 1);
+    check(last_status == PROTOCOL_BINARY_RESPONSE_KEY_EEXISTS, "Expected exists");
+    check(get_int_stat(h, h1, "ep_num_ops_del_meta_res_fail") == 1,
+          "Expected delete meta conflict resolution failure");
 
-    memcpy(&vb, last_body.data(), sizeof(uint16_t));
-    check(ntohs(vb) == 0, "Wrong vbucket in result");
-    memcpy(&keylen, last_body.data() + 2, sizeof(uint16_t));
-    check(ntohs(keylen) == 3, "Wrong keylen in result");
-    memcpy(&key, last_body.data() + 4, ntohs(keylen));
-    check(strncmp(key, "key", 3) == 0, "Wrong key in result");
-    memcpy(&persisted, last_body.data() + 7, sizeof(uint8_t));
-    check(persisted == OBS_STATE_NOT_PERSISTED, "Expected persisted in result");
-    memcpy(&cas, last_body.data() + 8, sizeof(uint64_t));
-    check(ntohll(cas) == cas1, "Wrong cas in result");
+    // Check that higher rev seqno but lower cas fails
+    itemMeta.cas = info.cas;
+    itemMeta.revSeqno = 11;
+    del_with_meta(h, h1, "key", 3, 0, &itemMeta, 0, false, true, gethrtime(), 1);
+    check(last_status == PROTOCOL_BINARY_RESPONSE_KEY_EEXISTS, "Expected exists");
+    check(get_int_stat(h, h1, "ep_num_ops_del_meta_res_fail") == 2,
+          "Expected delete meta conflict resolution failure");
+
+    // Check that a higher cas and lower rev seqno passes
+    itemMeta.cas = info.cas + 2;
+    itemMeta.revSeqno = 9;
+    del_with_meta(h, h1, "key", 3, 0, &itemMeta, 0, false, true, gethrtime(), 1);
+    check(last_status == PROTOCOL_BINARY_RESPONSE_SUCCESS, "Expected sucess");
+
+    // Check that a higher rev seqno and lower cas and conflict resolution of
+    // revision seqno passes
+    itemMeta.revSeqno = 10;
+    itemMeta.cas = info.cas + 1;
+    del_with_meta(h, h1, "key", 3, 0, &itemMeta, 0, false, true, gethrtime(), 0);
+    check(last_status == PROTOCOL_BINARY_RESPONSE_SUCCESS, "Expected success");
+
+    // Check that a lower rev seqno and higher cas and conflict resolution of
+    // revision seqno fails
+    itemMeta.revSeqno = 9;
+    itemMeta.cas = info.cas + 2;
+    del_with_meta(h, h1, "key", 3, 0, &itemMeta, 0, false, true, gethrtime(), 0);
+    check(last_status == PROTOCOL_BINARY_RESPONSE_KEY_EEXISTS, "Expected exists");
+    check(get_int_stat(h, h1, "ep_num_ops_del_meta_res_fail") == 3,
+          "Expected delete meta conflict resolution failure");
 
     return SUCCESS;
 }
 
-static enum test_result test_observe_multi_key(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1) {
-    // Create some vbuckets
-    check(set_vbucket_state(h, h1, 1, vbucket_state_active), "Failed to set vbucket state.");
+static enum test_result test_adjusted_time_apis(ENGINE_HANDLE *h,
+                                                ENGINE_HANDLE_V1 *h1) {
 
-    // Set some keys to observe
-    item *it = NULL;
-    uint64_t cas1, cas2, cas3;
-    check(h1->allocate(h, NULL, &it, "key1", 4, 100, 0, 0,
-          PROTOCOL_BINARY_RAW_BYTES)== ENGINE_SUCCESS, "Allocation failed.");
-    check(h1->store(h, NULL, it, &cas1, OPERATION_SET, 0)== ENGINE_SUCCESS,
-          "Set should work.");
-    h1->release(h, NULL, it);
+    int64_t adjusted_time1, adjusted_time2;
+    protocol_binary_request_header *request;
 
-    check(h1->allocate(h, NULL, &it, "key2", 4, 100, 0, 0,
-          PROTOCOL_BINARY_RAW_BYTES)== ENGINE_SUCCESS, "Allocation failed.");
-    check(h1->store(h, NULL, it, &cas2, OPERATION_SET, 1)== ENGINE_SUCCESS,
-          "Set should work.");
-    h1->release(h, NULL, it);
+    request = createPacket(PROTOCOL_BINARY_CMD_GET_ADJUSTED_TIME, 0, 0, NULL, 0,
+                           NULL, 0, NULL, 0);
+    h1->unknown_command(h, NULL, request, add_response);
+    check(last_status == PROTOCOL_BINARY_RESPONSE_NOT_SUPPORTED,
+            "Expected Not Supported, as Time sync hasn't been enabled yet");
 
-    check(h1->allocate(h, NULL, &it, "key3", 4, 100, 0, 0,
-          PROTOCOL_BINARY_RAW_BYTES)== ENGINE_SUCCESS, "Allocation failed.");
-    check(h1->store(h, NULL, it, &cas3, OPERATION_SET, 1)== ENGINE_SUCCESS,
-          "Set should work.");
-    h1->release(h, NULL, it);
+    set_drift_counter_state(h, h1, 1000, 0x01);
 
-    wait_for_stat_to_be(h, h1, "ep_total_persisted", 3);
+    request = createPacket(PROTOCOL_BINARY_CMD_GET_ADJUSTED_TIME, 0, 0, NULL, 0,
+                           NULL, 0, NULL, 0);
+    h1->unknown_command(h, NULL, request, add_response);
+    check(last_status == PROTOCOL_BINARY_RESPONSE_SUCCESS,
+            "Expected Success");
+    checkeq(sizeof(int64_t), last_body.size(),
+            "Bodylen didn't match expected value");
+    memcpy(&adjusted_time1, last_body.data(), last_body.size());
+    adjusted_time1 = ntohll(adjusted_time1);
 
-    // Do observe
-    std::map<std::string, uint16_t> obskeys;
-    obskeys["key1"] = 0;
-    obskeys["key2"] = 1;
-    obskeys["key3"] = 1;
-    observe(h, h1, obskeys);
+    set_drift_counter_state(h, h1, 1000000, 0x01);
+
+    request = createPacket(PROTOCOL_BINARY_CMD_GET_ADJUSTED_TIME, 0, 0, NULL, 0,
+                           NULL, 0, NULL, 0);
+    h1->unknown_command(h, NULL, request, add_response);
+    check(last_status == PROTOCOL_BINARY_RESPONSE_SUCCESS,
+            "Expected Success");
+    checkeq(sizeof(int64_t), last_body.size(),
+            "Bodylen didn't match expected value");
+    memcpy(&adjusted_time2, last_body.data(), last_body.size());
+    adjusted_time2 = ntohll(adjusted_time2);
+
+    // adjusted_time2 should be greater than adjusted_time1 marginally
+    // by adjusted_time1 + (difference in the 2 driftCounts set previously)
+    check(adjusted_time2 >= adjusted_time1 + 999000,
+            "Adjusted_time2: now what expected");
+
+    // Test sending adjustedTime with SetWithMeta
+    ItemMetaData itm_meta;
+    itm_meta.flags = 0xdeadbeef;
+    itm_meta.exptime = 0;
+    itm_meta.revSeqno = 10;
+    itm_meta.cas = 0xdeadbeef;
+    set_with_meta(h, h1, "key", 3, "value", 5, 0, &itm_meta, last_cas,
+                  false, 0x00, true, adjusted_time2 * 2);
+    wait_for_flusher_to_settle(h, h1);
+
+    check(last_status == PROTOCOL_BINARY_RESPONSE_SUCCESS,
+            "Expected a SUCCESS");
+    check_key_value(h, h1, "key", "value", 5, 0);
+
+    request = createPacket(PROTOCOL_BINARY_CMD_GET_ADJUSTED_TIME, 0, 0, NULL, 0,
+            NULL, 0, NULL, 0);
+    h1->unknown_command(h, NULL, request, add_response);
+    check(last_status == PROTOCOL_BINARY_RESPONSE_SUCCESS,
+            "Expected Success");
+    checkeq(sizeof(int64_t), last_body.size(),
+            "Bodylen didn't match expected value");
+    memcpy(&adjusted_time1, last_body.data(), last_body.size());
+    adjusted_time1 = ntohll(adjusted_time1);
+
+    // Check that adjusted_time1 should be marginally greater than
+    // adjusted_time2 * 2
+    check(adjusted_time1 >= adjusted_time2 * 2,
+            "Adjusted_time1: not what is expected");
+
+    // Test sending adjustedTime with DelWithMeta
+    item *i = NULL;
+    check(store(h, h1, NULL, OPERATION_SET, "key2", "value2", &i) == ENGINE_SUCCESS,
+            "Failed set.");
+    h1->release(h, NULL, i);
+    del_with_meta(h, h1, "key2", 4, 0, &itm_meta, last_cas, false,
+                  true, adjusted_time1 * 2);
+    wait_for_flusher_to_settle(h, h1);
     check(last_status == PROTOCOL_BINARY_RESPONSE_SUCCESS, "Expected success");
 
-    // Check the result
-    uint16_t vb;
-    uint16_t keylen;
-    char key[10];
-    uint8_t persisted;
-    uint64_t cas;
+    request = createPacket(PROTOCOL_BINARY_CMD_GET_ADJUSTED_TIME, 0, 0, NULL, 0,
+            NULL, 0, NULL, 0);
+    h1->unknown_command(h, NULL, request, add_response);
+    check(last_status == PROTOCOL_BINARY_RESPONSE_SUCCESS,
+            "Expected Success");
+    checkeq(sizeof(int64_t), last_body.size(),
+            "Bodylen didn't match expected value");
+    memcpy(&adjusted_time2, last_body.data(), last_body.size());
+    adjusted_time2 = ntohll(adjusted_time2);
 
-    memcpy(&vb, last_body.data(), sizeof(uint16_t));
+    // Check that adjusted_time2 should be marginally greater than
+    // adjusted_time1 * 2
+    check(adjusted_time2 >= adjusted_time1 * 2,
+            "Adjusted_time2: not what is expected");
+
+    return SUCCESS;
+}
+
+// ------------------------------ end of XDCR unit tests -----------------------//
+
+static enum test_result test_observe_no_data(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1) {
+    std::map<std::string, uint16_t> obskeys;
+    observe(h, h1, obskeys);
+    check(last_status == PROTOCOL_BINARY_RESPONSE_SUCCESS, "Expected success");
+    return SUCCESS;
+}
+
+static enum test_result test_observe_seqno_basic_tests(ENGINE_HANDLE *h,
+                                                       ENGINE_HANDLE_V1 *h1) {
+    // Check observe seqno for vbucket with id 1
+    check(set_vbucket_state(h, h1, 1, vbucket_state_active), "Failed to set vbucket state.");
+
+    //Check the output when there is no data in the vbucket
+    uint64_t vb_uuid = get_ull_stat(h, h1, "vb_1:0:id", "failovers");
+    uint64_t high_seqno = get_int_stat(h, h1, "vb_1:high_seqno", "vbucket-seqno");
+    observe_seqno(h, h1, 1, vb_uuid);
+
+    check(last_status == PROTOCOL_BINARY_RESPONSE_SUCCESS, "Expected success");
+
+    check_observe_seqno(false, 0, 1, vb_uuid, high_seqno, high_seqno);
+
+    //Add some mutations and verify the output
+    int num_items = 10;
+    for (int j = 0; j < num_items; ++j) {
+        // Set an item
+        item *it = NULL;
+        std::stringstream ss;
+        ss << "key" << j;
+        uint64_t cas1;
+        check(h1->allocate(h, NULL, &it, ss.str().c_str(), 4, 100, 0, 0,
+              PROTOCOL_BINARY_RAW_BYTES)== ENGINE_SUCCESS, "Allocation failed.");
+        check(h1->store(h, NULL, it, &cas1, OPERATION_SET, 1)== ENGINE_SUCCESS,
+              "Expected set to succeed");
+        h1->release(h, NULL, it);
+    }
+
+    wait_for_flusher_to_settle(h, h1);
+
+    int total_persisted = get_int_stat(h, h1, "ep_total_persisted");
+    high_seqno = get_int_stat(h, h1, "vb_1:high_seqno", "vbucket-seqno");
+
+    check(total_persisted == num_items,
+          "Expected ep_total_persisted equals the number of items");
+
+    observe_seqno(h, h1, 1, vb_uuid);
+
+    check_observe_seqno(false, 0, 1, vb_uuid, total_persisted, high_seqno);
+    //Stop persistence. Add more mutations and check observe result
+    stop_persistence(h, h1);
+
+    num_items = 20;
+    for (int j = 10; j < num_items; ++j) {
+        // Set an item
+        item *it = NULL;
+        std::stringstream ss;
+        ss << "key" << j;
+        uint64_t cas1;
+        check(h1->allocate(h, NULL, &it, ss.str().c_str(), 5, 100, 0, 0,
+              PROTOCOL_BINARY_RAW_BYTES)== ENGINE_SUCCESS, "Allocation failed.");
+        check(h1->store(h, NULL, it, &cas1, OPERATION_SET, 1)== ENGINE_SUCCESS,
+              "Expected set to succeed");
+        h1->release(h, NULL, it);
+    }
+
+    high_seqno = get_int_stat(h, h1, "vb_1:high_seqno", "vbucket-seqno");
+    observe_seqno(h, h1, 1, vb_uuid);
+
+    check_observe_seqno(false, 0, 1, vb_uuid, total_persisted, high_seqno);
+    start_persistence(h, h1);
+    wait_for_flusher_to_settle(h, h1);
+    total_persisted = get_int_stat(h, h1, "ep_total_persisted");
+
+    observe_seqno(h, h1, 1, vb_uuid);
+
+    check_observe_seqno(false, 0, 1, vb_uuid, total_persisted, high_seqno);
+    return SUCCESS;
+}
+
+static enum test_result test_observe_seqno_failover(ENGINE_HANDLE *h,
+                                                    ENGINE_HANDLE_V1 *h1) {
+    int num_items = 10;
+    for (int j = 0; j < num_items; ++j) {
+        // Set an item
+        item *it = NULL;
+        std::stringstream ss;
+        ss << "key" << j;
+        uint64_t cas1;
+        check(h1->allocate(h, NULL, &it, ss.str().c_str(), 4, 100, 0, 0,
+              PROTOCOL_BINARY_RAW_BYTES)== ENGINE_SUCCESS, "Allocation failed.");
+        check(h1->store(h, NULL, it, &cas1, OPERATION_SET, 0)== ENGINE_SUCCESS,
+              "Expected set to succeed");
+        h1->release(h, NULL, it);
+    }
+
+    wait_for_flusher_to_settle(h, h1);
+
+    uint64_t vb_uuid = get_ull_stat(h, h1, "vb_0:0:id", "failovers");
+    uint64_t high_seqno = get_int_stat(h, h1, "vb_0:high_seqno", "vbucket-seqno");
+
+    // restart
+    testHarness.reload_engine(&h, &h1,
+                              testHarness.engine_path,
+                              testHarness.get_current_testcase()->cfg,
+                              true, true);
+    wait_for_warmup_complete(h, h1);
+
+    uint64_t new_vb_uuid = get_ull_stat(h, h1, "vb_0:0:id", "failovers");
+
+    observe_seqno(h, h1, 0, vb_uuid);
+
+    check_observe_seqno(true, 1, 0, new_vb_uuid, high_seqno, high_seqno,
+                        vb_uuid, high_seqno);
+
+    return SUCCESS;
+}
+
+static enum test_result test_observe_seqno_error(ENGINE_HANDLE *h,
+                                                 ENGINE_HANDLE_V1 *h1) {
+
+    //not my vbucket test
+    uint64_t vb_uuid = get_ull_stat(h, h1, "vb_0:0:id", "failovers");
+    observe_seqno(h, h1, 10, vb_uuid);
+    check(last_status == PROTOCOL_BINARY_RESPONSE_NOT_MY_VBUCKET,
+          "Expected not my vbucket");
+
+    //invalid uuid for vbucket
+    vb_uuid = 0xdeadbeef;
+    std::stringstream invalid_data;
+    invalid_data.write((char *) &vb_uuid, sizeof(uint64_t));
+
+    protocol_binary_request_header *request;
+
+    request = createPacket(PROTOCOL_BINARY_CMD_OBSERVE_SEQNO, 0, 0, NULL, 0,
+                           NULL, 0, invalid_data.str().data(),
+                           invalid_data.str().length());
+    h1->unknown_command(h, NULL, request, add_response);
+
+    check(last_status == PROTOCOL_BINARY_RESPONSE_KEY_ENOENT,
+          "Expected vb uuid not found");
+
+    return SUCCESS;
+}
+
+static enum test_result test_observe_single_key(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1) {
+    stop_persistence(h, h1);
+
+    // Set an item
+    item *it = NULL;
+    uint64_t cas1;
+    check(h1->allocate(h, NULL, &it, "key", 3, 100, 0, 0,
+          PROTOCOL_BINARY_RAW_BYTES)== ENGINE_SUCCESS, "Allocation failed.");
+    check(h1->store(h, NULL, it, &cas1, OPERATION_SET, 0)== ENGINE_SUCCESS,
+          "Set should work.");
+    h1->release(h, NULL, it);
+
+    // Do an observe
+    std::map<std::string, uint16_t> obskeys;
+    obskeys["key"] = 0;
+    observe(h, h1, obskeys);
+    check(last_status == PROTOCOL_BINARY_RESPONSE_SUCCESS, "Expected success");
+
+    // Check that the key is not persisted
+    uint16_t vb;
+    uint16_t keylen;
+    char key[3];
+    uint8_t persisted;
+    uint64_t cas;
+
+    memcpy(&vb, last_body.data(), sizeof(uint16_t));
+    check(ntohs(vb) == 0, "Wrong vbucket in result");
+    memcpy(&keylen, last_body.data() + 2, sizeof(uint16_t));
+    check(ntohs(keylen) == 3, "Wrong keylen in result");
+    memcpy(&key, last_body.data() + 4, ntohs(keylen));
+    check(strncmp(key, "key", 3) == 0, "Wrong key in result");
+    memcpy(&persisted, last_body.data() + 7, sizeof(uint8_t));
+    check(persisted == OBS_STATE_NOT_PERSISTED, "Expected persisted in result");
+    memcpy(&cas, last_body.data() + 8, sizeof(uint64_t));
+    check(ntohll(cas) == cas1, "Wrong cas in result");
+
+    return SUCCESS;
+}
+
+static enum test_result test_observe_temp_item(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1) {
+    char const *k1 = "key";
+    item *i = NULL;
+
+    check(store(h, h1, NULL, OPERATION_SET, k1, "somevalue", &i) == ENGINE_SUCCESS,
+          "Failed set.");
+    h1->release(h, NULL, i);
+    wait_for_flusher_to_settle(h, h1);
+
+    check(del(h, h1, k1, 0, 0) == ENGINE_SUCCESS, "Delete failed");
+    wait_for_flusher_to_settle(h, h1);
+    wait_for_stat_to_be(h, h1, "curr_items", 0);
+
+    check(get_meta(h, h1, k1), "Expected to get meta");
+    check(last_status == PROTOCOL_BINARY_RESPONSE_SUCCESS, "Expected success");
+    check(last_deleted_flag, "Expected deleted flag to be set");
+    check(get_int_stat(h, h1, "curr_items") == 0, "Expected zero curr_items");
+
+    // Make sure there is one temp_item
+    check(get_int_stat(h, h1, "curr_temp_items") == 1, "Expected single temp_items");
+
+    // Do an observe
+    std::map<std::string, uint16_t> obskeys;
+    obskeys["key"] = 0;
+    observe(h, h1, obskeys);
+    check(last_status == PROTOCOL_BINARY_RESPONSE_SUCCESS, "Expected success");
+
+    // Check that the key is not found
+    uint16_t vb;
+    uint16_t keylen;
+    char key[3];
+    uint8_t persisted;
+    uint64_t cas;
+
+    memcpy(&vb, last_body.data(), sizeof(uint16_t));
+    check(ntohs(vb) == 0, "Wrong vbucket in result");
+    memcpy(&keylen, last_body.data() + 2, sizeof(uint16_t));
+    check(ntohs(keylen) == 3, "Wrong keylen in result");
+    memcpy(&key, last_body.data() + 4, ntohs(keylen));
+    check(strncmp(key, "key", 3) == 0, "Wrong key in result");
+    memcpy(&persisted, last_body.data() + 7, sizeof(uint8_t));
+    check(persisted == OBS_STATE_NOT_FOUND, "Expected NOT_FOUND in result");
+    memcpy(&cas, last_body.data() + 8, sizeof(uint64_t));
+    check(ntohll(cas) == 0, "Wrong cas in result");
+
+    return SUCCESS;
+}
+
+static enum test_result test_observe_multi_key(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1) {
+    // Create some vbuckets
+    check(set_vbucket_state(h, h1, 1, vbucket_state_active), "Failed to set vbucket state.");
+
+    // Set some keys to observe
+    item *it = NULL;
+    uint64_t cas1, cas2, cas3;
+    check(h1->allocate(h, NULL, &it, "key1", 4, 100, 0, 0,
+          PROTOCOL_BINARY_RAW_BYTES)== ENGINE_SUCCESS, "Allocation failed.");
+    check(h1->store(h, NULL, it, &cas1, OPERATION_SET, 0)== ENGINE_SUCCESS,
+          "Set should work.");
+    h1->release(h, NULL, it);
+
+    check(h1->allocate(h, NULL, &it, "key2", 4, 100, 0, 0,
+          PROTOCOL_BINARY_RAW_BYTES)== ENGINE_SUCCESS, "Allocation failed.");
+    check(h1->store(h, NULL, it, &cas2, OPERATION_SET, 1)== ENGINE_SUCCESS,
+          "Set should work.");
+    h1->release(h, NULL, it);
+
+    check(h1->allocate(h, NULL, &it, "key3", 4, 100, 0, 0,
+          PROTOCOL_BINARY_RAW_BYTES)== ENGINE_SUCCESS, "Allocation failed.");
+    check(h1->store(h, NULL, it, &cas3, OPERATION_SET, 1)== ENGINE_SUCCESS,
+          "Set should work.");
+    h1->release(h, NULL, it);
+
+    wait_for_stat_to_be(h, h1, "ep_total_persisted", 3);
+
+    // Do observe
+    std::map<std::string, uint16_t> obskeys;
+    obskeys["key1"] = 0;
+    obskeys["key2"] = 1;
+    obskeys["key3"] = 1;
+    observe(h, h1, obskeys);
+    check(last_status == PROTOCOL_BINARY_RESPONSE_SUCCESS, "Expected success");
+
+    // Check the result
+    uint16_t vb;
+    uint16_t keylen;
+    char key[10];
+    uint8_t persisted;
+    uint64_t cas;
+
+    memcpy(&vb, last_body.data(), sizeof(uint16_t));
     check(ntohs(vb) == 0, "Wrong vbucket in result");
     memcpy(&keylen, last_body.data() + 2, sizeof(uint16_t));
     check(ntohs(keylen) == 4, "Wrong keylen in result");
@@ -10330,7 +12228,9 @@ static enum test_result test_CBD_152(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1) {
     check(last_status == PROTOCOL_BINARY_RESPONSE_SUCCESS,
           "Failed to set flushall_enabled param");
     // flush should succeed
+    set_degraded_mode(h, h1, NULL, true);
     check(h1->flush(h, NULL, 0) == ENGINE_SUCCESS, "Flush should be enabled");
+    set_degraded_mode(h, h1, NULL, false);
     //expect missing key
     check(ENGINE_KEY_ENOENT == verify_key(h, h1, "key"), "Expected missing key");
 
@@ -10368,46 +12268,52 @@ static enum test_result test_control_data_traffic(ENGINE_HANDLE *h, ENGINE_HANDL
 }
 
 static enum test_result test_item_pager(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1) {
-    std::vector<std::string> keys;
+
+    // 1. Create enough 1KB items to hit the high watermark (i.e. get TEMP_OOM).
     char data[1024];
     memset(&data, 'x', sizeof(data)-1);
     data[1023] = '\0';
 
-    for (int j = 0; j < 100; ++j) {
+    // Create documents, until we hit TempOOM. Due to accurate memory tracking
+    // & overheads it's impossible to exactly predict how many we will need...
+    int docs_stored = 0;
+    for (int j = 0; ; ++j) {
         std::stringstream ss;
         ss << "key-" << j;
         std::string key(ss.str());
-        keys.push_back(key);
-    }
 
-    std::vector<std::string>::iterator it;
-    for (it = keys.begin(); it != keys.end(); ++it) {
         item *i;
-        check(store(h, h1, NULL, OPERATION_SET, it->c_str(), data, &i)
-              == ENGINE_SUCCESS, "Failed to store a value");
+        ENGINE_ERROR_CODE err = store(h, h1, NULL, OPERATION_SET, key.c_str(),
+                                      data, &i);
         h1->release(h, NULL, i);
-        // Reference each item multiple times.
-        for (int k = 0; k < 5; ++k) {
-            check(h1->get(h, NULL, &i, it->c_str(), strlen(it->c_str()), 0) == ENGINE_SUCCESS,
-                  "Failed to get value.");
-            h1->release(h, NULL, i);
+
+        check(err == ENGINE_SUCCESS || err == ENGINE_TMPFAIL,
+              "Failed to store a value");
+        if (err == ENGINE_TMPFAIL) {
+            break;
         }
+        docs_stored++;
     }
-    keys.clear();
 
-    for (int j = 100; j < 200;  ++j) {
+    // We should have stored at least a reasonable number of docs so we can
+    // then have NRU act on 50% of them.
+    check(docs_stored > 10,
+          "Failed to store enough documents before hitting TempOOM\n");
+
+    // Reference the first 50% of the stored documents making them have a
+    // lower NRU and not candidates for ejection.
+    for (int j = 0; j < docs_stored / 2; ++j) {
         std::stringstream ss;
         ss << "key-" << j;
         std::string key(ss.str());
-        keys.push_back(key);
-    }
-
-    for (it = keys.begin(); it != keys.end(); ++it) {
-        item *i;
-        store(h, h1, NULL, OPERATION_SET, it->c_str(), data, &i);
-        h1->release(h, NULL, i);
+        // Reference each stored item multiple times.
+        for (int k = 0; k < 5; ++k) {
+            item *i;
+            check(h1->get(h, NULL, &i, key.c_str(), key.length(), 0) == ENGINE_SUCCESS,
+                  "Failed to get value.");
+            h1->release(h, NULL, i);
+        }
     }
-    keys.clear();
 
     testHarness.time_travel(5);
 
@@ -10430,19 +12336,21 @@ static enum test_result test_item_pager(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1)
     }
 #endif
 
-    check(get_int_stat(h, h1, "ep_num_non_resident") > 0,
-          "Expect some non-resident items");
+    int num_non_resident = get_int_stat(h, h1, "ep_num_non_resident");
 
-    for (int j = 0; j < 100; ++j) {
+    if (num_non_resident == 0) {
+        wait_for_stat_change(h, h1, "ep_num_non_resident", 0);
+    }
+
+    // Check we can successfully fetch all of the documents (even ones not
+    // resident).
+    for (int j = 0; j < docs_stored; ++j) {
         std::stringstream ss;
         ss << "key-" << j;
         std::string key(ss.str());
-        keys.push_back(key);
-    }
 
-    for (it = keys.begin(); it != keys.end(); ++it) {
         item *i;
-        check(h1->get(h, NULL, &i, it->c_str(), strlen(it->c_str()), 0) == ENGINE_SUCCESS,
+        check(h1->get(h, NULL, &i, key.c_str(), key.length(), 0) == ENGINE_SUCCESS,
              "Failed to get value.");
         h1->release(h, NULL, i);
     }
@@ -10788,6 +12696,7 @@ static enum test_result test_est_vb_move(ENGINE_HANDLE *h,
             break;
         case TAP_CHECKPOINT_START:
         case TAP_CHECKPOINT_END:
+            h1->release(h, NULL, it);
             break;
         case TAP_MUTATION:
         case TAP_DELETION:
@@ -10803,6 +12712,7 @@ static enum test_result test_est_vb_move(ENGINE_HANDLE *h,
                 remaining = 10 - total_sent;
                 check(chk_items == remaining, "Invalid Estimate of chk items");
             }
+            h1->release(h, NULL, it);
             break;
         default:
             std::cerr << "Unexpected event:  " << event << std::endl;
@@ -11153,6 +13063,123 @@ static enum test_result test_setWithMeta_with_item_eviction(ENGINE_HANDLE *h,
     return SUCCESS;
 }
 
+struct multi_meta_args {
+    ENGINE_HANDLE *h;
+    ENGINE_HANDLE_V1 *h1;
+    int start;
+    int end;
+};
+
+extern "C" {
+    static void multi_set_with_meta(void *args) {
+        struct multi_meta_args *mma = static_cast<multi_meta_args *>(args);
+
+        for (int i = mma->start; i < mma->end; i++) {
+            // init some random metadata
+            ItemMetaData itm_meta;
+            itm_meta.revSeqno = 10;
+            itm_meta.cas = 0xdeadbeef;
+            itm_meta.exptime = 0;
+            itm_meta.flags = 0xdeadbeef;
+
+            std::stringstream key;
+            key << "key" << i;
+
+            set_with_meta(mma->h, mma->h1, key.str().c_str(),
+                          key.str().length(), "somevalueEdited", 15,
+                          0, &itm_meta, last_cas);
+        }
+    }
+
+    static void multi_del_with_meta(void *args) {
+        struct multi_meta_args *mma = static_cast<multi_meta_args *>(args);
+
+        for (int i = mma->start; i < mma->end; i++) {
+            // init some random metadata
+            ItemMetaData itm_meta;
+            itm_meta.revSeqno = 10;
+            itm_meta.cas = 0xdeadbeef;
+            itm_meta.exptime = 0;
+            itm_meta.flags = 0xdeadbeef;
+
+            std::stringstream key;
+            key << "key" << i;
+
+            del_with_meta(mma->h, mma->h1, key.str().c_str(),
+                          key.str().length(), 0, &itm_meta, last_cas);
+        }
+    }
+}
+
+static enum test_result test_multiple_set_delete_with_metas_full_eviction(
+                                    ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1) {
+
+    check(h1->get_stats(h, NULL, NULL, 0, add_stats) == ENGINE_SUCCESS,
+            "Failed to get stats");
+    std::string eviction_policy = vals.find("ep_item_eviction_policy")->second;
+    cb_assert(eviction_policy == "full_eviction");
+
+    int i = 0;
+    while(i < 1000) {
+        uint64_t cas_for_set = last_cas;
+        // init some random metadata
+        ItemMetaData itm_meta;
+        itm_meta.revSeqno = 10;
+        itm_meta.cas = 0xdeadbeef;
+        itm_meta.exptime = 0;
+        itm_meta.flags = 0xdeadbeef;
+
+        std::stringstream key;
+        key << "key" << i;
+
+        set_with_meta(h, h1, key.str().c_str(), key.str().length(),
+                "somevalue", 9, 0, &itm_meta, cas_for_set);
+        i++;
+    }
+
+    wait_for_flusher_to_settle(h, h1);
+
+    int curr_vb_items = get_int_stat(h, h1, "vb_0:num_items", "vbucket-details 0");
+    int num_ops_set_with_meta = get_int_stat(h, h1, "ep_num_ops_set_meta");
+    cb_assert(curr_vb_items == num_ops_set_with_meta && curr_vb_items > 0);
+
+    cb_thread_t thread1, thread2;
+    struct multi_meta_args mma1, mma2;
+    mma1.h = h;
+    mma1.h1 = h1;
+    mma1.start = 0;
+    mma1.end = 100;
+    cb_assert(cb_create_thread(&thread1, multi_set_with_meta, &mma1, 0) == 0);
+
+    mma2.h = h;
+    mma2.h1 = h1;
+    mma2.start = curr_vb_items - 100;
+    mma2.end = curr_vb_items;
+    cb_assert(cb_create_thread(&thread2, multi_del_with_meta, &mma2, 0) == 0);
+
+    cb_assert(cb_join_thread(thread1) == 0);
+    cb_assert(cb_join_thread(thread2) == 0);
+
+    wait_for_flusher_to_settle(h, h1);
+
+    cb_assert(get_int_stat(h, h1, "ep_num_ops_set_meta") > num_ops_set_with_meta);
+    cb_assert(get_int_stat(h ,h1, "ep_num_ops_del_meta") > 0);
+
+    curr_vb_items = get_int_stat(h, h1, "vb_0:num_items", "vbucket-details 0");
+
+    testHarness.reload_engine(&h, &h1,
+                              testHarness.engine_path,
+                              testHarness.get_current_testcase()->cfg,
+                              true, true);
+    wait_for_warmup_complete(h, h1);
+
+    check(get_int_stat(h, h1, "vb_0:num_items", "vbucket-details 0")
+          == curr_vb_items, "Unexpected item count in vbucket");
+
+    return SUCCESS;
+}
+
+
 static enum test_result test_add_with_item_eviction(ENGINE_HANDLE *h,
                                                     ENGINE_HANDLE_V1 *h1) {
     item *i = NULL;
@@ -11294,10 +13321,18 @@ static enum test_result test_del_with_item_eviction(ENGINE_HANDLE *h,
     h1->release(h, NULL, i);
 
     uint64_t cas = 0;
-    check(h1->remove(h, NULL, "key", 3, &cas, 0) == ENGINE_SUCCESS,
+    uint64_t vb_uuid;
+    mutation_descr_t mut_info;
+    uint32_t high_seqno;
+
+    vb_uuid = get_ull_stat(h, h1, "vb_0:0:id", "failovers");
+    high_seqno = get_ull_stat(h, h1, "vb_0:high_seqno", "vbucket-seqno");
+    check(h1->remove(h, NULL, "key", 3, &cas, 0, &mut_info) == ENGINE_SUCCESS,
           "Failed remove with value.");
     check(orig_cas + 1 == cas, "Cas mismatch on delete");
     check(ENGINE_KEY_ENOENT == verify_key(h, h1, "key"), "Expected missing key");
+    check(vb_uuid == mut_info.vbucket_uuid, "Expected valid vbucket uuid");
+    check(high_seqno + 1 == mut_info.seqno, "Expected valid sequence number");
 
     return SUCCESS;
 }
@@ -11425,6 +13460,18 @@ static enum test_result test_expired_item_with_item_eviction(ENGINE_HANDLE *h,
     return SUCCESS;
 }
 
+static enum test_result test_non_existent_get_and_delete(ENGINE_HANDLE *h,
+                                                         ENGINE_HANDLE_V1 *h1) {
+
+    item *i = NULL;
+    check(h1->get(h, NULL, &i, "key1", 4, 0) == ENGINE_KEY_ENOENT,
+            "Unexpected return status");
+    check(get_int_stat(h, h1, "curr_temp_items") == 0, "Unexpected temp item");
+    check(del(h, h1, "key3", 0, 0) == ENGINE_KEY_ENOENT, "Unexpected return status");
+    check(get_int_stat(h, h1, "curr_temp_items") == 0, "Unexpected temp item");
+    return SUCCESS;
+}
+
 static enum test_result test_mb16421(ENGINE_HANDLE *h,
                                      ENGINE_HANDLE_V1 *h1) {
     // Store the item!
@@ -11548,69 +13595,255 @@ static enum test_result test_failover_log_behavior(ENGINE_HANDLE *h,
     return SUCCESS;
 }
 
-static enum test_result test_get_all_vb_seqnos(ENGINE_HANDLE *h,
-                                               ENGINE_HANDLE_V1 *h1) {
-    const int num_items = 5;
+/* The Defragmenter (and hence it's unit tests) depend on using jemalloc as the
+ * memory allocator.
+ */
+#if defined(HAVE_JEMALLOC)
+/* Waits for mapped memory value to drop below the specified value, or for
+ * the maximum_sleep_time to be reached.
+ * @returns True if the mapped memory value dropped, or false if the maximum
+ * sleep time was reached.
+ */
+static bool wait_for_mapped_below(size_t mapped_threshold,
+                                  useconds_t max_sleep_time) {
+    useconds_t sleepTime = 128;
+    useconds_t totalSleepTime = 0;
+    while (testHarness.get_mapped_bytes() > mapped_threshold) {
+        decayingSleep(&sleepTime);
+        totalSleepTime += sleepTime;
+        if (totalSleepTime > max_sleep_time) {
+            return false;
+        }
+    }
+    return true;
+}
 
-    /* Replica vbucket 0; snapshot 0 to 10, but write just 1 item */
-    const int rep_vb_num = 0;
-    check(set_vbucket_state(h, h1, rep_vb_num, vbucket_state_replica),
-          "Failed to set vbucket state");
+// Create a number of documents, spanning at least two or more pages, then
+// delete most (but not all) of them - crucially ensure that one document from
+// each page is still present. This will result in the rest of that page
+// being 'wasted'. Then verify that after defragmentation the actual memory
+// usage drops down to (close to) mem_used.
+static enum test_result test_defragmenter(ENGINE_HANDLE *h,
+                                          ENGINE_HANDLE_V1 *h1) {
+    const void *cookie = testHarness.create_cookie();
+
+    // Sanity check - need memory tracker to be able to check our memory usage.
+    check(get_str_stat(h, h1, "ep_mem_tracker_enabled") == "true",
+          "Memory tracker not enabled");
+
+    // Enable vbucket 1 in addition to vbucket zero.
+    const uint16_t num_vbuckets = 2;
+    check(set_vbucket_state(h, h1, 1, vbucket_state_active),
+          "Failed to set vbucket state.");
+
+    // 0. Get baseline memory usage (before creating any objects).
+    //    First ensure stats are up-to-date, by getting directly from the
+    //    allocator (normally they are retrieved periodically by a worker
+    //    thread).
+    size_t mem_used_0 = get_ull_stat(h, h1, "mem_used", NULL);
+    size_t mapped_0 = testHarness.get_mapped_bytes();
+
+    // 1. Create a number of small documents. Doesn't really matter that
+    //    they are small, main thing is we create enough to span multiple
+    //    pages (so we can later leave 'holes' when they are deleted).
+    const size_t size = 128;
+    const size_t num_docs = 40000;
+    std::string data(size, 'x');
+    for (unsigned int i = 0; i < num_docs; i++ ) {
+        // Deliberately using C-style int-to-string conversion (instead of
+        // stringstream) to minimize heap pollution while filling ep_engine.
+        char key[16];
+        snprintf(key, sizeof(key), "%d", i);
+        item *item = NULL;
+        uint16_t vb = i % num_vbuckets;
+        check(storeCasVb11(h, h1, cookie, OPERATION_ADD, key, data.c_str(),
+                           data.length(), 0, &item, 0, vb, 0, 0)
+              == ENGINE_SUCCESS, "Failed to store a value");
+        h1->release(h, NULL, item);
+    }
     wait_for_flusher_to_settle(h, h1);
 
-    const void *cookie = testHarness.create_cookie();
-    uint32_t opaque = 0xFFFF0000;
-    uint32_t flags = 0;
-    std::string name("unittest");
-    uint8_t cas = 0;
-    uint8_t datatype = 1;
-    uint64_t bySeqno = 10;
-    uint64_t revSeqno = 0;
-    uint32_t exprtime = 0;
-    uint32_t lockTime = 0;
+    // Record memory usage after creation.
+    size_t mem_used_1 = get_ull_stat(h, h1, "mem_used", NULL);
+    size_t mapped_1 = testHarness.get_mapped_bytes();
+
+    // Sanity check - mem_used should be at least size * count bytes larger than
+    // initial.
+    check(mem_used_1 >= mem_used_0 + (size * num_docs),
+          "mem_used smaller than expected after creating documents");
+
+    // 2. Determine how many documents are in each page, and then remove all but
+    //    one from each page.
+    size_t num_remaining = num_docs;
+    const size_t LOG_PAGE_SIZE = 12; // 4K page
+    {
+        typedef std::map<uintptr_t, std::vector<int> > page_to_keys_t;
+        page_to_keys_t page_to_keys;
+        // Build a map of pages to keys
+        for (unsigned int i = 0; i < num_docs; i++ ) {
+            char key[16];
+            snprintf(key, sizeof(key), "%d", i);
+            uint16_t vb = i % num_vbuckets;
+
+            item_info info;
+            check(get_item_info(h, h1, &info, key, vb), "Unable to get item_info");
+            check(info.nvalue == 1, "info.nvalue != 1");
+            const uintptr_t page = uintptr_t(info.value[0].iov_base) >> LOG_PAGE_SIZE;
+            page_to_keys[page].emplace_back(i);
+        }
 
-    checkeq(ENGINE_SUCCESS,
-            h1->dcp.open(h, cookie, opaque, 0, flags, (void*)(name.c_str()),
-                         name.size()),
-            "Failed to open DCP consumer connection!");
-    add_stream_for_consumer(h, h1, cookie, opaque++, rep_vb_num, 0,
-                            PROTOCOL_BINARY_RESPONSE_SUCCESS);
+        // Now remove all but one document from each page.
+        for (page_to_keys_t::iterator kv = page_to_keys.begin();
+             kv != page_to_keys.end();
+             kv++) {
+            // Free all but one document on this page.
+            while (kv->second.size() > 1) {
+                auto doc_id = kv->second.back();
+                char key[16];
+                snprintf(key, sizeof(key), "%d", doc_id);
+                uint16_t vb = doc_id % num_vbuckets;
+
+                uint64_t cas = 0;
+                mutation_descr_t mut_info;
+                check(h1->remove(h, NULL, key, strlen(key), &cas, vb,
+                                 &mut_info) == ENGINE_SUCCESS,
+                      "Failed to remove key.");
+                kv->second.pop_back();
+                num_remaining--;
+            }
+        }
+        wait_for_flusher_to_settle(h, h1);
+    }
 
-    std::string opaqueStr("eq_dcpq:" + name + ":stream_0_opaque");
-    uint32_t stream_opaque = get_int_stat(h, h1, opaqueStr.c_str(), "dcp");
+    // Release free memory back to OS to minimize our footprint after
+    // removing the documents above.
+    testHarness.release_free_memory();
 
-    checkeq(ENGINE_SUCCESS,
-            h1->dcp.snapshot_marker(h, cookie, stream_opaque, rep_vb_num, 0, 10,
-                                    1),
-            "Failed to send snapshot marker!");
+    // Sanity check - mem_used should have reduced down by approximately how
+    // many documents were removed.
+    // Allow some extra, to handle any increase in data structure sizes used
+    // to actually manage the objects.
+    const double fuzz_factor = 1.2;
+    const size_t all_docs_size = mem_used_1 - mem_used_0;
+    const size_t remaining_size = (all_docs_size / num_docs) * num_remaining;
+    const size_t expected_mem = (mem_used_0 + remaining_size) * fuzz_factor;
+    wait_for_memory_usage_below(h, h1, expected_mem);
 
-    check(h1->dcp.mutation(h, cookie, stream_opaque, "key", 3, "value", 5,
-                           cas, rep_vb_num, flags, datatype,
-                           bySeqno, revSeqno, exprtime,
-                           lockTime, NULL, 0, 0) == ENGINE_SUCCESS,
-          "Failed dcp mutate.");
+    size_t mapped_2 = testHarness.get_mapped_bytes();
 
-    /* Active vbucket 1; write 5 items */
-    check(set_vbucket_state(h, h1, 1, vbucket_state_active),
-          "Failed to set vbucket state.");
-    for (int j= 0; j < num_items; j++) {
-        std::stringstream ss;
-        ss << "key" << j;
-        checkeq(ENGINE_SUCCESS,
-                store(h, h1, NULL, OPERATION_SET, ss.str().c_str(),
-                      "value", NULL, 0, 1),
-                "Failed to store an item.");
+    // Sanity check (2) - mapped memory should still be high - at least 90% of
+    // the value after creation, before delete.
+    check(mapped_2 - mapped_0 >= 0.9 * (double)(mapped_1 - mapped_0),
+          "Mapped memory lower than expected");
+
+    // 3. Trigger defragmentation
+    // (Enable defragmenter task if it was disabled)
+
+    if (get_int_stat(h, h1, "defragmenter_enabled") == 0) {
+        check(set_param(h, h1, protocol_binary_engine_param_flush,
+                    "defragmenter_enabled", "true"),
+                "Set defragmenter_enabled should have worked");
     }
 
-    /* Create request to get vb seqno of all vbuckets */
-    get_all_vb_seqnos(h, h1, static_cast<vbucket_state_t>(0), cookie);
+    check(set_param(h, h1, protocol_binary_engine_param_flush, "defragmenter_run",
+                    "true"),
+          "Failed to trigger defragmenter");
 
-    /* Check if the response received is correct */
-    verify_all_vb_seqnos(h, h1, 0, 1);
+    // Check that mapped memory has decreased after defragmentation - should be
+    // less than 60% of the amount before defrag (this is pretty conservative,
+    // but it's hard to accurately predict the whole-application size).
+    // Give it 10 seconds to drop.
+    const size_t expected_mapped = ((mapped_2 - mapped_0) * 0.6) + mapped_0;
+    check(wait_for_mapped_below(expected_mapped,
+                                10 * 1000 * 1000),
+          "Mapped memory didn't reduce as expected after defragmentation");
 
     testHarness.destroy_cookie(cookie);
     return SUCCESS;
 }
+#endif // defined(HAVE_JEMALLOC)
+
+static enum test_result test_hlc_cas(ENGINE_HANDLE *h,
+                                     ENGINE_HANDLE_V1 *h1) {
+    const char *key = "key";
+    item *i = NULL;
+    item_info info;
+    uint64_t curr_cas = 0, prev_cas = 0;
+
+    memset(&info, 0, sizeof(info));
+
+    //enabled time sync
+    set_drift_counter_state(h, h1, 100000, true);
+    check(store(h, h1, NULL, OPERATION_ADD, key, "data1", &i, 0, 0)
+          == ENGINE_SUCCESS, "Failed to store an item");
+    h1->release(h, NULL, i);
+
+    check(get_item_info(h, h1, &info, key), "Error in getting item info");
+    curr_cas = info.cas;
+    check(curr_cas > prev_cas, "CAS is not monotonically increasing");
+    prev_cas = curr_cas;
+
+    //set a lesser drift and ensure that the CAS is monotonically
+    //increasing
+    set_drift_counter_state(h, h1, 100, true);
+
+    check(store(h, h1, NULL, OPERATION_SET, key, "data2", &i, 0, 0)
+          == ENGINE_SUCCESS, "Failed to store an item");
+    h1->release(h, NULL, i);
+
+    check(get_item_info(h, h1, &info, key), "Error getting item info");
+    curr_cas = info.cas;
+    check(curr_cas > prev_cas, "CAS is not monotonically increasing");
+    prev_cas = curr_cas;
+
+    //ensure that the adjusted time will be negative
+    int64_t drift_counter = (-1) * (gethrtime() + 100000);
+    set_drift_counter_state(h, h1, drift_counter, true);
+
+    protocol_binary_request_header *request;
+    int64_t adjusted_time;
+    request = createPacket(PROTOCOL_BINARY_CMD_GET_ADJUSTED_TIME, 0, 0, NULL, 0,
+                           NULL, 0, NULL, 0);
+    h1->unknown_command(h, NULL, request, add_response);
+    check(last_status == PROTOCOL_BINARY_RESPONSE_SUCCESS,
+            "Expected Success");
+    checkeq(sizeof(int64_t), last_body.size(),
+            "Bodylen didn't match expected value");
+    memcpy(&adjusted_time, last_body.data(), last_body.size());
+    adjusted_time = ntohll(adjusted_time);
+    check(adjusted_time < 0, "Adjusted time is supposed to negative");
+
+    check(store(h, h1, NULL, OPERATION_REPLACE, key, "data3", &i, 0, 0)
+          == ENGINE_SUCCESS, "Failed to store an item");
+    h1->release(h, NULL, i);
+
+    check(get_item_info(h, h1, &info, key), "Error in getting item info");
+    curr_cas = info.cas;
+    check(curr_cas > prev_cas, "CAS is not monotonically increasing");
+    prev_cas = curr_cas;
+
+    //disable time sync
+    set_drift_counter_state(h, h1, 0, false);
+
+    getl(h, h1, key, 0, 10);
+    check(last_status == PROTOCOL_BINARY_RESPONSE_SUCCESS,
+          "Expected to be able to getl on first try");
+    curr_cas = last_cas;
+    check(curr_cas > prev_cas, "CAS is not monotonically increasing");
+    prev_cas = curr_cas;
+
+    uint64_t result = 0;
+    check(h1->arithmetic(h, NULL, "key2", 4, true, true, 1, 1, 0,
+                         &i, PROTOCOL_BINARY_RAW_BYTES, &result, 0)
+                         == ENGINE_SUCCESS, "Failed arithmetic operation");
+    h1->release(h, NULL, i);
+
+    check(get_item_info(h, h1, &info, "key2"), "Error in getting item info");
+    curr_cas = info.cas;
+    check(curr_cas > prev_cas, "CAS is not monotonically increasing");
+
+    return SUCCESS;
+}
 
 static enum test_result test_failover_log_dcp(ENGINE_HANDLE *h,
                                               ENGINE_HANDLE_V1 *h1) {
@@ -11625,64 +13858,159 @@ static enum test_result test_failover_log_dcp(ENGINE_HANDLE *h,
         h1->release(h, NULL, i);
     }
 
-    wait_for_flusher_to_settle(h, h1);
-    wait_for_stat_to_be(h, h1, "curr_items", 10);
+    wait_for_flusher_to_settle(h, h1);
+    wait_for_stat_to_be(h, h1, "curr_items", 10);
+
+    testHarness.reload_engine(&h, &h1,
+                              testHarness.engine_path,
+                              testHarness.get_current_testcase()->cfg,
+                              true, true);
+    wait_for_warmup_complete(h, h1);
+
+    wait_for_stat_to_be(h, h1, "curr_items", 10);
+
+    uint64_t start = 0;
+    uint64_t end = 1000;
+    uint64_t uuid = 0;
+    uint64_t snap_start_seq = start;
+    uint64_t snap_end_seq = start;
+    dcp_stream_req(h, h1, 1, 0, 0, start, end, uuid,
+                   snap_start_seq, snap_end_seq, 0, ENGINE_SUCCESS);
+
+    start = 0;
+    end = 1000;
+    uuid = get_ull_stat(h, h1, "vb_0:1:id", "failovers");
+    snap_start_seq = start;
+    snap_end_seq = start;
+    dcp_stream_req(h, h1, 1, 0, 0, start, end, uuid,
+                   snap_start_seq, snap_end_seq, 0, ENGINE_SUCCESS);
+
+    start = 2;
+    end = 1000;
+    uuid = get_ull_stat(h, h1, "vb_0:1:id", "failovers");
+    snap_start_seq = start;
+    snap_end_seq = start;
+    dcp_stream_req(h, h1, 1, 0, 0, start, end, uuid,
+                   snap_start_seq, snap_end_seq, 0, ENGINE_SUCCESS);
+
+    start = 10;
+    end = 1000;
+    uuid = get_ull_stat(h, h1, "vb_0:1:id", "failovers");
+    snap_start_seq = start;
+    snap_end_seq = start;
+    dcp_stream_req(h, h1, 1, 0, 0, start, end, uuid,
+                   snap_start_seq, snap_end_seq, 0, ENGINE_SUCCESS);
+
+    start = 12;
+    end = 1000;
+    uuid = get_ull_stat(h, h1, "vb_0:1:id", "failovers");
+    snap_start_seq = start;
+    snap_end_seq = start;
+    dcp_stream_req(h, h1, 1, 0, 0, start, end, uuid,
+                   snap_start_seq, snap_end_seq, 10, ENGINE_ROLLBACK);
+
+    start = 2;
+    end = 1000;
+    uuid = 123456;
+    snap_start_seq = start;
+    snap_end_seq = start;
+    dcp_stream_req(h, h1, 1, 0, 0, start, end, uuid,
+                   snap_start_seq, snap_end_seq, 0, ENGINE_ROLLBACK);
+
+    /* Test a case where start_seqno > vb_high_seqno and flags
+       DCP_ADD_STREAM_FLAG_LATEST/DCP_ADD_STREAM_FLAG_DISKONLY set */
+    start = 12;
+    end = 1000;
+    uuid = get_ull_stat(h, h1, "vb_0:1:id", "failovers");
+    snap_start_seq = start;
+    snap_end_seq = start;
+
+    /* Expect rollback */
+    dcp_stream_req(h, h1, 1, DCP_ADD_STREAM_FLAG_LATEST, 0, start, end, uuid,
+                   snap_start_seq, snap_end_seq, 10, ENGINE_ROLLBACK);
+
+    /* Expect rollback */
+    dcp_stream_req(h, h1, 1, DCP_ADD_STREAM_FLAG_DISKONLY, 0, start, end, uuid,
+                   snap_start_seq, snap_end_seq, 10, ENGINE_ROLLBACK);
+
+    return SUCCESS;
+}
+
+static enum test_result test_get_all_vb_seqnos(ENGINE_HANDLE *h,
+                                               ENGINE_HANDLE_V1 *h1) {
+    const void *cookie = testHarness.create_cookie();
+
+    const int num_vbuckets = 10;
+
+    /* Replica vbucket 0; snapshot 0 to 10, but write just 1 item */
+    const int rep_vb_num = 0;
+    check(set_vbucket_state(h, h1, rep_vb_num, vbucket_state_replica),
+          "Failed to set vbucket state");
+    wait_for_flusher_to_settle(h, h1);
+
+    uint32_t opaque = 0xFFFF0000;
+    uint32_t flags = 0;
+    std::string name("unittest");
+    uint8_t cas = 0;
+    uint8_t datatype = 1;
+    uint64_t bySeqno = 10;
+    uint64_t revSeqno = 0;
+    uint32_t exprtime = 0;
+    uint32_t lockTime = 0;
+
+    checkeq(ENGINE_SUCCESS,
+            h1->dcp.open(h, cookie, opaque, 0, flags, (void*)(name.c_str()),
+                         name.size()),
+            "Failed to open DCP consumer connection!");
+    add_stream_for_consumer(h, h1, cookie, opaque++, rep_vb_num, 0,
+                            PROTOCOL_BINARY_RESPONSE_SUCCESS);
+
+    std::string opaqueStr("eq_dcpq:" + name + ":stream_0_opaque");
+    uint32_t stream_opaque = get_int_stat(h, h1, opaqueStr.c_str(), "dcp");
+
+    checkeq(ENGINE_SUCCESS,
+            h1->dcp.snapshot_marker(h, cookie, stream_opaque, rep_vb_num, 0, 10,
+                                    1),
+            "Failed to send snapshot marker!");
 
-    testHarness.reload_engine(&h, &h1,
-                              testHarness.engine_path,
-                              testHarness.get_current_testcase()->cfg,
-                              true, true);
-    wait_for_warmup_complete(h, h1);
+    check(h1->dcp.mutation(h, cookie, stream_opaque, "key", 3, "value", 5,
+                           cas, rep_vb_num, flags, datatype,
+                           bySeqno, revSeqno, exprtime,
+                           lockTime, NULL, 0, 0) == ENGINE_SUCCESS,
+          "Failed dcp mutate.");
 
-    wait_for_stat_to_be(h, h1, "curr_items", 10);
+    /* Create active vbuckets */
+    for (int i = 1; i < num_vbuckets; i++) {
+        /* Active vbuckets */
+        check(set_vbucket_state(h, h1, i, vbucket_state_active),
+              "Failed to set vbucket state.");
+        for (int j= 0; j < i; j++) {
+            std::string key("key" + std::to_string(i));
+            check(store(h, h1, NULL, OPERATION_SET, key.c_str(),
+                        "value", NULL, 0, i)
+                  == ENGINE_SUCCESS, "Failed to store an item.");
+        }
+    }
 
-    uint64_t start = 0;
-    uint64_t end = 1000;
-    uint64_t uuid = 0;
-    uint64_t snap_start_seq = start;
-    uint64_t snap_end_seq = start;
-    dcp_stream_req(h, h1, 1, 0, start, end, uuid,
-                   snap_start_seq, snap_end_seq, 0, ENGINE_SUCCESS);
+    /* Create request to get vb seqno of all vbuckets */
+    get_all_vb_seqnos(h, h1, static_cast<vbucket_state_t>(0), cookie);
 
-    start = 0;
-    end = 1000;
-    uuid = get_ull_stat(h, h1, "vb_0:1:id", "failovers");
-    snap_start_seq = start;
-    snap_end_seq = start;
-    dcp_stream_req(h, h1, 1, 0, start, end, uuid,
-                   snap_start_seq, snap_end_seq, 0, ENGINE_SUCCESS);
+    /* Check if the response received is correct */
+    verify_all_vb_seqnos(h, h1, 0, num_vbuckets - 1);
 
-    start = 2;
-    end = 1000;
-    uuid = get_ull_stat(h, h1, "vb_0:1:id", "failovers");
-    snap_start_seq = start;
-    snap_end_seq = start;
-    dcp_stream_req(h, h1, 1, 0, start, end, uuid,
-                   snap_start_seq, snap_end_seq, 0, ENGINE_SUCCESS);
+    /* Create request to get vb seqno of active vbuckets */
+    get_all_vb_seqnos(h, h1, vbucket_state_active, cookie);
 
-    start = 10;
-    end = 1000;
-    uuid = get_ull_stat(h, h1, "vb_0:1:id", "failovers");
-    snap_start_seq = start;
-    snap_end_seq = start;
-    dcp_stream_req(h, h1, 1, 0, start, end, uuid,
-                   snap_start_seq, snap_end_seq, 0, ENGINE_SUCCESS);
+    /* Check if the response received is correct */
+    verify_all_vb_seqnos(h, h1, 1, num_vbuckets - 1);
 
-    start = 12;
-    end = 1000;
-    uuid = get_ull_stat(h, h1, "vb_0:1:id", "failovers");
-    snap_start_seq = start;
-    snap_end_seq = start;
-    dcp_stream_req(h, h1, 1, 0, start, end, uuid,
-                   snap_start_seq, snap_end_seq, 10, ENGINE_ROLLBACK);
+    /* Create request to get vb seqno of replica vbuckets */
+    get_all_vb_seqnos(h, h1, vbucket_state_replica, cookie);
 
-    start = 2;
-    end = 1000;
-    uuid = 123456;
-    snap_start_seq = start;
-    snap_end_seq = start;
-    dcp_stream_req(h, h1, 1, 0, start, end, uuid,
-                   snap_start_seq, snap_end_seq, 0, ENGINE_ROLLBACK);
+    /* Check if the response received is correct */
+    verify_all_vb_seqnos(h, h1, 0, 0);
+
+    testHarness.destroy_cookie(cookie);
 
     return SUCCESS;
 }
@@ -11781,6 +14109,348 @@ static enum test_result test_mb16357(ENGINE_HANDLE *h,
     return SUCCESS;
 }
 
+// Regression test for MB-17517 - ensure that if an item is locked when TAP
+// attempts to stream it, it doesn't get a CAS of -1.
+static enum test_result test_mb17517_tap_with_locked_key(ENGINE_HANDLE *h,
+                                                         ENGINE_HANDLE_V1 *h1) {
+    const uint16_t vbid = 0;
+    // Store an item and immediately lock it.
+    item *it = NULL;
+    std::string key("key");
+    checkeq(store(h, h1, NULL, OPERATION_SET, key.c_str(), "value",
+                  &it, 0, vbid, 3600, PROTOCOL_BINARY_RAW_BYTES),
+            ENGINE_SUCCESS,
+            "Failed to store an item.");
+    h1->release(h, NULL, it);
+
+    uint32_t lock_timeout = 10;
+    getl(h, h1, key.c_str(), vbid, lock_timeout);
+    checkeq(PROTOCOL_BINARY_RESPONSE_SUCCESS, last_status.load(),
+            "Expected to be able to getl on first try");
+
+    wait_for_flusher_to_settle(h, h1);
+
+    // Create the TAP connection and try to get the items.
+    const void *cookie = testHarness.create_cookie();
+    std::string name("test_mb17517_tap_with_locked_key");
+    TAP_ITERATOR iter = h1->get_tap_iterator(h, cookie, name.c_str(),
+                                             name.length(),
+                                             TAP_CONNECT_FLAG_DUMP, NULL, 0);
+    check(iter != NULL, "Failed to create a tap iterator");
+
+    void *engine_specific;
+    uint16_t nengine_specific;
+    uint8_t ttl;
+    uint16_t flags;
+    uint32_t seqno;
+    uint16_t vbucket;
+    tap_event_t event;
+
+    uint16_t unlikely_vbucket_identifier = 17293;
+
+    do {
+        vbucket = unlikely_vbucket_identifier;
+        event = iter(h, cookie, &it, &engine_specific,
+                     &nengine_specific, &ttl, &flags,
+                     &seqno, &vbucket);
+
+        switch (event) {
+        case TAP_PAUSE:
+            testHarness.waitfor_cookie(cookie);
+            break;
+        case TAP_OPAQUE:
+        case TAP_NOOP:
+            break;
+        case TAP_MUTATION: {
+            testHarness.unlock_cookie(cookie);
+
+            item_info info;
+            info.nvalue = 1;
+            if (!h1->get_item_info(h, NULL, it, &info)) {
+                fprintf(stderr, "test_mb17517_tap_with_locked_key: "
+                        "get_item_info failed\n");
+                return FAIL;
+            }
+
+            // Check the CAS.
+            if (info.cas == ~0ull) {
+                fprintf(stderr, "test_mb17517_tap_with_locked_key: "
+                        "Got CAS of -1 in TAP_MUTATION\n");
+                return FAIL;
+            }
+
+            testHarness.lock_cookie(cookie);
+            break;
+        }
+        case TAP_DISCONNECT:
+            break;
+        default:
+            std::cerr << "Unexpected event:  " << event << std::endl;
+            return FAIL;
+        }
+
+    } while (event != TAP_DISCONNECT);
+
+    testHarness.unlock_cookie(cookie);
+    testHarness.destroy_cookie(cookie);
+
+    return SUCCESS;
+}
+
+static void force_vbstate_to_25x(std::string dbname, int vbucket) {
+    std::string filename = dbname +
+                           DIRECTORY_SEPARATOR_CHARACTER +
+                           std::to_string(vbucket) +
+                           ".couch.1";
+    Db* handle;
+    couchstore_error_t err = couchstore_open_db(filename.c_str(),
+                                                COUCHSTORE_OPEN_FLAG_CREATE,
+                                                &handle);
+
+    checkeq(COUCHSTORE_SUCCESS, err, "Failed to open new database");
+
+    // Create 2.5 _local/vbstate
+    std::string vbstate2_5_x ="{\"state\": \"active\","
+                              " \"checkpoint_id\": \"1\","
+                              " \"max_deleted_seqno\": \"0\"}";
+    LocalDoc vbstate;
+    vbstate.id.buf = (char *)"_local/vbstate";
+    vbstate.id.size = sizeof("_local/vbstate") - 1;
+    vbstate.json.buf = (char *)vbstate2_5_x.c_str();
+    vbstate.json.size = vbstate2_5_x.size();
+    vbstate.deleted = 0;
+
+    err = couchstore_save_local_document(handle, &vbstate);
+    checkeq(COUCHSTORE_SUCCESS, err, "Failed to write local document");
+    couchstore_commit(handle);
+    couchstore_close_db(handle);
+}
+
+// Regression test for MB-19635
+// Check that warming up from a 2.x couchfile doesn't end up with a UUID of 0
+// we warmup 2 vbuckets and ensure they get unique IDs.
+static enum test_result test_mb19635_upgrade_from_25x(ENGINE_HANDLE *h,
+                                                      ENGINE_HANDLE_V1 *h1) {
+    std::string dbname = dbname_env;
+
+    force_vbstate_to_25x(dbname, 0);
+    force_vbstate_to_25x(dbname, 1);
+
+    // Now shutdown engine force and restart to warmup from the 2.5.x data.
+    testHarness.reload_engine(&h, &h1,
+                              testHarness.engine_path,
+                              testHarness.get_current_testcase()->cfg,
+                              true, false);
+    wait_for_warmup_complete(h, h1);
+    uint64_t vb_uuid0 = get_ull_stat(h, h1, "vb_0:uuid", "vbucket-details");
+    uint64_t vb_uuid1 = get_ull_stat(h, h1, "vb_1:uuid", "vbucket-details");
+    checkne(vb_uuid0, vb_uuid1, "UUID is not unique");
+    return SUCCESS;
+}
+
+static enum test_result test_set_dcp_param(ENGINE_HANDLE *h,
+                                           ENGINE_HANDLE_V1 *h1)
+{
+    auto func = [h, h1](std::string key, size_t newValue, bool expectedSetParam){
+        std::string statKey = "ep_" + key;
+        size_t param = get_int_stat(h,
+                                    h1,
+                                    statKey.c_str());
+        std::string value = std::to_string(newValue);
+        check(expectedSetParam == set_param(h, h1,
+                                            protocol_binary_engine_param_dcp,
+                                            key.c_str(),
+                                            value.c_str()),
+                "Set param not expected");
+        check(newValue != param,
+              "Forcing failure as nothing will change");
+
+        if (expectedSetParam) {
+            checkeq(newValue,
+                    size_t(get_int_stat(h,
+                                        h1,
+                                        statKey.c_str())),
+                "Incorrect dcp param value after calling set_param");
+        }
+    };
+
+    func("dcp_consumer_process_buffered_messages_yield_limit", 1000, true);
+    func("dcp_consumer_process_buffered_messages_batch_size", 1000, true);
+    func("dcp_consumer_process_buffered_messages_yield_limit", 0, false);
+    func("dcp_consumer_process_buffered_messages_batch_size", 0, false);
+    return SUCCESS;
+}
+
+
+/*
+ * Test MB-18452
+ * Drive DCP consumer by halting all NONIO tasks
+ * Writing numItems mutations (they get buffered)
+ * Then trigger the NONIO tasks, which will trigger the DCP consumer
+ *  to consume the buffered items.
+ * If the DCP consumer is friendly and not hogging the NONIO threads
+ * we should see it being scheduled many times.
+ * This test function returns the number of times the processor task was
+ * dispatched.
+ */
+static int test_mb18452(ENGINE_HANDLE *h,
+                        ENGINE_HANDLE_V1 *h1,
+                        size_t numItems,
+                        size_t yieldValue,
+                        size_t batchSize) {
+
+    // 1. Setup the consumer params.
+    std::string value = std::to_string(yieldValue);
+    set_param(h, h1, protocol_binary_engine_param_dcp,
+              "dcp_consumer_process_buffered_messages_yield_limit",
+              value.c_str());
+    value = std::to_string(batchSize);
+    set_param(h, h1, protocol_binary_engine_param_dcp,
+              "dcp_consumer_process_buffered_messages_batch_size",
+              value.c_str());
+
+    const uint16_t vbid = 0;
+    const uint32_t opaque = 0xFFFF0000;
+    const uint32_t flags = 0;
+    const void* cookie = testHarness.create_cookie();
+
+    // 2. We need to use a replica
+    check(set_vbucket_state(h, h1, vbid, vbucket_state_replica),
+          "Failed to set vbucket state.");
+
+    // 3. Force the engine to not run any NONIO tasks whilst we 'load up'
+    set_param(h, h1, protocol_binary_engine_param_flush,
+              "max_num_nonio",
+              "0");
+
+    // 4. Create a consumer and one stream for the vbucket
+    std::string consumer("unittest");
+    checkeq(h1->dcp.open(h,
+                         cookie,
+                         opaque,
+                         0/*seqno*/,
+                         flags,
+                         (void*)consumer.c_str(),
+                         consumer.length()),
+            ENGINE_SUCCESS,
+            "Failed dcp Consumer open connection.");
+    add_stream_for_consumer(h, h1, cookie, opaque + 1, vbid, flags,
+                            PROTOCOL_BINARY_RESPONSE_SUCCESS);
+
+    uint32_t stream_opaque = get_int_stat(h, h1,
+                                          "eq_dcpq:unittest:stream_0_opaque",
+                                          "dcp");
+    checkeq(ENGINE_SUCCESS,
+            h1->dcp.snapshot_marker(h,
+                                    cookie,
+                                    stream_opaque,
+                                    vbid,
+                                    1,//snap start
+                                    numItems,//snap end
+                                    2), //flags
+            "Failed to send snapshot marker");
+
+    for (uint64_t seqno = 1; seqno <= numItems; seqno++) {
+        std::string key = "key" + std::to_string(seqno);
+        checkeq(ENGINE_SUCCESS,
+                h1->dcp.mutation(h,
+                                 cookie,
+                                 stream_opaque,
+                                 key.c_str(),
+                                 key.length(),
+                                 "value", // item value
+                                 sizeof("value"), // item value length
+                                 seqno, // cas
+                                 vbid, // vbucket
+                                 0, // flags
+                                 PROTOCOL_BINARY_RAW_BYTES,
+                                 seqno, // bySeqno
+                                 1, // revSeqno
+                                 0, // expiration
+                                 0, // locktime
+                                 "", //meta
+                                 0, // metalen
+                                 INITIAL_NRU_VALUE),
+                "Failed to send dcp mutation");
+
+        // At n - 1, enable NONIO tasks, the nth mutation will wake up the task.
+        if (seqno == (numItems - 1)) {
+               set_param(h, h1, protocol_binary_engine_param_flush,
+              "max_num_nonio",
+              "1");
+        }
+    }
+
+    wait_for_stat_to_be(h, h1, "vb_replica_curr_items", numItems);
+
+    // 3. Force the engine to not run any NONIO tasks whilst we 'count up'
+    set_param(h, h1, protocol_binary_engine_param_flush,
+              "max_num_nonio",
+              "0");
+
+    // Now we should count how many times the NONIO task ran
+    // This is slighly racy, but if the task is yielding we expect many
+    // runs from it, not a small number
+    check(h1->get_stats(h, NULL, "dispatcher",
+                        strlen("dispatcher"), add_stats) == ENGINE_SUCCESS,
+                        "Failed to get worker stats");
+
+    // Count up how many times the Processing task was logged
+    int count = 0;
+    const std::string key1 = "nonio_worker_";
+    const std::string key2 = "Processing buffered items for eq_dcpq:unittest";
+    for (auto kv : vals) {
+        if (kv.first.find(key1) != std::string::npos &&
+            kv.second.find(key2) != std::string::npos) {
+            count++;
+        }
+    }
+
+    // 4. Re-enable NONIO so we can shutdown
+    set_param(h, h1, protocol_binary_engine_param_flush,
+              "max_num_nonio",
+              "1");
+    return count;
+}
+
+/**
+ * Test the behaviour of DCP consumer under load.
+ * The consumer use a NONIO task to process data from an input buffer.
+ * This task when given lots of data should voluntarily yield if it finds
+ * itself running for n iterations...
+ */
+static enum test_result test_mb18452_smallYield(ENGINE_HANDLE* h,
+                                                 ENGINE_HANDLE_V1* h1) {
+    const int batchSize = 10;
+    const int numItems = 1000;
+    const int yield = 10;
+
+    int processorRuns = test_mb18452(h, h1, numItems, yield, batchSize);
+
+    // Before the ep-engine updates, the processor run count was usually 1 or 2
+    // with the fix it's up around 80 (appears to saturate the log).
+
+    // So we check that it ran the same or more times than the numItems/(yield*batch)
+    check(processorRuns >= (numItems / (yield * batchSize)),
+          "DCP Processor ran less times than expected.");
+    return SUCCESS;
+}
+
+static enum test_result test_mb18452_largeYield(ENGINE_HANDLE* h,
+                                                ENGINE_HANDLE_V1* h1) {
+    const int batchSize = 10;
+    const int numItems = 10000;
+    const int yield = 10000;
+    int processorRuns =  test_mb18452(h, h1, numItems, yield, batchSize);
+    // Here we expect very few yields, so very few runs (definitely not enough to fill
+    // the task log (TASK_LOG_SIZE)
+    check(processorRuns < 80,
+          "DCP Processor ran more times than expected.");
+
+
+    return SUCCESS;
+}
+
 /**
  * This test demonstrates bucket shutdown when there is a rogue
  * backfill (whose producer and stream are already closed).
@@ -11982,9 +14652,12 @@ engine_test_t* get_tests(void) {
         // basic tests
         TestCase("test alloc limit", test_alloc_limit, test_setup, teardown,
                  NULL, prepare, cleanup),
+        TestCase("test_memory_tracking", test_memory_tracking, test_setup,
+                 teardown, NULL, prepare, cleanup),
         TestCase("test total memory limit", test_memory_limit,
                  test_setup, teardown,
-                 "max_size=5492;ht_locks=1;ht_size=3;chk_remover_stime=1;chk_period=60",
+                 "max_size=2097152" // 2MB
+                 ";ht_locks=1;ht_size=3;chk_remover_stime=1;chk_period=60",
                  prepare, cleanup),
         TestCase("test max_size changes", test_max_size_settings,
                  test_setup, teardown,
@@ -12002,6 +14675,10 @@ engine_test_t* get_tests(void) {
                  teardown, NULL, prepare, cleanup),
         TestCase("concurrent incr", test_conc_incr, test_setup,
                  teardown, NULL, prepare, cleanup),
+        TestCase("test_conc_incr_new_itm", test_conc_incr_new_itm, test_setup,
+                 teardown, NULL, prepare, cleanup),
+        TestCase("multi set", test_multi_set, test_setup,
+                 teardown, NULL, prepare, cleanup),
         TestCase("set+get hit", test_set_get_hit, test_setup,
                  teardown, NULL, prepare, cleanup),
         TestCase("test getl then del with cas", test_getl_delete_with_cas,
@@ -12054,6 +14731,9 @@ engine_test_t* get_tests(void) {
                  test_setup, teardown, NULL, prepare, cleanup),
         TestCase("incr expiry", test_bug2799, test_setup,
                  teardown, NULL, prepare, cleanup),
+        TestCase("incr with full eviction", test_incr_full_eviction,
+                 test_setup, teardown, "item_eviction_policy=full_eviction",
+                 prepare, cleanup),
         TestCase("test touch", test_touch, test_setup, teardown,
                  NULL, prepare, cleanup),
         TestCase("test touch (MB-7342)", test_touch_mb7342, test_setup, teardown,
@@ -12092,6 +14772,8 @@ engine_test_t* get_tests(void) {
                  test_setup, teardown, "max_vbuckets=1024", prepare, cleanup),
         TestCase("flush", test_flush, test_setup, teardown,
                  NULL, prepare, cleanup),
+        TestCase("multiple flush requests", test_multiple_flush, test_setup,
+                 teardown, NULL, prepare, cleanup),
         TestCase("flush with stats", test_flush_stats, test_setup, teardown,
                  "flushall_enabled=true;chk_remover_stime=1;chk_period=60",
                  prepare, cleanup),
@@ -12134,6 +14816,8 @@ engine_test_t* get_tests(void) {
                  NULL, prepare, cleanup),
         TestCase("test observe single key", test_observe_single_key, test_setup, teardown,
                  NULL, prepare, cleanup),
+        TestCase("test observe on temp item", test_observe_temp_item, test_setup, teardown,
+                 NULL, prepare, cleanup),
         TestCase("test observe multi key", test_observe_multi_key, test_setup, teardown,
                  NULL, prepare, cleanup),
         TestCase("test multiple observes", test_multiple_observes, test_setup, teardown,
@@ -12142,10 +14826,30 @@ engine_test_t* get_tests(void) {
                  teardown, NULL, prepare, cleanup),
         TestCase("test observe not my vbucket", test_observe_errors, test_setup,
                  teardown, NULL, prepare, cleanup),
+        TestCase("test observe seqno basic tests", test_observe_seqno_basic_tests,
+                 test_setup, teardown, NULL, prepare, cleanup),
+        TestCase("test observe seqno failover", test_observe_seqno_failover,
+                 test_setup, teardown, NULL, prepare, cleanup),
+        TestCase("test observe seqno error", test_observe_seqno_error,
+                 test_setup, teardown, NULL, prepare, cleanup),
         TestCase("test item pager", test_item_pager, test_setup,
-                 teardown, "max_size=204800", prepare, cleanup),
+                 teardown, "max_size=2048000", prepare, cleanup),
         TestCase("warmup conf", test_warmup_conf, test_setup,
                  teardown, NULL, prepare, cleanup),
+        TestCase("bloomfilter conf", test_bloomfilter_conf, test_setup,
+                 teardown, NULL, prepare, cleanup),
+        TestCase("test bloomfilters with value-only eviction",
+                 test_bloomfilters, test_setup,
+                 teardown, NULL, prepare, cleanup),
+        TestCase("test bloomfilters with full eviction",
+                 test_bloomfilters, test_setup,
+                 teardown, "item_eviction_policy=full_eviction", prepare, cleanup),
+        TestCase("test bloomfilters with store apis - value_only eviction",
+                 test_bloomfilters_with_store_apis, test_setup,
+                 teardown, NULL, prepare, cleanup),
+        TestCase("test bloomfilters with store apis - full_eviction",
+                 test_bloomfilters_with_store_apis, test_setup,
+                 teardown, "item_eviction_policy=full_eviction", prepare, cleanup),
         TestCase("test datatype", test_datatype, test_setup,
                  teardown, NULL, prepare, cleanup),
         TestCase("test datatype with unknown command", test_datatype_with_unknown_command,
@@ -12162,6 +14866,8 @@ engine_test_t* get_tests(void) {
                  NULL, prepare, cleanup),
         TestCase("file stats", test_vb_file_stats, test_setup, teardown,
                  NULL, prepare, cleanup),
+        TestCase("file stats post warmup", test_vb_file_stats_after_warmup,
+                 test_setup, teardown, NULL, prepare, cleanup),
         TestCase("bg stats", test_bg_stats, test_setup, teardown,
                  NULL, prepare, cleanup),
         TestCase("bg meta stats", test_bg_meta_stats, test_setup, teardown,
@@ -12209,6 +14915,10 @@ engine_test_t* get_tests(void) {
                  test_all_keys_api,
                  test_setup, teardown,
                  NULL, prepare, cleanup),
+        TestCase("test ALL_KEYS api during bucket creation",
+                 test_all_keys_api_during_bucket_creation,
+                 test_setup, teardown,
+                 NULL, prepare, cleanup),
         TestCase("ep worker stats", test_worker_stats,
                  test_setup, teardown,
                  "max_num_workers=8;max_threads=8", prepare, cleanup),
@@ -12505,6 +15215,8 @@ engine_test_t* get_tests(void) {
         // XDCR unit tests
         TestCase("get meta", test_get_meta, test_setup,
                  teardown, NULL, prepare, cleanup),
+        TestCase("get meta with extras", test_get_meta_with_extras,
+                 test_setup, teardown, NULL, prepare, cleanup),
         TestCase("get meta deleted", test_get_meta_deleted,
                  test_setup, teardown, NULL, prepare, cleanup),
         TestCase("get meta nonexistent", test_get_meta_nonexistent,
@@ -12524,9 +15236,16 @@ engine_test_t* get_tests(void) {
         TestCase("delete with meta nonexistent",
                  test_delete_with_meta_nonexistent, test_setup,
                  teardown, NULL, prepare, cleanup),
+        TestCase("delete with meta nonexistent no temp",
+                 test_delete_with_meta_nonexistent_no_temp, test_setup,
+                 teardown, NULL, prepare, cleanup),
         TestCase("delete_with_meta race with concurrent delete",
                  test_delete_with_meta_race_with_delete, test_setup,
                  teardown, NULL, prepare, cleanup),
+        TestCase("delete_with_meta race with concurrent delete",
+                 test_delete_with_meta_race_with_delete, test_setup,
+                 teardown, "item_eviction_policy=full_eviction",
+                 prepare, cleanup),
         TestCase("delete_with_meta race with concurrent set",
                  test_delete_with_meta_race_with_set, test_setup,
                  teardown, NULL, prepare, cleanup),
@@ -12555,11 +15274,20 @@ engine_test_t* get_tests(void) {
         TestCase("test set meta conflict resolution",
                  test_set_meta_conflict_resolution, test_setup, teardown, NULL,
                  prepare, cleanup),
+        TestCase("test del meta lww conflict resolution",
+                 test_del_meta_lww_conflict_resolution, test_setup, teardown, NULL,
+                 prepare, cleanup),
+        TestCase("test set meta lww conflict resolution",
+                 test_set_meta_lww_conflict_resolution, test_setup, teardown, NULL,
+                 prepare, cleanup),
         TestCase("temp item deletion", test_temp_item_deletion,
                  test_setup, teardown,
                  "exp_pager_stime=3", prepare, cleanup),
         TestCase("test estimate vb move", test_est_vb_move,
                  test_setup, teardown, NULL, prepare, cleanup),
+        TestCase("test getAdjustedTime, setDriftCounter apis",
+                 test_adjusted_time_apis, test_setup, teardown, NULL,
+                 prepare, cleanup),
 
         // Data traffic control tests
         TestCase("control data traffic", test_control_data_traffic,
@@ -12591,6 +15319,9 @@ engine_test_t* get_tests(void) {
                  NULL, prepare, cleanup),
         TestCase("test open consumer", test_dcp_consumer_open,
                  test_setup, teardown, NULL, prepare, cleanup),
+        TestCase("test dcp consumer flow control buffer size",
+                 test_dcp_consumer_flow_control_buf_sz,
+                 test_setup, teardown, NULL, prepare, cleanup),
         TestCase("test open producer", test_dcp_producer_open,
                  test_setup, teardown, NULL, prepare, cleanup),
         TestCase("test dcp noop", test_dcp_noop, test_setup, teardown, NULL,
@@ -12611,6 +15342,10 @@ engine_test_t* get_tests(void) {
         TestCase("test producer stream request (partial)",
                  test_dcp_producer_stream_req_partial, test_setup, teardown,
                  "chk_remover_stime=1;chk_max_items=100", prepare, cleanup),
+        TestCase("test producer stream request with time sync (partial)",
+                 test_dcp_producer_stream_req_partial_with_time_sync,
+                 test_setup, teardown,
+                 "chk_remover_stime=1;chk_max_items=100", prepare, cleanup),
         TestCase("test producer stream request (full)",
                  test_dcp_producer_stream_req_full, test_setup, teardown,
                  "chk_remover_stime=1;chk_max_items=100", prepare, cleanup),
@@ -12623,6 +15358,9 @@ engine_test_t* get_tests(void) {
         TestCase("test producer stream request (memory only)",
                  test_dcp_producer_stream_req_mem, test_setup, teardown,
                  "chk_remover_stime=1;chk_max_items=100", prepare, cleanup),
+        TestCase("test producer stream request (DGM)",
+                 test_dcp_producer_stream_req_dgm, test_setup, teardown,
+                 "chk_remover_stime=1;max_size=2621440", prepare, cleanup),
         TestCase("test producer stream request (latest flag)",
                  test_dcp_producer_stream_latest, test_setup, teardown, NULL,
                  prepare, cleanup),
@@ -12638,6 +15376,9 @@ engine_test_t* get_tests(void) {
                  test_setup, teardown, "chk_remover_stime=1", prepare, cleanup),
         TestCase("test dcp consumer takeover", test_dcp_consumer_takeover,
                  test_setup, teardown, NULL, prepare, cleanup),
+        TestCase("test failover scenario with dcp",
+                 test_failover_scenario_with_dcp, test_setup, teardown,
+                 NULL, prepare, cleanup),
         TestCase("test add stream", test_dcp_add_stream, test_setup, teardown,
                  "dcp_enable_flow_control=true;dcp_enable_noop=false", prepare,
                  cleanup),
@@ -12711,9 +15452,17 @@ engine_test_t* get_tests(void) {
         TestCase("dcp consumer mutate", test_dcp_consumer_mutate, test_setup,
                  teardown, "dcp_enable_flow_control=true;dcp_enable_noop=false",
                  prepare, cleanup),
+        TestCase("dcp consumer mutate with time sync",
+                 test_dcp_consumer_mutate_with_time_sync, test_setup,
+                 teardown, "dcp_enable_flow_control=true;dcp_enable_noop=false",
+                 prepare, cleanup),
         TestCase("dcp consumer delete", test_dcp_consumer_delete, test_setup,
                  teardown, "dcp_enable_flow_control=true;dcp_enable_noop=false",
                  prepare, cleanup),
+        TestCase("dcp consumer delete with time sync",
+                 test_dcp_consumer_delete_with_time_sync, test_setup,
+                 teardown, "dcp_enable_flow_control=true;dcp_enable_noop=false",
+                 prepare, cleanup),
         TestCase("dcp failover log", test_failover_log_dcp, test_setup,
                  teardown, NULL, prepare, cleanup),
         TestCase("dcp persistence seqno", test_dcp_persistence_seqno, test_setup,
@@ -12739,6 +15488,11 @@ engine_test_t* get_tests(void) {
         TestCase("test set_with_meta with item_eviction",
                  test_setWithMeta_with_item_eviction, test_setup, teardown,
                  "item_eviction_policy=full_eviction", prepare, cleanup),
+        TestCase("test multiple set and del with meta with item_eviction",
+                 test_multiple_set_delete_with_metas_full_eviction,
+                 test_setup, teardown,
+                 "item_eviction_policy=full_eviction",
+                 prepare, cleanup),
         TestCase("test add with item_eviction",
                  test_add_with_item_eviction, test_setup, teardown,
                  "item_eviction_policy=full_eviction", prepare, cleanup),
@@ -12771,6 +15525,9 @@ engine_test_t* get_tests(void) {
                  "item_eviction_policy=full_eviction;flushall_enabled=true", prepare, cleanup),
         TestCase("warmup stats", test_warmup_stats, test_setup,
                  teardown, "item_eviction_policy=full_eviction", prepare, cleanup),
+        TestCase("test get & delete on non existent items",
+                 test_non_existent_get_and_delete, test_setup, teardown,
+                 "item_eviction_policy=full_eviction", prepare, cleanup),
         TestCase("test MB-16421", test_mb16421,
                  test_setup, teardown, "item_eviction_policy=full_eviction",
                  prepare, cleanup),
@@ -12780,8 +15537,21 @@ engine_test_t* get_tests(void) {
 
         TestCase("test failover log behavior", test_failover_log_behavior,
                  test_setup, teardown, NULL, prepare, cleanup),
+
+#if defined(HAVE_JEMALLOC)
+        TestCase("test defragmenter", test_defragmenter,
+                 test_setup, teardown,
+                 "defragmenter_interval=9999"
+                 ";defragmenter_age_threshold=0"
+                 ";defragmenter_chunk_duration=99999",
+                 prepare, cleanup),
+#endif
+
+        TestCase("test hlc cas", test_hlc_cas, test_setup, teardown,
+                 NULL, prepare, cleanup),
         TestCase("test get all vb seqnos", test_get_all_vb_seqnos, test_setup,
                  teardown, NULL, prepare, cleanup),
+
         TestCase("test MB-16357", test_mb16357,
                  test_setup, teardown, "compaction_exp_mem_threshold=85",
                  prepare, cleanup),
@@ -12790,6 +15560,33 @@ engine_test_t* get_tests(void) {
 
         TestCase("test dcp early termination", test_dcp_early_termination,
                  test_setup, teardown, NULL, prepare, cleanup),
+
+        TestCase("test MB-17517 CAS -1 DCP", test_mb17517_cas_minus_1_dcp,
+                 test_setup, teardown, NULL, prepare, cleanup),
+
+        TestCase("test MB-17517 CAS -1 TAP", test_mb17517_cas_minus_1_tap,
+                 test_setup, teardown, NULL, prepare, cleanup),
+
+        TestCase("test_mb17517_tap_with_locked_key",
+                 test_mb17517_tap_with_locked_key, test_setup, teardown, NULL,
+                 prepare, cleanup),
+
+        TestCase("test_mb19635_upgrade_from_25x",
+                 test_mb19635_upgrade_from_25x, test_setup, teardown, NULL,
+                 prepare, cleanup),
+
+        TestCase("test_set_dcp_param",
+                 test_set_dcp_param, test_setup, teardown, NULL,
+                 prepare, cleanup),
+
+        TestCase("test_mb18452_largeYield",
+                 test_mb18452_largeYield, test_setup, teardown, "max_num_nonio=1",
+                 prepare, cleanup),
+
+        TestCase("test_mb18452_smallYield",
+                 test_mb18452_smallYield, test_setup, teardown, "max_num_nonio=1",
+                 prepare, cleanup),
+
         TestCase(NULL, NULL, NULL, NULL, NULL, prepare, cleanup)
     };