Merge remote-tracking branch 'couchbase/3.0.x' into sherlock 77/64977/1
author    Dave Rigby <daver@couchbase.com>
          Thu, 16 Jun 2016 08:42:22 +0000 (09:42 +0100)
committer Dave Rigby <daver@couchbase.com>
          Thu, 16 Jun 2016 10:24:15 +0000 (11:24 +0100)
* couchbase/3.0.x:
  MB-19204: ep_testsuite: Don't release the item while we're using it
  MB-19204: Address data race in ep_test_apis/testsuite
  MB-19204: ep_testsuite: Use std::string for last_key/body
  MB-19204: Remove alarm() call from atomic_ptr_test, reduce iteration count
  MB-19204: hash_table_test: Fix TSan issues

Start of the merge of the 3.1.5+ changes into sherlock, broken into multiple
merges due to their size.
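
The recurring pattern in the MB-19204 changes below is to replace the raw
char* / plain-scalar globals that are shared between the test thread and the
engine's response callback with std::string and AtomicValue<> wrappers. A
minimal sketch of that pattern follows; this is not the ep-engine code itself
(AtomicValue is assumed to be a thin wrapper over std::atomic, and on_response
is a made-up callback used only for illustration):

    #include <atomic>
    #include <cstdint>
    #include <string>

    // Hypothetical stand-in for ep-engine's AtomicValue<T>; assumed here to
    // behave like std::atomic<T>.
    template <typename T>
    using AtomicValue = std::atomic<T>;

    // Shared response state: written by the response callback, read by the
    // test code afterwards.
    static AtomicValue<uint16_t> last_status{0};
    static std::string last_body;   // owns its bytes; no manual malloc()/free()

    // Callback sketch: copy the body into the std::string instead of keeping
    // a raw pointer (plus a separate length) into engine-owned memory.
    static void on_response(uint16_t status, const void* body, uint32_t bodylen) {
        last_status.store(status);
        last_body.assign(static_cast<const char*>(body), bodylen);
    }

    int main() {
        const char payload[] = {0x00, 0x2a};   // example body bytes
        on_response(/*status*/0, payload, sizeof(payload));
        return (last_status.load() == 0 && last_body.size() == 2) ? 0 : 1;
    }

This is the shape the call sites in the diff rely on: last_body.size() and
last_body.data() replace last_bodylen and pointer arithmetic on a malloc'd
last_body, and last_status can be read via load() or compared directly.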

Change-Id: I65530d3c81d6b5e8b0171d0e3e1da3e14e0bb308

tests/ep_test_apis.cc
tests/ep_test_apis.h
tests/ep_testsuite.cc
tests/module_tests/hash_table_test.cc

  #define check(expr, msg) \
      static_cast<void>((expr) ? 0 : abort_msg(#expr, msg, __LINE__))
  
 -extern "C" bool abort_msg(const char *expr, const char *msg, int line);
 -
  std::map<std::string, std::string> vals;
  bool dump_stats = false;
- protocol_binary_response_status last_status =
-     static_cast<protocol_binary_response_status>(0);
- uint32_t last_bodylen = 0;
- char *last_key = NULL;
- char *last_body = NULL;
- bool last_deleted_flag = false;
- uint8_t last_conflict_resolution_mode = static_cast<uint8_t>(-1);
- uint64_t last_cas = 0;
- uint8_t last_datatype = 0x00;
+ AtomicValue<protocol_binary_response_status> last_status(
+     static_cast<protocol_binary_response_status>(0));
+ std::string last_key;
+ std::string last_body;
+ bool last_deleted_flag(false);
++AtomicValue<uint8_t> last_conflict_resolution_mode{0xff};
+ AtomicValue<uint64_t> last_cas(0);
+ AtomicValue<uint8_t> last_datatype(0x00);
  ItemMetaData last_meta;
 +uint64_t last_uuid = 0;
 +uint64_t last_seqno = 0;
  
  extern "C" bool add_response_get_meta(const void *key, uint16_t keylen,
                                        const void *ext, uint8_t extlen,
@@@ -637,56 -489,6 +608,56 @@@ bool set_vbucket_state(ENGINE_HANDLE *h
      return last_status == PROTOCOL_BINARY_RESPONSE_SUCCESS;
  }
  
-     std::string seqno_body(last_body, last_bodylen);
 +bool get_all_vb_seqnos(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1,
 +                       vbucket_state_t state, const void *cookie) {
 +    protocol_binary_request_header *pkt;
 +    if (state) {
 +        char ext[sizeof(vbucket_state_t)];
 +        encodeExt(ext, static_cast<uint32_t>(state));
 +        pkt = createPacket(PROTOCOL_BINARY_CMD_GET_ALL_VB_SEQNOS, 0, 0, ext,
 +                           sizeof(vbucket_state_t));
 +    } else {
 +        pkt = createPacket(PROTOCOL_BINARY_CMD_GET_ALL_VB_SEQNOS);
 +    }
 +
 +    check(h1->unknown_command(h, cookie, pkt, add_response) ==
 +          ENGINE_SUCCESS, "Error in getting all vb info");
 +
 +    free(pkt);
 +    return last_status == PROTOCOL_BINARY_RESPONSE_SUCCESS;
 +}
 +
 +void verify_all_vb_seqnos(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1,
 +                          uint16_t vb_start, uint16_t vb_end) {
 +    const int per_vb_resp_size = sizeof(uint16_t) + sizeof(uint64_t);
 +    const int high_seqno_offset = sizeof(uint16_t);
 +
-     checkeq((uint32_t)((vb_end - vb_start + 1) * per_vb_resp_size),
-             last_bodylen, "Failed to get all vb info.");
++    std::string seqno_body = last_body;
 +
 +    /* Check that the total response length is as expected. We expect 10 bytes
 +       per vbucket (2 for vb_id + 8 for seqno) */
++    checkeq((size_t)((vb_end - vb_start + 1) * per_vb_resp_size),
++            last_body.size(), "Failed to get all vb info.");
 +    /* Check if the contents are correct */
 +    for (int i = 0; i < (vb_end - vb_start + 1); i++) {
 +        /* Check for correct vb_id */
 +        checkeq(static_cast<const uint16_t>(vb_start + i),
 +                ntohs(*(reinterpret_cast<const uint16_t*>(seqno_body.data() +
 +                                                          per_vb_resp_size * i))),
 +                "vb_id mismatch");
 +        /* Check for correct high_seqno */
 +        std::string vb_stat_seqno("vb_" + std::to_string(vb_start + i) +
 +                                  ":high_seqno");
 +        uint64_t high_seqno_vb = get_ull_stat(h, h1, vb_stat_seqno.c_str(),
 +                                              "vbucket-seqno");
 +        checkeq(high_seqno_vb,
 +                ntohll(*(reinterpret_cast<const uint64_t*>(seqno_body.data() +
 +                                                           per_vb_resp_size * i +
 +                                                           high_seqno_offset))),
 +              "high_seqno mismatch");
 +    }
 +}
 +
  void set_with_meta(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1, const char *key,
                     const size_t keylen, const char *val, const size_t vallen,
                     const uint32_t vb, ItemMetaData *itemMeta,
@@@ -1198,33 -991,3 +1170,33 @@@ void dcp_step(ENGINE_HANDLE *h, ENGINE_
      }
      free(producers);
  }
-         if (last_body) {
 +
 +void set_degraded_mode(ENGINE_HANDLE *h,
 +                       ENGINE_HANDLE_V1 *h1,
 +                       const void* cookie,
 +                       bool enable)
 +{
 +    protocol_binary_request_header *pkt;
 +    if (enable) {
 +        pkt = createPacket(PROTOCOL_BINARY_CMD_DISABLE_TRAFFIC, 0, 0);
 +    } else {
 +        pkt = createPacket(PROTOCOL_BINARY_CMD_ENABLE_TRAFFIC, 0, 0);
 +    }
 +
 +    ENGINE_ERROR_CODE errcode = h1->unknown_command(h, NULL, pkt, add_response);
 +    if (errcode != ENGINE_SUCCESS) {
 +        std::cerr << "Failed to set degraded mode to " << enable
 +                  << ". api call return engine code: " << errcode << std::endl;
 +        cb_assert(false);
 +    }
 +
 +    if (last_status != PROTOCOL_BINARY_RESPONSE_SUCCESS) {
 +        std::cerr << "Failed to set degraded mode to " << enable
 +                  << ". protocol code: " << last_status << std::endl;
++        if (last_body.size() > 0) {
 +            std::cerr << "\tBody: [" << last_body << "]" << std::endl;
 +        }
 +
 +        cb_assert(false);
 +    }
 +}
@@@ -63,18 -61,14 +63,17 @@@ ENGINE_ERROR_CODE vb_map_response(cons
  }
  #endif
  
- extern protocol_binary_response_status last_status;
- extern char *last_key;
- extern char *last_body;
+ extern AtomicValue<protocol_binary_response_status> last_status;
+ extern std::string last_key;
+ extern std::string last_body;
  extern bool dump_stats;
  extern std::map<std::string, std::string> vals;
- extern uint32_t last_bodylen;
- extern uint64_t last_cas;
- extern uint8_t last_datatype;
+ extern AtomicValue<uint64_t> last_cas;
+ extern AtomicValue<uint8_t> last_datatype;
 +extern uint64_t last_uuid;
 +extern uint64_t last_seqno;
  extern bool last_deleted_flag;
- extern uint8_t last_conflict_resolution_mode;
++extern AtomicValue<uint8_t> last_conflict_resolution_mode;
  extern ItemMetaData last_meta;
  
  extern uint8_t dcp_last_op;
@@@ -164,55 -151,6 +164,55 @@@ static void check_key_value(ENGINE_HAND
      check(memcmp(info.value[0].iov_base, val, vlen) == 0, "Data mismatch");
  }
  
-     memcpy(&recv_format_type, last_body, sizeof(uint8_t));
 +// Fetches the CAS of the specified key.
 +static uint64_t get_CAS(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1,
 +                        const std::string& key) {
 +    item *i = NULL;
 +    checkeq(ENGINE_SUCCESS,
 +            h1->get(h, NULL, &i, key.c_str(), key.size(), /*vBucket*/0),
 +            "Failed to get key");
 +
 +    item_info info;
 +    info.nvalue = 1;
 +    check(h1->get_item_info(h, NULL, i, &info),
 +          "Failed to get item info for key");
 +    h1->release(h, NULL, i);
 +
 +    return info.cas;
 +}
 +
 +static void check_observe_seqno(bool failover, uint8_t format_type, uint16_t vb_id,
 +                                uint64_t vb_uuid, uint64_t last_persisted_seqno,
 +                                uint64_t current_seqno, uint64_t failover_vbuuid = 0,
 +                                uint64_t failover_seqno = 0) {
 +    uint8_t  recv_format_type;
 +    uint16_t recv_vb_id;
 +    uint64_t recv_vb_uuid;
 +    uint64_t recv_last_persisted_seqno;
 +    uint64_t recv_current_seqno;
 +    uint64_t recv_failover_vbuuid;
 +    uint64_t recv_failover_seqno;
 +
-     memcpy(&recv_vb_id, last_body + 1, sizeof(uint16_t));
++    memcpy(&recv_format_type, last_body.data(), sizeof(uint8_t));
 +    check(recv_format_type == format_type, "Wrong format type in result");
-     memcpy(&recv_vb_uuid, last_body + 3, sizeof(uint64_t));
++    memcpy(&recv_vb_id, last_body.data() + 1, sizeof(uint16_t));
 +    check(ntohs(recv_vb_id) == vb_id, "Wrong vbucket id in result");
-     memcpy(&recv_last_persisted_seqno, last_body + 11, sizeof(uint64_t));
++    memcpy(&recv_vb_uuid, last_body.data() + 3, sizeof(uint64_t));
 +    check(ntohll(recv_vb_uuid) == vb_uuid, "Wrong vbucket uuid in result");
-     memcpy(&recv_current_seqno, last_body + 19, sizeof(uint64_t));
++    memcpy(&recv_last_persisted_seqno, last_body.data() + 11, sizeof(uint64_t));
 +    check(ntohll(recv_last_persisted_seqno) == last_persisted_seqno,
 +          "Wrong persisted seqno in result");
-         memcpy(&recv_failover_vbuuid, last_body + 27, sizeof(uint64_t));
++    memcpy(&recv_current_seqno, last_body.data() + 19, sizeof(uint64_t));
 +    check(ntohll(recv_current_seqno) == current_seqno, "Wrong current seqno in result");
 +
 +    if (failover) {
-         memcpy(&recv_failover_seqno, last_body + 35, sizeof(uint64_t));
++        memcpy(&recv_failover_vbuuid, last_body.data() + 27, sizeof(uint64_t));
 +        check(ntohll(recv_failover_vbuuid) == failover_vbuuid, "Wrong failover uuid in result");
++        memcpy(&recv_failover_seqno, last_body.data() + 35, sizeof(uint64_t));
 +        check(ntohll(recv_failover_seqno) == failover_seqno, "Wrong failover seqno in result");
 +    }
 +}
 +
  static bool test_setup(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1) {
      wait_for_warmup_complete(h, h1);
  
@@@ -5880,213 -5176,25 +5882,213 @@@ static enum test_result test_dcp_consum
      opaque = add_stream_for_consumer(h, h1, cookie, opaque, 0, 0,
                                       PROTOCOL_BINARY_RESPONSE_SUCCESS);
  
 -    // verify that we don't accept invalid opaque id's
 -    check(h1->dcp.deletion(h, cookie, opaque + 1, "key", 3, cas, vbucket,
 -                           bySeqno, revSeqno, NULL, 0) == ENGINE_KEY_ENOENT,
 -          "Failed to detect invalid DCP opaque value.");
 +    uint32_t dataLen = 100;
 +    char *data = static_cast<char *>(malloc(dataLen));
 +    memset(data, 'x', dataLen);
  
 -    // Consume an DCP deletion
 -    check(h1->dcp.deletion(h, cookie, opaque, "key", 3, cas, vbucket,
 -                           bySeqno, revSeqno, NULL, 0) == ENGINE_SUCCESS,
 -          "Failed dcp delete.");
 +    uint8_t cas = 0x1;
 +    uint16_t vbucket = 0;
 +    uint8_t datatype = 1;
 +    uint64_t bySeqno = 10;
 +    uint64_t revSeqno = 0;
 +    uint32_t exprtime = 0;
 +    uint32_t lockTime = 0;
  
 -    wait_for_stat_to_be(h, h1, "eq_dcpq:unittest:stream_0_buffer_items", 0,
 -                        "dcp");
 +    check(h1->dcp.snapshot_marker(h, cookie, opaque, 0, 10, 10, 1)
 +        == ENGINE_SUCCESS, "Failed to send snapshot marker");
  
 -    wait_for_stat_change(h, h1, "curr_items", 1);
 -    verify_curr_items(h, h1, 0, "one item deleted");
 -    testHarness.destroy_cookie(cookie);
 +    // Consume a DCP mutation with extended meta
 +    int64_t adjusted_time1 = gethrtime() * 2;
 +    ExtendedMetaData *emd = new ExtendedMetaData(adjusted_time1, false);
 +    cb_assert(emd && emd->getStatus() == ENGINE_SUCCESS);
 +    std::pair<const char*, uint16_t> meta = emd->getExtMeta();
 +    check(h1->dcp.mutation(h, cookie, opaque, "key", 3, data, dataLen, cas,
 +                           vbucket, flags, datatype,
 +                           bySeqno, revSeqno, exprtime,
 +                           lockTime, meta.first, meta.second, 0)
 +            == ENGINE_SUCCESS,
 +            "Failed dcp mutate.");
 +    delete emd;
  
 -    return SUCCESS;
 -}
 +    wait_for_stat_to_be(h, h1, "eq_dcpq:unittest:stream_0_buffer_items", 0, "dcp");
 +
 +    check(h1->dcp.close_stream(h, cookie, opaque, 0) == ENGINE_SUCCESS,
 +            "Expected success");
 +
 +    check(set_vbucket_state(h, h1, 0, vbucket_state_active),
 +          "Failed to set vbucket state.");
 +
 +    wait_for_stat_to_be(h, h1, "eq_dcpq:unittest:stream_0_buffer_items", 0,
 +                        "dcp");
 +
 +    check_key_value(h, h1, "key", data, dataLen);
 +
 +    testHarness.destroy_cookie(cookie);
 +    free(data);
 +
 +    protocol_binary_request_header *request;
 +    int64_t adjusted_time2;
 +    request = createPacket(PROTOCOL_BINARY_CMD_GET_ADJUSTED_TIME, 0, 0, NULL, 0,
 +                           NULL, 0, NULL, 0);
 +    h1->unknown_command(h, NULL, request, add_response);
 +    check(last_status == PROTOCOL_BINARY_RESPONSE_SUCCESS,
 +            "Expected Success");
-     check(last_bodylen == sizeof(int64_t),
++    checkeq(sizeof(int64_t), last_body.size(),
 +            "Bodylen didn't match expected value");
-     memcpy(&adjusted_time2, last_body, last_bodylen);
++    memcpy(&adjusted_time2, last_body.data(), last_body.size());
 +    adjusted_time2 = ntohll(adjusted_time2);
 +
 +    /**
 +     * Check that adjusted_time2 is marginally greater than
 +     * adjusted_time1.
 +     */
 +    check(adjusted_time2 >= adjusted_time1,
 +            "Adjusted time after mutation: Not what is expected");
 +
 +    return SUCCESS;
 +}
 +
 +
 +static enum test_result test_dcp_consumer_delete(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1) {
 +    // Store an item
 +    item *i = NULL;
 +    check(store(h, h1, NULL, OPERATION_ADD,"key", "value", &i) == ENGINE_SUCCESS,
 +          "Failed to fail to store an item.");
 +    h1->release(h, NULL, i);
 +    verify_curr_items(h, h1, 1, "one item stored");
 +
 +    wait_for_flusher_to_settle(h, h1);
 +
 +    check(set_vbucket_state(h, h1, 0, vbucket_state_replica),
 +          "Failed to set vbucket state.");
 +
 +    const void *cookie = testHarness.create_cookie();
 +    uint32_t opaque = 0;
 +    uint8_t cas = 0;
 +    uint16_t vbucket = 0;
 +    uint32_t flags = 0;
 +    uint64_t bySeqno = 10;
 +    uint64_t revSeqno = 0;
 +    const char *name = "unittest";
 +    uint16_t nname = strlen(name);
 +    uint32_t seqno = 0;
 +
 +    // Open a DCP connection
 +    check(h1->dcp.open(h, cookie, opaque, seqno, flags, (void*)name, nname)
 +          == ENGINE_SUCCESS,
 +          "Failed dcp producer open connection.");
 +
 +    std::string type = get_str_stat(h, h1, "eq_dcpq:unittest:type", "dcp");
 +    check(type.compare("consumer") == 0, "Consumer not found");
 +
 +    opaque = add_stream_for_consumer(h, h1, cookie, opaque, 0, 0,
 +                                     PROTOCOL_BINARY_RESPONSE_SUCCESS);
 +
 +    check(h1->dcp.snapshot_marker(h, cookie, opaque, 0, 10, 10, 1)
 +        == ENGINE_SUCCESS, "Failed to send snapshot marker");
 +
 +    // Verify that we don't accept invalid opaque IDs
 +    check(h1->dcp.deletion(h, cookie, opaque + 1, "key", 3, cas, vbucket,
 +                           bySeqno, revSeqno, NULL, 0) == ENGINE_KEY_ENOENT,
 +          "Failed to detect invalid DCP opaque value.");
 +
 +    // Consume a DCP deletion
 +    check(h1->dcp.deletion(h, cookie, opaque, "key", 3, cas, vbucket,
 +                           bySeqno, revSeqno, NULL, 0) == ENGINE_SUCCESS,
 +          "Failed dcp delete.");
 +
 +    wait_for_stat_to_be(h, h1, "eq_dcpq:unittest:stream_0_buffer_items", 0,
 +                        "dcp");
 +
 +    wait_for_stat_change(h, h1, "curr_items", 1);
 +    verify_curr_items(h, h1, 0, "one item deleted");
 +    testHarness.destroy_cookie(cookie);
 +
 +    return SUCCESS;
 +}
 +
 +static enum test_result test_dcp_consumer_delete_with_time_sync(
 +                                                        ENGINE_HANDLE *h,
 +                                                        ENGINE_HANDLE_V1 *h1) {
 +
 +    set_drift_counter_state(h, h1, 1000, 0x01);
 +
 +    // Store an item
 +    item *i = NULL;
 +    check(store(h, h1, NULL, OPERATION_ADD,"key", "value", &i) == ENGINE_SUCCESS,
 +          "Failed to fail to store an item.");
 +    h1->release(h, NULL, i);
 +    verify_curr_items(h, h1, 1, "one item stored");
 +
 +    wait_for_flusher_to_settle(h, h1);
 +
 +    check(set_vbucket_state(h, h1, 0, vbucket_state_replica),
 +          "Failed to set vbucket state.");
 +
 +    const void *cookie = testHarness.create_cookie();
 +    uint32_t opaque = 0;
 +    uint8_t cas = 0x1;
 +    uint16_t vbucket = 0;
 +    uint32_t flags = 0;
 +    uint64_t bySeqno = 10;
 +    uint64_t revSeqno = 0;
 +    const char *name = "unittest";
 +    uint16_t nname = strlen(name);
 +    uint32_t seqno = 0;
 +
 +    // Open a DCP connection
 +    check(h1->dcp.open(h, cookie, opaque, seqno, flags, (void*)name, nname)
 +          == ENGINE_SUCCESS,
 +          "Failed dcp producer open connection.");
 +
 +    std::string type = get_str_stat(h, h1, "eq_dcpq:unittest:type", "dcp");
 +    check(type.compare("consumer") == 0, "Consumer not found");
 +
 +    opaque = add_stream_for_consumer(h, h1, cookie, opaque, 0, 0,
 +                                     PROTOCOL_BINARY_RESPONSE_SUCCESS);
 +
 +    check(h1->dcp.snapshot_marker(h, cookie, opaque, 0, 10, 10, 1)
 +        == ENGINE_SUCCESS, "Failed to send snapshot marker");
 +
 +    // Consume a DCP deletion
 +    int64_t adjusted_time1 = gethrtime() * 2;
 +    ExtendedMetaData *emd = new ExtendedMetaData(adjusted_time1, false);
 +    cb_assert(emd && emd->getStatus() == ENGINE_SUCCESS);
 +    std::pair<const char*, uint16_t> meta = emd->getExtMeta();
 +    check(h1->dcp.deletion(h, cookie, opaque, "key", 3, cas, vbucket,
 +                           bySeqno, revSeqno, meta.first, meta.second)
 +            == ENGINE_SUCCESS,
 +            "Failed dcp delete.");
 +    delete emd;
 +
 +    wait_for_stat_to_be(h, h1, "eq_dcpq:unittest:stream_0_buffer_items", 0,
 +                        "dcp");
 +
 +    wait_for_stat_change(h, h1, "curr_items", 1);
 +    verify_curr_items(h, h1, 0, "one item deleted");
 +    testHarness.destroy_cookie(cookie);
 +
 +    protocol_binary_request_header *request;
 +    int64_t adjusted_time2;
 +    request = createPacket(PROTOCOL_BINARY_CMD_GET_ADJUSTED_TIME, 0, 0, NULL, 0,
 +                           NULL, 0, NULL, 0);
 +    h1->unknown_command(h, NULL, request, add_response);
 +    check(last_status == PROTOCOL_BINARY_RESPONSE_SUCCESS,
 +            "Expected Success");
-     check(last_bodylen == sizeof(int64_t),
++    checkeq(sizeof(int64_t), last_body.size(),
 +            "Bodylen didn't match expected value");
-     memcpy(&adjusted_time2, last_body, last_bodylen);
++    memcpy(&adjusted_time2, last_body.data(), last_body.size());
 +    adjusted_time2 = ntohll(adjusted_time2);
 +
 +    /**
 +     * Check that adjusted_time2 is marginally greater than
 +     * adjusted_time1.
 +     */
 +    check(adjusted_time2 >= adjusted_time1,
 +            "Adjusted time after deletion: Not what is expected");
 +
 +    return SUCCESS;
 +}
  
  static enum test_result test_dcp_consumer_noop(ENGINE_HANDLE *h,
                                                 ENGINE_HANDLE_V1 *h1) {
@@@ -11635,409 -10072,79 +11640,409 @@@ static enum test_result test_del_meta_l
      return SUCCESS;
  }
  
 -static enum test_result test_observe_multi_key(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1) {
 -    // Create some vbuckets
 -    check(set_vbucket_state(h, h1, 1, vbucket_state_active), "Failed to set vbucket state.");
 +static enum test_result test_adjusted_time_apis(ENGINE_HANDLE *h,
 +                                                ENGINE_HANDLE_V1 *h1) {
  
 -    // Set some keys to observe
 -    item *it = NULL;
 -    uint64_t cas1, cas2, cas3;
 -    check(h1->allocate(h, NULL, &it, "key1", 4, 100, 0, 0,
 -          PROTOCOL_BINARY_RAW_BYTES)== ENGINE_SUCCESS, "Allocation failed.");
 -    check(h1->store(h, NULL, it, &cas1, OPERATION_SET, 0)== ENGINE_SUCCESS,
 -          "Set should work.");
 -    h1->release(h, NULL, it);
 +    int64_t adjusted_time1, adjusted_time2;
 +    protocol_binary_request_header *request;
  
 -    check(h1->allocate(h, NULL, &it, "key2", 4, 100, 0, 0,
 -          PROTOCOL_BINARY_RAW_BYTES)== ENGINE_SUCCESS, "Allocation failed.");
 -    check(h1->store(h, NULL, it, &cas2, OPERATION_SET, 1)== ENGINE_SUCCESS,
 -          "Set should work.");
 -    h1->release(h, NULL, it);
 +    request = createPacket(PROTOCOL_BINARY_CMD_GET_ADJUSTED_TIME, 0, 0, NULL, 0,
 +                           NULL, 0, NULL, 0);
 +    h1->unknown_command(h, NULL, request, add_response);
 +    check(last_status == PROTOCOL_BINARY_RESPONSE_NOT_SUPPORTED,
 +            "Expected Not Supported, as Time sync hasn't been enabled yet");
  
 -    check(h1->allocate(h, NULL, &it, "key3", 4, 100, 0, 0,
 -          PROTOCOL_BINARY_RAW_BYTES)== ENGINE_SUCCESS, "Allocation failed.");
 -    check(h1->store(h, NULL, it, &cas3, OPERATION_SET, 1)== ENGINE_SUCCESS,
 -          "Set should work.");
 -    h1->release(h, NULL, it);
 +    set_drift_counter_state(h, h1, 1000, 0x01);
  
 -    wait_for_stat_to_be(h, h1, "ep_total_persisted", 3);
 +    request = createPacket(PROTOCOL_BINARY_CMD_GET_ADJUSTED_TIME, 0, 0, NULL, 0,
 +                           NULL, 0, NULL, 0);
 +    h1->unknown_command(h, NULL, request, add_response);
 +    check(last_status == PROTOCOL_BINARY_RESPONSE_SUCCESS,
 +            "Expected Success");
-     check(last_bodylen == sizeof(int64_t),
++    checkeq(sizeof(int64_t), last_body.size(),
 +            "Bodylen didn't match expected value");
-     memcpy(&adjusted_time1, last_body, last_bodylen);
++    memcpy(&adjusted_time1, last_body.data(), last_body.size());
 +    adjusted_time1 = ntohll(adjusted_time1);
  
 -    // Do observe
 -    std::map<std::string, uint16_t> obskeys;
 -    obskeys["key1"] = 0;
 -    obskeys["key2"] = 1;
 -    obskeys["key3"] = 1;
 -    observe(h, h1, obskeys);
 +    set_drift_counter_state(h, h1, 1000000, 0x01);
 +
 +    request = createPacket(PROTOCOL_BINARY_CMD_GET_ADJUSTED_TIME, 0, 0, NULL, 0,
 +                           NULL, 0, NULL, 0);
 +    h1->unknown_command(h, NULL, request, add_response);
 +    check(last_status == PROTOCOL_BINARY_RESPONSE_SUCCESS,
 +            "Expected Success");
-     check(last_bodylen == sizeof(int64_t),
++    checkeq(sizeof(int64_t), last_body.size(),
 +            "Bodylen didn't match expected value");
-     memcpy(&adjusted_time2, last_body, last_bodylen);
++    memcpy(&adjusted_time2, last_body.data(), last_body.size());
 +    adjusted_time2 = ntohll(adjusted_time2);
 +
 +    // adjusted_time2 should be greater than or equal to adjusted_time1 plus
 +    // the difference between the two drift counters set previously
 +    check(adjusted_time2 >= adjusted_time1 + 999000,
 +            "Adjusted_time2: not what is expected");
 +
 +    // Test sending adjustedTime with SetWithMeta
 +    ItemMetaData itm_meta;
 +    itm_meta.flags = 0xdeadbeef;
 +    itm_meta.exptime = 0;
 +    itm_meta.revSeqno = 10;
 +    itm_meta.cas = 0xdeadbeef;
 +    set_with_meta(h, h1, "key", 3, "value", 5, 0, &itm_meta, last_cas,
 +                  false, 0x00, true, adjusted_time2 * 2);
 +    wait_for_flusher_to_settle(h, h1);
 +
 +    check(last_status == PROTOCOL_BINARY_RESPONSE_SUCCESS,
 +            "Expected a SUCCESS");
 +    check_key_value(h, h1, "key", "value", 5, 0);
 +
 +    request = createPacket(PROTOCOL_BINARY_CMD_GET_ADJUSTED_TIME, 0, 0, NULL, 0,
 +            NULL, 0, NULL, 0);
 +    h1->unknown_command(h, NULL, request, add_response);
 +    check(last_status == PROTOCOL_BINARY_RESPONSE_SUCCESS,
 +            "Expected Success");
-     check(last_bodylen == sizeof(int64_t),
++    checkeq(sizeof(int64_t), last_body.size(),
 +            "Bodylen didn't match expected value");
-     memcpy(&adjusted_time1, last_body, last_bodylen);
++    memcpy(&adjusted_time1, last_body.data(), last_body.size());
 +    adjusted_time1 = ntohll(adjusted_time1);
 +
 +    // Check that adjusted_time1 is marginally greater than
 +    // adjusted_time2 * 2
 +    check(adjusted_time1 >= adjusted_time2 * 2,
 +            "Adjusted_time1: not what is expected");
 +
 +    // Test sending adjustedTime with DelWithMeta
 +    item *i = NULL;
 +    check(store(h, h1, NULL, OPERATION_SET, "key2", "value2", &i) == ENGINE_SUCCESS,
 +            "Failed set.");
 +    h1->release(h, NULL, i);
 +    del_with_meta(h, h1, "key2", 4, 0, &itm_meta, last_cas, false,
 +                  true, adjusted_time1 * 2);
 +    wait_for_flusher_to_settle(h, h1);
      check(last_status == PROTOCOL_BINARY_RESPONSE_SUCCESS, "Expected success");
  
 -    // Check the result
 -    uint16_t vb;
 -    uint16_t keylen;
 -    char key[10];
 -    uint8_t persisted;
 -    uint64_t cas;
 +    request = createPacket(PROTOCOL_BINARY_CMD_GET_ADJUSTED_TIME, 0, 0, NULL, 0,
 +            NULL, 0, NULL, 0);
 +    h1->unknown_command(h, NULL, request, add_response);
 +    check(last_status == PROTOCOL_BINARY_RESPONSE_SUCCESS,
 +            "Expected Success");
-     check(last_bodylen == sizeof(int64_t),
++    checkeq(sizeof(int64_t), last_body.size(),
 +            "Bodylen didn't match expected value");
-     memcpy(&adjusted_time2, last_body, last_bodylen);
++    memcpy(&adjusted_time2, last_body.data(), last_body.size());
 +    adjusted_time2 = ntohll(adjusted_time2);
  
 -    memcpy(&vb, last_body.data(), sizeof(uint16_t));
 +    // Check that adjusted_time2 is marginally greater than
 +    // adjusted_time1 * 2
 +    check(adjusted_time2 >= adjusted_time1 * 2,
 +            "Adjusted_time2: not what is expected");
 +
 +    return SUCCESS;
 +}
 +
 +// ------------------------------ end of XDCR unit tests -----------------------//
 +
 +static enum test_result test_observe_no_data(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1) {
 +    std::map<std::string, uint16_t> obskeys;
 +    observe(h, h1, obskeys);
 +    check(last_status == PROTOCOL_BINARY_RESPONSE_SUCCESS, "Expected success");
 +    return SUCCESS;
 +}
 +
 +static enum test_result test_observe_seqno_basic_tests(ENGINE_HANDLE *h,
 +                                                       ENGINE_HANDLE_V1 *h1) {
 +    // Check observe seqno for vbucket with id 1
 +    check(set_vbucket_state(h, h1, 1, vbucket_state_active), "Failed to set vbucket state.");
 +
 +    //Check the output when there is no data in the vbucket
 +    uint64_t vb_uuid = get_ull_stat(h, h1, "vb_1:0:id", "failovers");
 +    uint64_t high_seqno = get_int_stat(h, h1, "vb_1:high_seqno", "vbucket-seqno");
 +    observe_seqno(h, h1, 1, vb_uuid);
 +
 +    check(last_status == PROTOCOL_BINARY_RESPONSE_SUCCESS, "Expected success");
 +
 +    check_observe_seqno(false, 0, 1, vb_uuid, high_seqno, high_seqno);
 +
 +    //Add some mutations and verify the output
 +    int num_items = 10;
 +    for (int j = 0; j < num_items; ++j) {
 +        // Set an item
 +        item *it = NULL;
 +        std::stringstream ss;
 +        ss << "key" << j;
 +        uint64_t cas1;
 +        check(h1->allocate(h, NULL, &it, ss.str().c_str(), 4, 100, 0, 0,
 +              PROTOCOL_BINARY_RAW_BYTES)== ENGINE_SUCCESS, "Allocation failed.");
 +        check(h1->store(h, NULL, it, &cas1, OPERATION_SET, 1)== ENGINE_SUCCESS,
 +              "Expected set to succeed");
 +        h1->release(h, NULL, it);
 +    }
 +
 +    wait_for_flusher_to_settle(h, h1);
 +
 +    int total_persisted = get_int_stat(h, h1, "ep_total_persisted");
 +    high_seqno = get_int_stat(h, h1, "vb_1:high_seqno", "vbucket-seqno");
 +
 +    check(total_persisted == num_items,
 +          "Expected ep_total_persisted equals the number of items");
 +
 +    observe_seqno(h, h1, 1, vb_uuid);
 +
 +    check_observe_seqno(false, 0, 1, vb_uuid, total_persisted, high_seqno);
 +    //Stop persistence. Add more mutations and check observe result
 +    stop_persistence(h, h1);
 +
 +    num_items = 20;
 +    for (int j = 10; j < num_items; ++j) {
 +        // Set an item
 +        item *it = NULL;
 +        std::stringstream ss;
 +        ss << "key" << j;
 +        uint64_t cas1;
 +        check(h1->allocate(h, NULL, &it, ss.str().c_str(), 5, 100, 0, 0,
 +              PROTOCOL_BINARY_RAW_BYTES)== ENGINE_SUCCESS, "Allocation failed.");
 +        check(h1->store(h, NULL, it, &cas1, OPERATION_SET, 1)== ENGINE_SUCCESS,
 +              "Expected set to succeed");
 +        h1->release(h, NULL, it);
 +    }
 +
 +    high_seqno = get_int_stat(h, h1, "vb_1:high_seqno", "vbucket-seqno");
 +    observe_seqno(h, h1, 1, vb_uuid);
 +
 +    check_observe_seqno(false, 0, 1, vb_uuid, total_persisted, high_seqno);
 +    start_persistence(h, h1);
 +    wait_for_flusher_to_settle(h, h1);
 +    total_persisted = get_int_stat(h, h1, "ep_total_persisted");
 +
 +    observe_seqno(h, h1, 1, vb_uuid);
 +
 +    check_observe_seqno(false, 0, 1, vb_uuid, total_persisted, high_seqno);
 +    return SUCCESS;
 +}
 +
 +static enum test_result test_observe_seqno_failover(ENGINE_HANDLE *h,
 +                                                    ENGINE_HANDLE_V1 *h1) {
 +    int num_items = 10;
 +    for (int j = 0; j < num_items; ++j) {
 +        // Set an item
 +        item *it = NULL;
 +        std::stringstream ss;
 +        ss << "key" << j;
 +        uint64_t cas1;
 +        check(h1->allocate(h, NULL, &it, ss.str().c_str(), 4, 100, 0, 0,
 +              PROTOCOL_BINARY_RAW_BYTES)== ENGINE_SUCCESS, "Allocation failed.");
 +        check(h1->store(h, NULL, it, &cas1, OPERATION_SET, 0)== ENGINE_SUCCESS,
 +              "Expected set to succeed");
 +        h1->release(h, NULL, it);
 +    }
 +
 +    wait_for_flusher_to_settle(h, h1);
 +
 +    uint64_t vb_uuid = get_ull_stat(h, h1, "vb_0:0:id", "failovers");
 +    uint64_t high_seqno = get_int_stat(h, h1, "vb_0:high_seqno", "vbucket-seqno");
 +
 +    // restart
 +    testHarness.reload_engine(&h, &h1,
 +                              testHarness.engine_path,
 +                              testHarness.get_current_testcase()->cfg,
 +                              true, true);
 +    wait_for_warmup_complete(h, h1);
 +
 +    uint64_t new_vb_uuid = get_ull_stat(h, h1, "vb_0:0:id", "failovers");
 +
 +    observe_seqno(h, h1, 0, vb_uuid);
 +
 +    check_observe_seqno(true, 1, 0, new_vb_uuid, high_seqno, high_seqno,
 +                        vb_uuid, high_seqno);
 +
 +    return SUCCESS;
 +}
 +
 +static enum test_result test_observe_seqno_error(ENGINE_HANDLE *h,
 +                                                 ENGINE_HANDLE_V1 *h1) {
 +
 +    //not my vbucket test
 +    uint64_t vb_uuid = get_ull_stat(h, h1, "vb_0:0:id", "failovers");
 +    observe_seqno(h, h1, 10, vb_uuid);
 +    check(last_status == PROTOCOL_BINARY_RESPONSE_NOT_MY_VBUCKET,
 +          "Expected not my vbucket");
 +
 +    //invalid uuid for vbucket
 +    vb_uuid = 0xdeadbeef;
 +    std::stringstream invalid_data;
 +    invalid_data.write((char *) &vb_uuid, sizeof(uint64_t));
 +
 +    protocol_binary_request_header *request;
 +
 +    request = createPacket(PROTOCOL_BINARY_CMD_OBSERVE_SEQNO, 0, 0, NULL, 0,
 +                           NULL, 0, invalid_data.str().data(),
 +                           invalid_data.str().length());
 +    h1->unknown_command(h, NULL, request, add_response);
 +
 +    check(last_status == PROTOCOL_BINARY_RESPONSE_KEY_ENOENT,
 +          "Expected vb uuid not found");
 +
 +    return SUCCESS;
 +}
 +
 +static enum test_result test_observe_single_key(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1) {
 +    stop_persistence(h, h1);
 +
 +    // Set an item
 +    item *it = NULL;
 +    uint64_t cas1;
 +    check(h1->allocate(h, NULL, &it, "key", 3, 100, 0, 0,
 +          PROTOCOL_BINARY_RAW_BYTES)== ENGINE_SUCCESS, "Allocation failed.");
 +    check(h1->store(h, NULL, it, &cas1, OPERATION_SET, 0)== ENGINE_SUCCESS,
 +          "Set should work.");
 +    h1->release(h, NULL, it);
 +
 +    // Do an observe
 +    std::map<std::string, uint16_t> obskeys;
 +    obskeys["key"] = 0;
 +    observe(h, h1, obskeys);
 +    check(last_status == PROTOCOL_BINARY_RESPONSE_SUCCESS, "Expected success");
 +
 +    // Check that the key is not persisted
 +    uint16_t vb;
 +    uint16_t keylen;
 +    char key[3];
 +    uint8_t persisted;
 +    uint64_t cas;
 +
-     memcpy(&vb, last_body, sizeof(uint16_t));
++    memcpy(&vb, last_body.data(), sizeof(uint16_t));
 +    check(ntohs(vb) == 0, "Wrong vbucket in result");
-     memcpy(&keylen, last_body + 2, sizeof(uint16_t));
++    memcpy(&keylen, last_body.data() + 2, sizeof(uint16_t));
 +    check(ntohs(keylen) == 3, "Wrong keylen in result");
-     memcpy(&key, last_body + 4, ntohs(keylen));
++    memcpy(&key, last_body.data() + 4, ntohs(keylen));
 +    check(strncmp(key, "key", 3) == 0, "Wrong key in result");
-     memcpy(&persisted, last_body + 7, sizeof(uint8_t));
++    memcpy(&persisted, last_body.data() + 7, sizeof(uint8_t));
 +    check(persisted == OBS_STATE_NOT_PERSISTED, "Expected persisted in result");
-     memcpy(&cas, last_body + 8, sizeof(uint64_t));
++    memcpy(&cas, last_body.data() + 8, sizeof(uint64_t));
 +    check(ntohll(cas) == cas1, "Wrong cas in result");
 +
 +    return SUCCESS;
 +}
 +
 +static enum test_result test_observe_temp_item(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1) {
 +    char const *k1 = "key";
 +    item *i = NULL;
 +
 +    check(store(h, h1, NULL, OPERATION_SET, k1, "somevalue", &i) == ENGINE_SUCCESS,
 +          "Failed set.");
 +    h1->release(h, NULL, i);
 +    wait_for_flusher_to_settle(h, h1);
 +
 +    check(del(h, h1, k1, 0, 0) == ENGINE_SUCCESS, "Delete failed");
 +    wait_for_flusher_to_settle(h, h1);
 +    wait_for_stat_to_be(h, h1, "curr_items", 0);
 +
 +    check(get_meta(h, h1, k1), "Expected to get meta");
 +    check(last_status == PROTOCOL_BINARY_RESPONSE_SUCCESS, "Expected success");
 +    check(last_deleted_flag, "Expected deleted flag to be set");
 +    check(get_int_stat(h, h1, "curr_items") == 0, "Expected zero curr_items");
 +
 +    // Make sure there is one temp_item
 +    check(get_int_stat(h, h1, "curr_temp_items") == 1, "Expected single temp_items");
 +
 +    // Do an observe
 +    std::map<std::string, uint16_t> obskeys;
 +    obskeys["key"] = 0;
 +    observe(h, h1, obskeys);
 +    check(last_status == PROTOCOL_BINARY_RESPONSE_SUCCESS, "Expected success");
 +
 +    // Check that the key is not found
 +    uint16_t vb;
 +    uint16_t keylen;
 +    char key[3];
 +    uint8_t persisted;
 +    uint64_t cas;
 +
-     memcpy(&vb, last_body, sizeof(uint16_t));
++    memcpy(&vb, last_body.data(), sizeof(uint16_t));
 +    check(ntohs(vb) == 0, "Wrong vbucket in result");
-     memcpy(&keylen, last_body + 2, sizeof(uint16_t));
++    memcpy(&keylen, last_body.data() + 2, sizeof(uint16_t));
 +    check(ntohs(keylen) == 3, "Wrong keylen in result");
-     memcpy(&key, last_body + 4, ntohs(keylen));
++    memcpy(&key, last_body.data() + 4, ntohs(keylen));
 +    check(strncmp(key, "key", 3) == 0, "Wrong key in result");
-     memcpy(&persisted, last_body + 7, sizeof(uint8_t));
++    memcpy(&persisted, last_body.data() + 7, sizeof(uint8_t));
 +    check(persisted == OBS_STATE_NOT_FOUND, "Expected NOT_FOUND in result");
-     memcpy(&cas, last_body + 8, sizeof(uint64_t));
++    memcpy(&cas, last_body.data() + 8, sizeof(uint64_t));
 +    check(ntohll(cas) == 0, "Wrong cas in result");
 +
 +    return SUCCESS;
 +}
 +
 +static enum test_result test_observe_multi_key(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1) {
 +    // Create some vbuckets
 +    check(set_vbucket_state(h, h1, 1, vbucket_state_active), "Failed to set vbucket state.");
 +
 +    // Set some keys to observe
 +    item *it = NULL;
 +    uint64_t cas1, cas2, cas3;
 +    check(h1->allocate(h, NULL, &it, "key1", 4, 100, 0, 0,
 +          PROTOCOL_BINARY_RAW_BYTES)== ENGINE_SUCCESS, "Allocation failed.");
 +    check(h1->store(h, NULL, it, &cas1, OPERATION_SET, 0)== ENGINE_SUCCESS,
 +          "Set should work.");
 +    h1->release(h, NULL, it);
 +
 +    check(h1->allocate(h, NULL, &it, "key2", 4, 100, 0, 0,
 +          PROTOCOL_BINARY_RAW_BYTES)== ENGINE_SUCCESS, "Allocation failed.");
 +    check(h1->store(h, NULL, it, &cas2, OPERATION_SET, 1)== ENGINE_SUCCESS,
 +          "Set should work.");
 +    h1->release(h, NULL, it);
 +
 +    check(h1->allocate(h, NULL, &it, "key3", 4, 100, 0, 0,
 +          PROTOCOL_BINARY_RAW_BYTES)== ENGINE_SUCCESS, "Allocation failed.");
 +    check(h1->store(h, NULL, it, &cas3, OPERATION_SET, 1)== ENGINE_SUCCESS,
 +          "Set should work.");
 +    h1->release(h, NULL, it);
 +
 +    wait_for_stat_to_be(h, h1, "ep_total_persisted", 3);
 +
 +    // Do observe
 +    std::map<std::string, uint16_t> obskeys;
 +    obskeys["key1"] = 0;
 +    obskeys["key2"] = 1;
 +    obskeys["key3"] = 1;
 +    observe(h, h1, obskeys);
 +    check(last_status == PROTOCOL_BINARY_RESPONSE_SUCCESS, "Expected success");
 +
 +    // Check the result
 +    uint16_t vb;
 +    uint16_t keylen;
 +    char key[10];
 +    uint8_t persisted;
 +    uint64_t cas;
 +
-     memcpy(&vb, last_body, sizeof(uint16_t));
++    memcpy(&vb, last_body.data(), sizeof(uint16_t));
      check(ntohs(vb) == 0, "Wrong vbucket in result");
-     memcpy(&keylen, last_body + 2, sizeof(uint16_t));
+     memcpy(&keylen, last_body.data() + 2, sizeof(uint16_t));
      check(ntohs(keylen) == 4, "Wrong keylen in result");
-     memcpy(&key, last_body + 4, ntohs(keylen));
+     memcpy(&key, last_body.data() + 4, ntohs(keylen));
      check(strncmp(key, "key1", 4) == 0, "Wrong key in result");
-     memcpy(&persisted, last_body + 8, sizeof(uint8_t));
+     memcpy(&persisted, last_body.data() + 8, sizeof(uint8_t));
      check(persisted == OBS_STATE_PERSISTED, "Expected persisted in result");
-     memcpy(&cas, last_body + 9, sizeof(uint64_t));
+     memcpy(&cas, last_body.data() + 9, sizeof(uint64_t));
      check(ntohll(cas) == cas1, "Wrong cas in result");
  
-     memcpy(&vb, last_body + 17, sizeof(uint16_t));
+     memcpy(&vb, last_body.data() + 17, sizeof(uint16_t));
      check(ntohs(vb) == 1, "Wrong vbucket in result");
-     memcpy(&keylen, last_body + 19, sizeof(uint16_t));
+     memcpy(&keylen, last_body.data() + 19, sizeof(uint16_t));
      check(ntohs(keylen) == 4, "Wrong keylen in result");
-     memcpy(&key, last_body + 21, ntohs(keylen));
+     memcpy(&key, last_body.data() + 21, ntohs(keylen));
      check(strncmp(key, "key2", 4) == 0, "Wrong key in result");
-     memcpy(&persisted, last_body + 25, sizeof(uint8_t));
+     memcpy(&persisted, last_body.data() + 25, sizeof(uint8_t));
      check(persisted == OBS_STATE_PERSISTED, "Expected persisted in result");
-     memcpy(&cas, last_body + 26, sizeof(uint64_t));
+     memcpy(&cas, last_body.data() + 26, sizeof(uint64_t));
      check(ntohll(cas) == cas2, "Wrong cas in result");
  
-     memcpy(&vb, last_body + 34, sizeof(uint16_t));
+     memcpy(&vb, last_body.data() + 34, sizeof(uint16_t));
      check(ntohs(vb) == 1, "Wrong vbucket in result");
-     memcpy(&keylen, last_body + 36, sizeof(uint16_t));
+     memcpy(&keylen, last_body.data() + 36, sizeof(uint16_t));
      check(ntohs(keylen) == 4, "Wrong keylen in result");
-     memcpy(&key, last_body + 38, ntohs(keylen));
+     memcpy(&key, last_body.data() + 38, ntohs(keylen));
      check(strncmp(key, "key3", 4) == 0, "Wrong key in result");
-     memcpy(&persisted, last_body + 42, sizeof(uint8_t));
+     memcpy(&persisted, last_body.data() + 42, sizeof(uint8_t));
      check(persisted == OBS_STATE_PERSISTED, "Expected persisted in result");
-     memcpy(&cas, last_body + 43, sizeof(uint64_t));
+     memcpy(&cas, last_body.data() + 43, sizeof(uint64_t));
      check(ntohll(cas) == cas3, "Wrong cas in result");
  
      return SUCCESS;
@@@ -13758,89 -11611,6 +13761,89 @@@ static enum test_result test_defragment
      testHarness.destroy_cookie(cookie);
      return SUCCESS;
  }
-     check(last_bodylen == sizeof(int64_t),
 +#endif // defined(HAVE_JEMALLOC)
 +
 +static enum test_result test_hlc_cas(ENGINE_HANDLE *h,
 +                                     ENGINE_HANDLE_V1 *h1) {
 +    const char *key = "key";
 +    item *i = NULL;
 +    item_info info;
 +    uint64_t curr_cas = 0, prev_cas = 0;
 +
 +    memset(&info, 0, sizeof(info));
 +
 +    // Enable time sync
 +    set_drift_counter_state(h, h1, 100000, true);
 +    check(store(h, h1, NULL, OPERATION_ADD, key, "data1", &i, 0, 0)
 +          == ENGINE_SUCCESS, "Failed to store an item");
 +    h1->release(h, NULL, i);
 +
 +    check(get_item_info(h, h1, &info, key), "Error in getting item info");
 +    curr_cas = info.cas;
 +    check(curr_cas > prev_cas, "CAS is not monotonically increasing");
 +    prev_cas = curr_cas;
 +
 +    //set a lesser drift and ensure that the CAS is monotonically
 +    //increasing
 +    set_drift_counter_state(h, h1, 100, true);
 +
 +    check(store(h, h1, NULL, OPERATION_SET, key, "data2", &i, 0, 0)
 +          == ENGINE_SUCCESS, "Failed to store an item");
 +    h1->release(h, NULL, i);
 +
 +    check(get_item_info(h, h1, &info, key), "Error getting item info");
 +    curr_cas = info.cas;
 +    check(curr_cas > prev_cas, "CAS is not monotonically increasing");
 +    prev_cas = curr_cas;
 +
 +    //ensure that the adjusted time will be negative
 +    int64_t drift_counter = (-1) * (gethrtime() + 100000);
 +    set_drift_counter_state(h, h1, drift_counter, true);
 +
 +    protocol_binary_request_header *request;
 +    int64_t adjusted_time;
 +    request = createPacket(PROTOCOL_BINARY_CMD_GET_ADJUSTED_TIME, 0, 0, NULL, 0,
 +                           NULL, 0, NULL, 0);
 +    h1->unknown_command(h, NULL, request, add_response);
 +    check(last_status == PROTOCOL_BINARY_RESPONSE_SUCCESS,
 +            "Expected Success");
-     memcpy(&adjusted_time, last_body, last_bodylen);
++    checkeq(sizeof(int64_t), last_body.size(),
 +            "Bodylen didn't match expected value");
++    memcpy(&adjusted_time, last_body.data(), last_body.size());
 +    adjusted_time = ntohll(adjusted_time);
 +    check(adjusted_time < 0, "Adjusted time is supposed to be negative");
 +
 +    check(store(h, h1, NULL, OPERATION_REPLACE, key, "data3", &i, 0, 0)
 +          == ENGINE_SUCCESS, "Failed to store an item");
 +    h1->release(h, NULL, i);
 +
 +    check(get_item_info(h, h1, &info, key), "Error in getting item info");
 +    curr_cas = info.cas;
 +    check(curr_cas > prev_cas, "CAS is not monotonically increasing");
 +    prev_cas = curr_cas;
 +
 +    //disable time sync
 +    set_drift_counter_state(h, h1, 0, false);
 +
 +    getl(h, h1, key, 0, 10);
 +    check(last_status == PROTOCOL_BINARY_RESPONSE_SUCCESS,
 +          "Expected to be able to getl on first try");
 +    curr_cas = last_cas;
 +    check(curr_cas > prev_cas, "CAS is not monotonically increasing");
 +    prev_cas = curr_cas;
 +
 +    uint64_t result = 0;
 +    check(h1->arithmetic(h, NULL, "key2", 4, true, true, 1, 1, 0,
 +                         &i, PROTOCOL_BINARY_RAW_BYTES, &result, 0)
 +                         == ENGINE_SUCCESS, "Failed arithmetic operation");
 +    h1->release(h, NULL, i);
 +
 +    check(get_item_info(h, h1, &info, "key2"), "Error in getting item info");
 +    curr_cas = info.cas;
 +    check(curr_cas > prev_cas, "CAS is not monotonically increasing");
 +
 +    return SUCCESS;
 +}
  
  static enum test_result test_failover_log_dcp(ENGINE_HANDLE *h,
                                                ENGINE_HANDLE_V1 *h1) {
@@@ -14106,348 -11781,6 +14109,348 @@@ static enum test_result test_mb16357(EN
      return SUCCESS;
  }
  
-     checkeq(PROTOCOL_BINARY_RESPONSE_SUCCESS, last_status,
 +// Regression test for MB-17517 - ensure that if an item is locked when TAP
 +// attempts to stream it, it doesn't get a CAS of -1.
 +static enum test_result test_mb17517_tap_with_locked_key(ENGINE_HANDLE *h,
 +                                                         ENGINE_HANDLE_V1 *h1) {
 +    const uint16_t vbid = 0;
 +    // Store an item and immediately lock it.
 +    item *it = NULL;
 +    std::string key("key");
 +    checkeq(store(h, h1, NULL, OPERATION_SET, key.c_str(), "value",
 +                  &it, 0, vbid, 3600, PROTOCOL_BINARY_RAW_BYTES),
 +            ENGINE_SUCCESS,
 +            "Failed to store an item.");
 +    h1->release(h, NULL, it);
 +
 +    uint32_t lock_timeout = 10;
 +    getl(h, h1, key.c_str(), vbid, lock_timeout);
++    checkeq(PROTOCOL_BINARY_RESPONSE_SUCCESS, last_status.load(),
 +            "Expected to be able to getl on first try");
 +
 +    wait_for_flusher_to_settle(h, h1);
 +
 +    // Create the TAP connection and try to get the items.
 +    const void *cookie = testHarness.create_cookie();
 +    std::string name("test_mb17517_tap_with_locked_key");
 +    TAP_ITERATOR iter = h1->get_tap_iterator(h, cookie, name.c_str(),
 +                                             name.length(),
 +                                             TAP_CONNECT_FLAG_DUMP, NULL, 0);
 +    check(iter != NULL, "Failed to create a tap iterator");
 +
 +    void *engine_specific;
 +    uint16_t nengine_specific;
 +    uint8_t ttl;
 +    uint16_t flags;
 +    uint32_t seqno;
 +    uint16_t vbucket;
 +    tap_event_t event;
 +
 +    uint16_t unlikely_vbucket_identifier = 17293;
 +
 +    do {
 +        vbucket = unlikely_vbucket_identifier;
 +        event = iter(h, cookie, &it, &engine_specific,
 +                     &nengine_specific, &ttl, &flags,
 +                     &seqno, &vbucket);
 +
 +        switch (event) {
 +        case TAP_PAUSE:
 +            testHarness.waitfor_cookie(cookie);
 +            break;
 +        case TAP_OPAQUE:
 +        case TAP_NOOP:
 +            break;
 +        case TAP_MUTATION: {
 +            testHarness.unlock_cookie(cookie);
 +
 +            item_info info;
 +            info.nvalue = 1;
 +            if (!h1->get_item_info(h, NULL, it, &info)) {
 +                fprintf(stderr, "test_mb17517_tap_with_locked_key: "
 +                        "get_item_info failed\n");
 +                return FAIL;
 +            }
 +
 +            // Check the CAS.
 +            if (info.cas == ~0ull) {
 +                fprintf(stderr, "test_mb17517_tap_with_locked_key: "
 +                        "Got CAS of -1 in TAP_MUTATION\n");
 +                return FAIL;
 +            }
 +
 +            testHarness.lock_cookie(cookie);
 +            break;
 +        }
 +        case TAP_DISCONNECT:
 +            break;
 +        default:
 +            std::cerr << "Unexpected event:  " << event << std::endl;
 +            return FAIL;
 +        }
 +
 +    } while (event != TAP_DISCONNECT);
 +
 +    testHarness.unlock_cookie(cookie);
 +    testHarness.destroy_cookie(cookie);
 +
 +    return SUCCESS;
 +}
 +
 +static void force_vbstate_to_25x(std::string dbname, int vbucket) {
 +    std::string filename = dbname +
 +                           DIRECTORY_SEPARATOR_CHARACTER +
 +                           std::to_string(vbucket) +
 +                           ".couch.1";
 +    Db* handle;
 +    couchstore_error_t err = couchstore_open_db(filename.c_str(),
 +                                                COUCHSTORE_OPEN_FLAG_CREATE,
 +                                                &handle);
 +
 +    checkeq(COUCHSTORE_SUCCESS, err, "Failed to open new database");
 +
 +    // Create 2.5 _local/vbstate
 +    std::string vbstate2_5_x ="{\"state\": \"active\","
 +                              " \"checkpoint_id\": \"1\","
 +                              " \"max_deleted_seqno\": \"0\"}";
 +    LocalDoc vbstate;
 +    vbstate.id.buf = (char *)"_local/vbstate";
 +    vbstate.id.size = sizeof("_local/vbstate") - 1;
 +    vbstate.json.buf = (char *)vbstate2_5_x.c_str();
 +    vbstate.json.size = vbstate2_5_x.size();
 +    vbstate.deleted = 0;
 +
 +    err = couchstore_save_local_document(handle, &vbstate);
 +    checkeq(COUCHSTORE_SUCCESS, err, "Failed to write local document");
 +    couchstore_commit(handle);
 +    couchstore_close_db(handle);
 +}
 +
 +// Regression test for MB-19635
 +// Check that warming up from a 2.x couchfile doesn't end up with a UUID of 0;
 +// we warm up 2 vbuckets and ensure they get unique IDs.
 +static enum test_result test_mb19635_upgrade_from_25x(ENGINE_HANDLE *h,
 +                                                      ENGINE_HANDLE_V1 *h1) {
 +    std::string dbname = dbname_env;
 +
 +    force_vbstate_to_25x(dbname, 0);
 +    force_vbstate_to_25x(dbname, 1);
 +
 +    // Now shutdown engine force and restart to warmup from the 2.5.x data.
 +    testHarness.reload_engine(&h, &h1,
 +                              testHarness.engine_path,
 +                              testHarness.get_current_testcase()->cfg,
 +                              true, false);
 +    wait_for_warmup_complete(h, h1);
 +    uint64_t vb_uuid0 = get_ull_stat(h, h1, "vb_0:uuid", "vbucket-details");
 +    uint64_t vb_uuid1 = get_ull_stat(h, h1, "vb_1:uuid", "vbucket-details");
 +    checkne(vb_uuid0, vb_uuid1, "UUID is not unique");
 +    return SUCCESS;
 +}
 +
 +static enum test_result test_set_dcp_param(ENGINE_HANDLE *h,
 +                                           ENGINE_HANDLE_V1 *h1)
 +{
 +    auto func = [h, h1](std::string key, size_t newValue, bool expectedSetParam){
 +        std::string statKey = "ep_" + key;
 +        size_t param = get_int_stat(h,
 +                                    h1,
 +                                    statKey.c_str());
 +        std::string value = std::to_string(newValue);
 +        check(expectedSetParam == set_param(h, h1,
 +                                            protocol_binary_engine_param_dcp,
 +                                            key.c_str(),
 +                                            value.c_str()),
 +                "Set param not expected");
 +        check(newValue != param,
 +              "Forcing failure as nothing will change");
 +
 +        if (expectedSetParam) {
 +            checkeq(newValue,
 +                    size_t(get_int_stat(h,
 +                                        h1,
 +                                        statKey.c_str())),
 +                "Incorrect dcp param value after calling set_param");
 +        }
 +    };
 +
 +    func("dcp_consumer_process_buffered_messages_yield_limit", 1000, true);
 +    func("dcp_consumer_process_buffered_messages_batch_size", 1000, true);
 +    func("dcp_consumer_process_buffered_messages_yield_limit", 0, false);
 +    func("dcp_consumer_process_buffered_messages_batch_size", 0, false);
 +    return SUCCESS;
 +}
 +
 +
 +/*
 + * Test MB-18452
 + * Drive the DCP consumer by halting all NONIO tasks, then write numItems
 + * mutations (they get buffered). Then re-enable the NONIO tasks, which will
 + * trigger the DCP consumer to consume the buffered items.
 + * If the DCP consumer is friendly and not hogging the NONIO threads,
 + * we should see it being scheduled many times.
 + * This test function returns the number of times the processor task was
 + * dispatched.
 + */
 +static int test_mb18452(ENGINE_HANDLE *h,
 +                        ENGINE_HANDLE_V1 *h1,
 +                        size_t numItems,
 +                        size_t yieldValue,
 +                        size_t batchSize) {
 +
 +    // 1. Setup the consumer params.
 +    std::string value = std::to_string(yieldValue);
 +    set_param(h, h1, protocol_binary_engine_param_dcp,
 +              "dcp_consumer_process_buffered_messages_yield_limit",
 +              value.c_str());
 +    value = std::to_string(batchSize);
 +    set_param(h, h1, protocol_binary_engine_param_dcp,
 +              "dcp_consumer_process_buffered_messages_batch_size",
 +              value.c_str());
 +
 +    const uint16_t vbid = 0;
 +    const uint32_t opaque = 0xFFFF0000;
 +    const uint32_t flags = 0;
 +    const void* cookie = testHarness.create_cookie();
 +
 +    // 2. We need to use a replica
 +    check(set_vbucket_state(h, h1, vbid, vbucket_state_replica),
 +          "Failed to set vbucket state.");
 +
 +    // 3. Force the engine to not run any NONIO tasks whilst we 'load up'
 +    set_param(h, h1, protocol_binary_engine_param_flush,
 +              "max_num_nonio",
 +              "0");
 +
 +    // 4. Create a consumer and one stream for the vbucket
 +    std::string consumer("unittest");
 +    checkeq(h1->dcp.open(h,
 +                         cookie,
 +                         opaque,
 +                         0/*seqno*/,
 +                         flags,
 +                         (void*)consumer.c_str(),
 +                         consumer.length()),
 +            ENGINE_SUCCESS,
 +            "Failed dcp Consumer open connection.");
 +    add_stream_for_consumer(h, h1, cookie, opaque + 1, vbid, flags,
 +                            PROTOCOL_BINARY_RESPONSE_SUCCESS);
 +
 +    uint32_t stream_opaque = get_int_stat(h, h1,
 +                                          "eq_dcpq:unittest:stream_0_opaque",
 +                                          "dcp");
 +    checkeq(ENGINE_SUCCESS,
 +            h1->dcp.snapshot_marker(h,
 +                                    cookie,
 +                                    stream_opaque,
 +                                    vbid,
 +                                    1,//snap start
 +                                    numItems,//snap end
 +                                    2), //flags
 +            "Failed to send snapshot marker");
 +
 +    for (uint64_t seqno = 1; seqno <= numItems; seqno++) {
 +        std::string key = "key" + std::to_string(seqno);
 +        checkeq(ENGINE_SUCCESS,
 +                h1->dcp.mutation(h,
 +                                 cookie,
 +                                 stream_opaque,
 +                                 key.c_str(),
 +                                 key.length(),
 +                                 "value", // item value
 +                                 sizeof("value"), // item value length
 +                                 seqno, // cas
 +                                 vbid, // vbucket
 +                                 0, // flags
 +                                 PROTOCOL_BINARY_RAW_BYTES,
 +                                 seqno, // bySeqno
 +                                 1, // revSeqno
 +                                 0, // expiration
 +                                 0, // locktime
 +                                 "", //meta
 +                                 0, // metalen
 +                                 INITIAL_NRU_VALUE),
 +                "Failed to send dcp mutation");
 +
 +        // At n - 1, enable NONIO tasks, the nth mutation will wake up the task.
 +        if (seqno == (numItems - 1)) {
 +            set_param(h, h1, protocol_binary_engine_param_flush,
 +                      "max_num_nonio",
 +                      "1");
 +        }
 +    }
 +
 +    wait_for_stat_to_be(h, h1, "vb_replica_curr_items", numItems);
 +
 +    // 5. Force the engine to not run any NONIO tasks whilst we 'count up'
 +    set_param(h, h1, protocol_binary_engine_param_flush,
 +              "max_num_nonio",
 +              "0");
 +
 +    // Now we should count how many times the NONIO task ran
 +    // This is slightly racy, but if the task is yielding we expect many
 +    // runs from it, not a small number
 +    check(h1->get_stats(h, NULL, "dispatcher",
 +                        strlen("dispatcher"), add_stats) == ENGINE_SUCCESS,
 +                        "Failed to get worker stats");
 +
 +    // Count up how many times the Processing task was logged
 +    int count = 0;
 +    const std::string key1 = "nonio_worker_";
 +    const std::string key2 = "Processing buffered items for eq_dcpq:unittest";
 +    for (auto kv : vals) {
 +        if (kv.first.find(key1) != std::string::npos &&
 +            kv.second.find(key2) != std::string::npos) {
 +            count++;
 +        }
 +    }
 +
 +    // 6. Re-enable NONIO so we can shutdown
 +    set_param(h, h1, protocol_binary_engine_param_flush,
 +              "max_num_nonio",
 +              "1");
 +    return count;
 +}
 +
 +/**
 + * Test the behaviour of the DCP consumer under load.
 + * The consumer uses a NONIO task to process data from an input buffer.
 + * This task, when given lots of data, should voluntarily yield if it finds
 + * itself running for n iterations.
 + */
 +static enum test_result test_mb18452_smallYield(ENGINE_HANDLE* h,
 +                                                 ENGINE_HANDLE_V1* h1) {
 +    const int batchSize = 10;
 +    const int numItems = 1000;
 +    const int yield = 10;
 +
 +    int processorRuns = test_mb18452(h, h1, numItems, yield, batchSize);
 +
 +    // Before the ep-engine updates, the processor run count was usually 1 or 2;
 +    // with the fix it's up around 80 (it appears to saturate the log).
 +
 +    // So we check that it ran at least numItems / (yield * batchSize) times.
 +    check(processorRuns >= (numItems / (yield * batchSize)),
 +          "DCP Processor ran less times than expected.");
 +    return SUCCESS;
 +}
 +
 +static enum test_result test_mb18452_largeYield(ENGINE_HANDLE* h,
 +                                                ENGINE_HANDLE_V1* h1) {
 +    const int batchSize = 10;
 +    const int numItems = 10000;
 +    const int yield = 10000;
 +    int processorRuns =  test_mb18452(h, h1, numItems, yield, batchSize);
 +    // Here we expect very few yields, so very few runs (definitely not enough to fill
 +    // the task log (TASK_LOG_SIZE)).
 +    check(processorRuns < 80,
 +          "DCP Processor ran more times than expected.");
 +
 +
 +    return SUCCESS;
 +}
 +
  /**
   * This test demonstrates bucket shutdown when there is a rogue
   * backfill (whose producer and stream are already closed).
Simple merge