Merge remote-tracking branch 'couchbase/3.0.x' into sherlock
[ep-engine.git] / tests / ep_test_apis.cc
index 2dbd6df..ffb88c7 100644 (file)
@@ -32,8 +32,6 @@
 #define check(expr, msg) \
     static_cast<void>((expr) ? 0 : abort_msg(#expr, msg, __LINE__))
 
-extern "C" bool abort_msg(const char *expr, const char *msg, int line);
-
 std::map<std::string, std::string> vals;
 bool dump_stats = false;
 AtomicValue<protocol_binary_response_status> last_status(
@@ -41,9 +39,12 @@ AtomicValue<protocol_binary_response_status> last_status(
 std::string last_key;
 std::string last_body;
 bool last_deleted_flag(false);
+AtomicValue<uint8_t> last_conflict_resolution_mode{0xff};
 AtomicValue<uint64_t> last_cas(0);
 AtomicValue<uint8_t> last_datatype(0x00);
 ItemMetaData last_meta;
+uint64_t last_uuid = 0;
+uint64_t last_seqno = 0;
 
 extern "C" bool add_response_get_meta(const void *key, uint16_t keylen,
                                       const void *ext, uint8_t extlen,
@@ -98,7 +99,29 @@ bool add_response_get_meta(const void *key, uint16_t keylen, const void *ext,
         memcpy(&last_meta.revSeqno, ext_bytes + 12, 8);
         last_meta.revSeqno = ntohll(last_meta.revSeqno);
         last_meta.cas = cas;
+        if (extlen > 20) {
+            memcpy(&last_conflict_resolution_mode, ext_bytes + 20, 1);
+        }
+    }
+    return add_response(key, keylen, ext, extlen, body, bodylen, datatype,
+                        status, cas, cookie);
+}
+
+bool add_response_set_del_meta(const void *key, uint16_t keylen, const void *ext,
+                               uint8_t extlen, const void *body, uint32_t bodylen,
+                               uint8_t datatype, uint16_t status, uint64_t cas,
+                               const void *cookie) {
+    (void)cookie;
+    const uint8_t* ext_bytes = reinterpret_cast<const uint8_t*> (ext);
+    if (ext && extlen > 0) {
+        uint64_t vb_uuid;
+        uint64_t seqno;
+        memcpy(&vb_uuid, ext_bytes, 8);
+        memcpy(&seqno, ext_bytes + 8, 8);
+        last_uuid = ntohll(vb_uuid);
+        last_seqno = ntohll(seqno);
     }
+
     return add_response(key, keylen, ext, extlen, body, bodylen, datatype,
                         status, cas, cookie);
 }
@@ -160,10 +183,12 @@ protocol_binary_request_header* createPacket(uint8_t opcode,
                                              uint32_t keylen,
                                              const char *val,
                                              uint32_t vallen,
-                                             uint8_t datatype) {
+                                             uint8_t datatype,
+                                             const char *meta,
+                                             uint16_t nmeta) {
     char *pkt_raw;
     uint32_t headerlen = sizeof(protocol_binary_request_header);
-    pkt_raw = static_cast<char*>(calloc(1, headerlen + extlen + keylen + vallen));
+    pkt_raw = static_cast<char*>(calloc(1, headerlen + extlen + keylen + vallen + nmeta));
     cb_assert(pkt_raw);
     protocol_binary_request_header *req =
         (protocol_binary_request_header*)pkt_raw;
@@ -171,7 +196,7 @@ protocol_binary_request_header* createPacket(uint8_t opcode,
     req->request.keylen = htons(keylen);
     req->request.extlen = extlen;
     req->request.vbucket = htons(vbid);
-    req->request.bodylen = htonl(keylen + vallen + extlen);
+    req->request.bodylen = htonl(keylen + vallen + extlen + nmeta);
     req->request.cas = htonll(cas);
     req->request.datatype = datatype;
 
@@ -187,26 +212,72 @@ protocol_binary_request_header* createPacket(uint8_t opcode,
         memcpy(pkt_raw + headerlen + extlen + keylen, val, vallen);
     }
 
+    // Extended meta: To be used for set_with_meta/del_with_meta/add_with_meta
+    if (meta && nmeta > 0) {
+        memcpy(pkt_raw + headerlen + extlen + keylen + vallen,
+               meta, nmeta);
+    }
+
     return req;
 }
 
+void set_drift_counter_state(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1,
+                             int64_t initialDriftCount, uint8_t timeSync) {
+
+    protocol_binary_request_header *request;
+
+    int64_t driftCount = htonll(initialDriftCount);
+    uint8_t extlen = sizeof(driftCount) + sizeof(timeSync);
+    char *ext = new char[extlen];
+    memcpy(ext, (char*)&driftCount, sizeof(driftCount));
+    memcpy(ext + sizeof(driftCount), (char*)&timeSync, sizeof(timeSync));
+
+    request = createPacket(PROTOCOL_BINARY_CMD_SET_DRIFT_COUNTER_STATE,
+                           0, 0, ext, extlen);
+    h1->unknown_command(h, NULL, request, add_response);
+    check(last_status == PROTOCOL_BINARY_RESPONSE_SUCCESS,
+            "Expected success for CMD_SET_DRIFT_COUNTER_STATE");
+    delete[] ext;
+}
+
 void add_with_meta(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1, const char *key,
                    const size_t keylen, const char *val, const size_t vallen,
                    const uint32_t vb, ItemMetaData *itemMeta,
-                   bool skipConflictResolution) {
-    int blen = skipConflictResolution ? 28 : 24;
-    char *ext = new char[blen];
-    encodeWithMetaExt(ext, itemMeta);
-
-    if (skipConflictResolution) {
-        uint32_t flag = SKIP_CONFLICT_RESOLUTION_FLAG;
-        flag = htonl(flag);
-        memcpy(ext + 24, (char*)&flag, sizeof(flag));
+                   bool skipConflictResolution, uint8_t datatype,
+                   bool includeExtMeta, int64_t adjustedTime) {
+    int blen = 0;
+    char *ext;
+    ExtendedMetaData *emd = NULL;
+    if (!includeExtMeta) {
+        blen = skipConflictResolution ? 28 : 24;
+        ext = new char[blen];
+        encodeWithMetaExt(ext, itemMeta);
+
+        if (skipConflictResolution) {
+            uint32_t flag = SKIP_CONFLICT_RESOLUTION_FLAG;
+            flag = htonl(flag);
+            memcpy(ext + 24, (char*)&flag, sizeof(flag));
+        }
+    } else {
+        blen = 26;
+        ext = new char[blen];
+        encodeWithMetaExt(ext, itemMeta);
+        emd = new ExtendedMetaData(adjustedTime);
+        // nmeta added to ext below
     }
 
     protocol_binary_request_header *pkt;
-    pkt = createPacket(PROTOCOL_BINARY_CMD_ADD_WITH_META, vb, 0, ext, blen, key, keylen,
-                       val, vallen);
+    if (emd) {
+        std::pair<const char*, uint16_t> meta = emd->getExtMeta();
+        uint16_t nmeta = htons(meta.second);
+        memcpy(ext + 24, (char*)&nmeta, sizeof(nmeta));
+        pkt = createPacket(PROTOCOL_BINARY_CMD_ADD_WITH_META, vb, 0, ext, blen, key,
+                           keylen, val, vallen, datatype, meta.first, meta.second);
+        delete emd;
+    } else {
+        pkt = createPacket(PROTOCOL_BINARY_CMD_ADD_WITH_META, vb, 0, ext, blen, key,
+                           keylen, val, vallen, datatype);
+    }
     check(h1->unknown_command(h, NULL, pkt, add_response) == ENGINE_SUCCESS,
           "Expected to be able to store with meta");
     delete[] ext;
@@ -243,26 +314,51 @@ void createCheckpoint(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1) {
 
 ENGINE_ERROR_CODE del(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1, const char *key,
                       uint64_t cas, uint16_t vbucket, const void* cookie) {
-    return h1->remove(h, cookie, key, strlen(key), &cas, vbucket);
+    mutation_descr_t mut_info;
+    return h1->remove(h, cookie, key, strlen(key), &cas, vbucket, &mut_info);
 }
 
 void del_with_meta(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1, const char *key,
                    const size_t keylen, const uint32_t vb,
                    ItemMetaData *itemMeta, uint64_t cas_for_delete,
-                   bool skipConflictResolution) {
-    int blen = skipConflictResolution ? 28 : 24;
-    char *ext = new char[blen];
-    encodeWithMetaExt(ext, itemMeta);
-
-    if (skipConflictResolution) {
-        uint32_t flag = SKIP_CONFLICT_RESOLUTION_FLAG;
-        flag = htonl(flag);
-        memcpy(ext + 24, (char*)&flag, sizeof(flag));
+                   bool skipConflictResolution, bool includeExtMeta,
+                   int64_t adjustedTime, uint8_t conflictResMode,
+                   const void *cookie) {
+    int blen = 0;
+    char *ext;
+    ExtendedMetaData *emd = NULL;
+    if (!includeExtMeta) {
+        blen = skipConflictResolution ? 28 : 24;
+        ext = new char[blen];
+        encodeWithMetaExt(ext, itemMeta);
+
+        if (skipConflictResolution) {
+            uint32_t flag = SKIP_CONFLICT_RESOLUTION_FLAG;
+            flag = htonl(flag);
+            memcpy(ext + 24, (char*)&flag, sizeof(flag));
+        }
+    } else {
+        blen = 26;
+        ext = new char[blen];
+        encodeWithMetaExt(ext, itemMeta);
+        emd = new ExtendedMetaData(adjustedTime, conflictResMode);
+        // nmeta added to ext below
     }
+
     protocol_binary_request_header *pkt;
-    pkt = createPacket(PROTOCOL_BINARY_CMD_DEL_WITH_META, vb, cas_for_delete, ext, blen, key,
-                       keylen);
-    check(h1->unknown_command(h, NULL, pkt, add_response) == ENGINE_SUCCESS,
+    if (emd) {
+        std::pair<const char*, uint16_t> meta = emd->getExtMeta();
+        uint16_t nmeta = htons(meta.second);
+        memcpy(ext + 24, (char*)&nmeta, sizeof(nmeta));
+        pkt = createPacket(PROTOCOL_BINARY_CMD_DEL_WITH_META, vb, cas_for_delete,
+                           ext, blen, key, keylen, NULL, 0, 0x00, meta.first,
+                           meta.second);
+        delete emd;
+    } else {
+        pkt = createPacket(PROTOCOL_BINARY_CMD_DEL_WITH_META, vb, cas_for_delete,
+                           ext, blen, key, keylen, NULL, 0, 0x00);
+    }
+    check(h1->unknown_command(h, cookie, pkt, add_response_set_del_meta) == ENGINE_SUCCESS,
           "Expected to be able to delete with meta");
     delete[] ext;
 }
@@ -394,9 +490,18 @@ void getl(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1, const char* key,
     free(request);
 }
 
-bool get_meta(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1, const char* key) {
-    protocol_binary_request_header *req = createPacket(PROTOCOL_BINARY_CMD_GET_META, 0, 0, NULL,
-                                                       0, key, strlen(key));
+bool get_meta(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1, const char* key,
+              bool reqExtMeta) {
+
+    protocol_binary_request_header *req;
+    if (reqExtMeta) {
+        uint8_t ext = 0x01;
+        req = createPacket(PROTOCOL_BINARY_CMD_GET_META, 0, 0,
+                           (char*)&ext, sizeof(ext), key, strlen(key));
+    } else {
+        req = createPacket(PROTOCOL_BINARY_CMD_GET_META, 0, 0,
+                           NULL, 0, key, strlen(key));
+    }
 
     ENGINE_ERROR_CODE ret = h1->unknown_command(h, NULL, req,
                                                 add_response_get_meta);
@@ -427,6 +532,20 @@ void observe(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1,
     free(request);
 }
 
+void observe_seqno(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1,
+                   uint16_t vb_id, uint64_t uuid) {
+    protocol_binary_request_header *request;
+    uint64_t vb_uuid = htonll(uuid);
+    std::stringstream data;
+    data.write((char *) &vb_uuid, sizeof(uint64_t));
+
+    request = createPacket(PROTOCOL_BINARY_CMD_OBSERVE_SEQNO, vb_id, 0, NULL, 0,
+                           NULL, 0, data.str().data(), data.str().length());
+    check(h1->unknown_command(h, NULL, request, add_response) == ENGINE_SUCCESS,
+          "Observe_seqno call failed");
+    free(request);
+}
+
 void get_replica(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1, const char* key,
                  uint16_t vbid) {
     protocol_binary_request_header *pkt;
@@ -489,25 +608,99 @@ bool set_vbucket_state(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1,
     return last_status == PROTOCOL_BINARY_RESPONSE_SUCCESS;
 }
 
+bool get_all_vb_seqnos(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1,
+                       vbucket_state_t state, const void *cookie) {
+    protocol_binary_request_header *pkt;
+    if (state) {
+        char ext[sizeof(vbucket_state_t)];
+        encodeExt(ext, static_cast<uint32_t>(state));
+        pkt = createPacket(PROTOCOL_BINARY_CMD_GET_ALL_VB_SEQNOS, 0, 0, ext,
+                           sizeof(vbucket_state_t));
+    } else {
+        pkt = createPacket(PROTOCOL_BINARY_CMD_GET_ALL_VB_SEQNOS);
+    }
+
+    check(h1->unknown_command(h, cookie, pkt, add_response) ==
+          ENGINE_SUCCESS, "Error in getting all vb info");
+
+    free(pkt);
+    return last_status == PROTOCOL_BINARY_RESPONSE_SUCCESS;
+}
+
+void verify_all_vb_seqnos(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1,
+                          uint16_t vb_start, uint16_t vb_end) {
+    const int per_vb_resp_size = sizeof(uint16_t) + sizeof(uint64_t);
+    const int high_seqno_offset = sizeof(uint16_t);
+
+    std::string seqno_body = last_body;
+
+    /* Check if the total response length is as expected. We expect 10 bytes
+       (2 for vb_id + 8 for seqno) */
+    checkeq((size_t)((vb_end - vb_start + 1) * per_vb_resp_size),
+            last_body.size(), "Failed to get all vb info.");
+    /* Check if the contents are correct */
+    for (int i = 0; i < (vb_end - vb_start + 1); i++) {
+        /* Check for correct vb_id */
+        checkeq(static_cast<const uint16_t>(vb_start + i),
+                ntohs(*(reinterpret_cast<const uint16_t*>(seqno_body.data() +
+                                                          per_vb_resp_size * i))),
+                "vb_id mismatch");
+        /* Check for correct high_seqno */
+        std::string vb_stat_seqno("vb_" + std::to_string(vb_start + i) +
+                                  ":high_seqno");
+        uint64_t high_seqno_vb = get_ull_stat(h, h1, vb_stat_seqno.c_str(),
+                                              "vbucket-seqno");
+        checkeq(high_seqno_vb,
+                ntohll(*(reinterpret_cast<const uint64_t*>(seqno_body.data() +
+                                                           per_vb_resp_size * i +
+                                                           high_seqno_offset))),
+              "high_seqno mismatch");
+    }
+}
+
 void set_with_meta(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1, const char *key,
                    const size_t keylen, const char *val, const size_t vallen,
                    const uint32_t vb, ItemMetaData *itemMeta,
                    uint64_t cas_for_set, bool skipConflictResolution,
-                   uint8_t datatype) {
-    int blen = skipConflictResolution ? 28 : 24;
-    char *ext = new char[blen];
-    encodeWithMetaExt(ext, itemMeta);
-
-    if (skipConflictResolution) {
-        uint32_t flag = SKIP_CONFLICT_RESOLUTION_FLAG;
-        flag = htonl(flag);
-        memcpy(ext + 24, (char*)&flag, sizeof(flag));
+                   uint8_t datatype, bool includeExtMeta,
+                   int64_t adjustedTime, uint8_t conflictResMode,
+                   const void *cookie) {
+    int blen = 0;
+    char *ext;
+    ExtendedMetaData *emd = NULL;
+    if (!includeExtMeta) {
+        blen = skipConflictResolution ? 28 : 24;
+        ext = new char[blen];
+        encodeWithMetaExt(ext, itemMeta);
+
+        if (skipConflictResolution) {
+            uint32_t flag = SKIP_CONFLICT_RESOLUTION_FLAG;
+            flag = htonl(flag);
+            memcpy(ext + 24, (char*)&flag, sizeof(flag));
+        }
+    } else {
+        blen = 26;
+        ext = new char[blen];
+        encodeWithMetaExt(ext, itemMeta);
+        emd = new ExtendedMetaData(adjustedTime, conflictResMode);
+        // nmeta added to ext below
     }
 
     protocol_binary_request_header *pkt;
-    pkt = createPacket(PROTOCOL_BINARY_CMD_SET_WITH_META, vb, cas_for_set, ext, blen, key, keylen,
-                       val, vallen, datatype);
-    check(h1->unknown_command(h, NULL, pkt, add_response) == ENGINE_SUCCESS,
+    if (emd) {
+        std::pair<const char*, uint16_t> meta = emd->getExtMeta();
+        uint16_t nmeta = htons(meta.second);
+        memcpy(ext + 24, (char*)&nmeta, sizeof(nmeta));
+        pkt = createPacket(PROTOCOL_BINARY_CMD_SET_WITH_META, vb, cas_for_set, ext,
+                           blen, key, keylen, val, vallen, datatype, meta.first,
+                           meta.second);
+        delete emd;
+    } else {
+        pkt = createPacket(PROTOCOL_BINARY_CMD_SET_WITH_META, vb, cas_for_set, ext,
+                           blen, key, keylen, val, vallen, datatype);
+    }
+
+    check(h1->unknown_command(h, cookie, pkt, add_response_set_del_meta) == ENGINE_SUCCESS,
           "Expected to be able to store with meta");
     delete[] ext;
 }
@@ -515,7 +708,8 @@ void set_with_meta(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1, const char *key,
 void return_meta(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1, const char *key,
                  const size_t keylen, const char *val, const size_t vallen,
                  const uint32_t vb, const uint64_t cas, const uint32_t flags,
-                 const uint32_t exp, const uint32_t type, uint8_t datatype) {
+                 const uint32_t exp, const uint32_t type, uint8_t datatype,
+                 const void *cookie) {
     char ext[12];
     encodeExt(ext, type);
     encodeExt(ext + 4, flags);
@@ -523,7 +717,7 @@ void return_meta(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1, const char *key,
     protocol_binary_request_header *pkt;
     pkt = createPacket(PROTOCOL_BINARY_CMD_RETURN_META, vb, cas, ext, 12, key, keylen, val,
                        vallen, datatype);
-    check(h1->unknown_command(h, NULL, pkt, add_response_ret_meta)
+    check(h1->unknown_command(h, cookie, pkt, add_response_ret_meta)
               == ENGINE_SUCCESS, "Expected to be able to store ret meta");
     free(pkt);
 }
@@ -531,23 +725,24 @@ void return_meta(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1, const char *key,
 void set_ret_meta(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1, const char *key,
                   const size_t keylen, const char *val, const size_t vallen,
                   const uint32_t vb, const uint64_t cas, const uint32_t flags,
-                  const uint32_t exp, uint8_t datatype) {
+                  const uint32_t exp, uint8_t datatype, const void *cookie) {
     return_meta(h, h1, key, keylen, val, vallen, vb, cas, flags, exp,
-                SET_RET_META, datatype);
+                SET_RET_META, datatype, cookie);
 }
 
 void add_ret_meta(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1, const char *key,
                   const size_t keylen, const char *val, const size_t vallen,
                   const uint32_t vb, const uint64_t cas, const uint32_t flags,
-                  const uint32_t exp, uint8_t datatype) {
+                  const uint32_t exp, uint8_t datatype, const void *cookie) {
     return_meta(h, h1, key, keylen, val, vallen, vb, cas, flags, exp,
-                ADD_RET_META, datatype);
+                ADD_RET_META, datatype, cookie);
 }
 
 void del_ret_meta(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1, const char *key,
-                  const size_t keylen, const uint32_t vb, const uint64_t cas) {
+                  const size_t keylen, const uint32_t vb, const uint64_t cas,
+                  const void *cookie) {
     return_meta(h, h1, key, keylen, NULL, 0, vb, cas, 0, 0,
-                DEL_RET_META);
+                DEL_RET_META, 0x00, cookie);
 }
 
 void disable_traffic(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1) {
@@ -615,7 +810,9 @@ ENGINE_ERROR_CODE storeCasVb11(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1,
                                         key, strlen(key),
                                         vlen, flags, exp,
                                         datatype);
-    check(rv == ENGINE_SUCCESS, "Allocation failed.");
+    if (rv != ENGINE_SUCCESS) {
+        return rv;
+    }
 
     item_info info;
     info.nvalue = 1;
@@ -774,6 +971,15 @@ int get_int_stat(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1, const char *statname,
     return atoi(s.c_str());
 }
 
+float get_float_stat(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1, const char *statname,
+                     const char *statkey) {
+    vals.clear();
+    check(h1->get_stats(h, NULL, statkey, statkey == NULL ? 0 : strlen(statkey),
+                        add_stats) == ENGINE_SUCCESS, "Failed to get stats.");
+    std::string s = vals[statname];
+    return atof(s.c_str());
+}
+
 uint64_t get_ull_stat(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1, const char *statname,
                       const char *statkey) {
     vals.clear();
@@ -847,17 +1053,36 @@ private:
 
 void wait_for_stat_change(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1,
                           const char *stat, int initial,
-                          const char *statkey) {
+                          const char *statkey, const time_t wait_time) {
     useconds_t sleepTime = 128;
+    WaitTimeAccumulator<int> accumulator("to change from", stat, statkey,
+                                         initial, wait_time);
     while (get_int_stat(h, h1, stat, statkey) == initial) {
+        accumulator.incrementAndAbortIfLimitReached(sleepTime);
         decayingSleep(&sleepTime);
     }
 }
 
 void wait_for_stat_to_be(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1,
-                         const char *stat, int final, const char* stat_key) {
+                         const char *stat, int final, const char* stat_key,
+                         const time_t wait_time) {
     useconds_t sleepTime = 128;
+    WaitTimeAccumulator<int> accumulator("to be", stat, stat_key, final,
+                                         wait_time);
     while (get_int_stat(h, h1, stat, stat_key) != final) {
+        accumulator.incrementAndAbortIfLimitReached(sleepTime);
+        decayingSleep(&sleepTime);
+    }
+}
+
+void wait_for_stat_to_be_gte(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1,
+                             const char *stat, int final,
+                             const char* stat_key, const time_t wait_time) {
+    useconds_t sleepTime = 128;
+    WaitTimeAccumulator<int> accumulator("to be greater or equal than", stat,
+                                         stat_key, final, wait_time);
+    while (get_int_stat(h, h1, stat, stat_key) < final) {
+        accumulator.incrementAndAbortIfLimitReached(sleepTime);
         decayingSleep(&sleepTime);
     }
 }
@@ -878,17 +1103,23 @@ void wait_for_stat_to_be_lte(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1,
 
 void wait_for_str_stat_to_be(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1,
                              const char *stat, const char* final,
-                             const char* stat_key) {
+                             const char* stat_key, const time_t wait_time) {
     useconds_t sleepTime = 128;
+    WaitTimeAccumulator<const char*> accumulator("to be", stat, stat_key,
+                                                 final, wait_time);
     while (get_str_stat(h, h1, stat, stat_key).compare(final) != 0) {
+        accumulator.incrementAndAbortIfLimitReached(sleepTime);
         decayingSleep(&sleepTime);
     }
 }
 
 void wait_for_memory_usage_below(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1,
-                                 int mem_threshold) {
+                                 int mem_threshold, const time_t wait_time) {
     useconds_t sleepTime = 128;
+    WaitTimeAccumulator<int> accumulator("to be below", "mem_used", NULL,
+                                         mem_threshold, wait_time);
     while (get_int_stat(h, h1, "mem_used") > mem_threshold) {
+        accumulator.incrementAndAbortIfLimitReached(sleepTime);
         decayingSleep(&sleepTime);
     }
 }
@@ -911,7 +1142,6 @@ void wait_for_flusher_to_settle(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1) {
     while (get_int_stat(h, h1, "ep_queue_size") > 0) {
         decayingSleep(&sleepTime);
     }
-    wait_for_stat_change(h, h1, "ep_commit_num", 0);
 }
 
 void wait_for_persisted_value(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1,
@@ -930,57 +1160,6 @@ void wait_for_persisted_value(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1,
     h1->release(h, NULL, i);
 }
 
-bool get_all_vb_seqnos(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1,
-                       vbucket_state_t state, const void *cookie) {
-    protocol_binary_request_header *pkt;
-    if (state) {
-        char ext[sizeof(vbucket_state_t)];
-        encodeExt(ext, static_cast<uint32_t>(state));
-        pkt = createPacket(PROTOCOL_BINARY_CMD_GET_ALL_VB_SEQNOS, 0, 0, ext,
-                           sizeof(vbucket_state_t));
-    } else {
-        pkt = createPacket(PROTOCOL_BINARY_CMD_GET_ALL_VB_SEQNOS);
-    }
-
-    checkeq(ENGINE_SUCCESS, h1->unknown_command(h, cookie, pkt, add_response),
-            "Error in getting all vb info");
-
-    free(pkt);
-
-    return last_status == PROTOCOL_BINARY_RESPONSE_SUCCESS;
-}
-
-void verify_all_vb_seqnos(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1,
-                          uint16_t vb_start, uint16_t vb_end) {
-    const int per_vb_resp_size = sizeof(uint16_t) + sizeof(uint64_t);
-    const int high_seqno_offset = sizeof(uint16_t);
-
-    std::string seqno_body = last_body;
-
-    /* Check if the total response length is as expected. We expect 10 bytes
-       (2 for vb_id + 8 for seqno) */
-    checkeq((vb_end - vb_start + 1) * per_vb_resp_size,
-            static_cast<int>(seqno_body.size()), "Failed to get all vb info.");
-    /* Check if the contents are correct */
-    for (uint16_t i = 0; i < (vb_end - vb_start + 1); i++) {
-        /* Check for correct vb_id */
-        checkeq(static_cast<const uint16_t>(vb_start + i),
-                ntohs(*(reinterpret_cast<const uint16_t*>(seqno_body.data() +
-                                                          per_vb_resp_size*i))),
-                "vb_id mismatch");
-        /* Check for correct high_seqno */
-        std::stringstream vb_stat_seqno;
-        vb_stat_seqno << "vb_" << (vb_start + i) << ":high_seqno";
-        uint64_t high_seqno_vb =
-              get_ull_stat(h, h1, vb_stat_seqno.str().c_str(), "vbucket-seqno");
-        checkeq(high_seqno_vb,
-                ntohll(*(reinterpret_cast<const uint64_t*>(seqno_body.data() +
-                                                           per_vb_resp_size*i +
-                                                           high_seqno_offset))),
-                "high_seqno mismatch");
-    }
-}
-
 void dcp_step(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1, const void* cookie) {
     struct dcp_message_producers* producers = get_dcp_producers();
     ENGINE_ERROR_CODE err = h1->dcp.step(h, cookie, producers);
@@ -991,3 +1170,33 @@ void dcp_step(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1, const void* cookie) {
     }
     free(producers);
 }
+
+void set_degraded_mode(ENGINE_HANDLE *h,
+                       ENGINE_HANDLE_V1 *h1,
+                       const void* cookie,
+                       bool enable)
+{
+    protocol_binary_request_header *pkt;
+    if (enable) {
+        pkt = createPacket(PROTOCOL_BINARY_CMD_DISABLE_TRAFFIC, 0, 0);
+    } else {
+        pkt = createPacket(PROTOCOL_BINARY_CMD_ENABLE_TRAFFIC, 0, 0);
+    }
+
+    ENGINE_ERROR_CODE errcode = h1->unknown_command(h, NULL, pkt, add_response);
+    if (errcode != ENGINE_SUCCESS) {
+        std::cerr << "Failed to set degraded mode to " << enable
+                  << ". api call return engine code: " << errcode << std::endl;
+        cb_assert(false);
+    }
+
+    if (last_status != PROTOCOL_BINARY_RESPONSE_SUCCESS) {
+        std::cerr << "Failed to set degraded mode to " << enable
+                  << ". protocol code: " << last_status << std::endl;
+        if (last_body.size() > 0) {
+            std::cerr << "\tBody: [" << last_body << "]" << std::endl;
+        }
+
+        cb_assert(false);
+    }
+}