Merge remote-tracking branch 'couchbase/3.0.x' into sherlock
[ep-engine.git] / tests / ep_test_apis.cc
1 /* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2 /*
3  *     Copyright 2012 Couchbase, Inc
4  *
5  *   Licensed under the Apache License, Version 2.0 (the "License");
6  *   you may not use this file except in compliance with the License.
7  *   You may obtain a copy of the License at
8  *
9  *       http://www.apache.org/licenses/LICENSE-2.0
10  *
11  *   Unless required by applicable law or agreed to in writing, software
12  *   distributed under the License is distributed on an "AS IS" BASIS,
13  *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  *   See the License for the specific language governing permissions and
15  *   limitations under the License.
16  */
17
18 #include "config.h"
19 #include "ep_test_apis.h"
20
21 #include <memcached/util.h>
22 #include <platform/platform.h>
23 #include <stdlib.h>
24 #include <string.h>
25
26 #include <algorithm>
27 #include <iostream>
28 #include <sstream>
29
30 #include "mock/mock_dcp.h"
31
32 #define check(expr, msg) \
33     static_cast<void>((expr) ? 0 : abort_msg(#expr, msg, __LINE__))
34
35 std::map<std::string, std::string> vals;
36 bool dump_stats = false;
37 AtomicValue<protocol_binary_response_status> last_status(
38     static_cast<protocol_binary_response_status>(0));
39 std::string last_key;
40 std::string last_body;
41 bool last_deleted_flag(false);
42 AtomicValue<uint8_t> last_conflict_resolution_mode{0xff};
43 AtomicValue<uint64_t> last_cas(0);
44 AtomicValue<uint8_t> last_datatype(0x00);
45 ItemMetaData last_meta;
46 uint64_t last_uuid = 0;
47 uint64_t last_seqno = 0;
48
49 extern "C" bool add_response_get_meta(const void *key, uint16_t keylen,
50                                       const void *ext, uint8_t extlen,
51                                       const void *body, uint32_t bodylen,
52                                       uint8_t datatype, uint16_t status,
53                                       uint64_t cas, const void *cookie);
54 void encodeExt(char *buffer, uint32_t val);
55 void encodeWithMetaExt(char *buffer, ItemMetaData *meta);
56
57 void decayingSleep(useconds_t *sleepTime) {
58     static const useconds_t maxSleepTime = 500000;
59     usleep(*sleepTime);
60     *sleepTime = std::min(*sleepTime << 1, maxSleepTime);
61 }
62
63 ENGINE_ERROR_CODE vb_map_response(const void *cookie,
64                                   const void *map,
65                                   size_t mapsize) {
66     (void)cookie;
67     last_body.assign(static_cast<const char*>(map), mapsize);
68     return ENGINE_SUCCESS;
69 }
70
71 bool add_response(const void *key, uint16_t keylen, const void *ext,
72                   uint8_t extlen, const void *body, uint32_t bodylen,
73                   uint8_t datatype, uint16_t status, uint64_t cas,
74                   const void *cookie) {
75     (void)ext;
76     (void)extlen;
77     (void)cookie;
78     last_status = static_cast<protocol_binary_response_status>(status);
79     last_body.assign(static_cast<const char*>(body), bodylen);
80     last_key.assign(static_cast<const char*>(key), keylen);
81     last_cas = cas;
82     last_datatype = datatype;
83     return true;
84 }
85
86 bool add_response_get_meta(const void *key, uint16_t keylen, const void *ext,
87                            uint8_t extlen, const void *body, uint32_t bodylen,
88                            uint8_t datatype, uint16_t status, uint64_t cas,
89                            const void *cookie) {
90     (void)cookie;
91     const uint8_t* ext_bytes = reinterpret_cast<const uint8_t*> (ext);
92     if (ext && extlen > 0) {
93         uint32_t flags;
94         memcpy(&flags, ext_bytes, 4);
95         last_deleted_flag = ntohl(flags) & GET_META_ITEM_DELETED_FLAG;
96         memcpy(&last_meta.flags, ext_bytes + 4, 4);
97         memcpy(&last_meta.exptime, ext_bytes + 8, 4);
98         last_meta.exptime = ntohl(last_meta.exptime);
99         memcpy(&last_meta.revSeqno, ext_bytes + 12, 8);
100         last_meta.revSeqno = ntohll(last_meta.revSeqno);
101         last_meta.cas = cas;
102         if (extlen > 20) {
103             memcpy(&last_conflict_resolution_mode, ext_bytes + 20, 1);
104         }
105     }
106     return add_response(key, keylen, ext, extlen, body, bodylen, datatype,
107                         status, cas, cookie);
108 }
109
110 bool add_response_set_del_meta(const void *key, uint16_t keylen, const void *ext,
111                                uint8_t extlen, const void *body, uint32_t bodylen,
112                                uint8_t datatype, uint16_t status, uint64_t cas,
113                                const void *cookie) {
114     (void)cookie;
115     const uint8_t* ext_bytes = reinterpret_cast<const uint8_t*> (ext);
116     if (ext && extlen > 0) {
117         uint64_t vb_uuid;
118         uint64_t seqno;
119         memcpy(&vb_uuid, ext_bytes, 8);
120         memcpy(&seqno, ext_bytes + 8, 8);
121         last_uuid = ntohll(vb_uuid);
122         last_seqno = ntohll(seqno);
123     }
124
125     return add_response(key, keylen, ext, extlen, body, bodylen, datatype,
126                         status, cas, cookie);
127 }
128
129 bool add_response_ret_meta(const void *key, uint16_t keylen, const void *ext,
130                            uint8_t extlen, const void *body, uint32_t bodylen,
131                            uint8_t datatype, uint16_t status, uint64_t cas,
132                            const void *cookie) {
133     (void)cookie;
134     const uint8_t* ext_bytes = reinterpret_cast<const uint8_t*> (ext);
135     if (ext && extlen == 16) {
136         memcpy(&last_meta.flags, ext_bytes, 4);
137         memcpy(&last_meta.exptime, ext_bytes + 4, 4);
138         last_meta.exptime = ntohl(last_meta.exptime);
139         memcpy(&last_meta.revSeqno, ext_bytes + 8, 8);
140         last_meta.revSeqno = ntohll(last_meta.revSeqno);
141         last_meta.cas = cas;
142     }
143     return add_response(key, keylen, ext, extlen, body, bodylen, datatype,
144                         status, cas, cookie);
145 }
146
147 void add_stats(const char *key, const uint16_t klen, const char *val,
148                const uint32_t vlen, const void *cookie) {
149     (void)cookie;
150     std::string k(key, klen);
151     std::string v(val, vlen);
152
153     if (dump_stats) {
154         std::cout << "stat[" << k << "] = " << v << std::endl;
155     }
156
157     vals[k] = v;
158 }
159
160 void encodeExt(char *buffer, uint32_t val) {
161     val = htonl(val);
162     memcpy(buffer, (char*)&val, sizeof(val));
163 }
164
165 void encodeWithMetaExt(char *buffer, ItemMetaData *meta) {
166     uint32_t flags = meta->flags;
167     uint32_t exp = htonl(meta->exptime);
168     uint64_t seqno = htonll(meta->revSeqno);
169     uint64_t cas = htonll(meta->cas);
170
171     memcpy(buffer, (char*)&flags, sizeof(flags));
172     memcpy(buffer + 4, (char*)&exp, sizeof(exp));
173     memcpy(buffer + 8, (char*)&seqno, sizeof(seqno));
174     memcpy(buffer + 16, (char*)&cas, sizeof(cas));
175 }
176
177 protocol_binary_request_header* createPacket(uint8_t opcode,
178                                              uint16_t vbid,
179                                              uint64_t cas,
180                                              const char *ext,
181                                              uint8_t extlen,
182                                              const char *key,
183                                              uint32_t keylen,
184                                              const char *val,
185                                              uint32_t vallen,
186                                              uint8_t datatype,
187                                              const char *meta,
188                                              uint16_t nmeta) {
189     char *pkt_raw;
190     uint32_t headerlen = sizeof(protocol_binary_request_header);
191     pkt_raw = static_cast<char*>(calloc(1, headerlen + extlen + keylen + vallen + nmeta));
192     cb_assert(pkt_raw);
193     protocol_binary_request_header *req =
194         (protocol_binary_request_header*)pkt_raw;
195     req->request.opcode = opcode;
196     req->request.keylen = htons(keylen);
197     req->request.extlen = extlen;
198     req->request.vbucket = htons(vbid);
199     req->request.bodylen = htonl(keylen + vallen + extlen + nmeta);
200     req->request.cas = htonll(cas);
201     req->request.datatype = datatype;
202
203     if (extlen > 0) {
204         memcpy(pkt_raw + headerlen, ext, extlen);
205     }
206
207     if (keylen > 0) {
208         memcpy(pkt_raw + headerlen + extlen, key, keylen);
209     }
210
211     if (vallen > 0) {
212         memcpy(pkt_raw + headerlen + extlen + keylen, val, vallen);
213     }
214
215     // Extended meta: To be used for set_with_meta/del_with_meta/add_with_meta
216     if (meta && nmeta > 0) {
217         memcpy(pkt_raw + headerlen + extlen + keylen + vallen,
218                meta, nmeta);
219     }
220
221     return req;
222 }
223
224 void set_drift_counter_state(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1,
225                              int64_t initialDriftCount, uint8_t timeSync) {
226
227     protocol_binary_request_header *request;
228
229     int64_t driftCount = htonll(initialDriftCount);
230     uint8_t extlen = sizeof(driftCount) + sizeof(timeSync);
231     char *ext = new char[extlen];
232     memcpy(ext, (char*)&driftCount, sizeof(driftCount));
233     memcpy(ext + sizeof(driftCount), (char*)&timeSync, sizeof(timeSync));
234
235     request = createPacket(PROTOCOL_BINARY_CMD_SET_DRIFT_COUNTER_STATE,
236                            0, 0, ext, extlen);
237     h1->unknown_command(h, NULL, request, add_response);
238     check(last_status == PROTOCOL_BINARY_RESPONSE_SUCCESS,
239             "Expected success for CMD_SET_DRIFT_COUNTER_STATE");
240     delete[] ext;
241 }
242
243 void add_with_meta(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1, const char *key,
244                    const size_t keylen, const char *val, const size_t vallen,
245                    const uint32_t vb, ItemMetaData *itemMeta,
246                    bool skipConflictResolution, uint8_t datatype,
247                    bool includeExtMeta, int64_t adjustedTime) {
248     int blen = 0;
249     char *ext;
250     ExtendedMetaData *emd = NULL;
251     if (!includeExtMeta) {
252         blen = skipConflictResolution ? 28 : 24;
253         ext = new char[blen];
254         encodeWithMetaExt(ext, itemMeta);
255
256         if (skipConflictResolution) {
257             uint32_t flag = SKIP_CONFLICT_RESOLUTION_FLAG;
258             flag = htonl(flag);
259             memcpy(ext + 24, (char*)&flag, sizeof(flag));
260         }
261     } else {
262         blen = 26;
263         ext = new char[blen];
264         encodeWithMetaExt(ext, itemMeta);
265         emd = new ExtendedMetaData(adjustedTime);
266         // nmeta added to ext below
267     }
268
269     protocol_binary_request_header *pkt;
270     if (emd) {
271         std::pair<const char*, uint16_t> meta = emd->getExtMeta();
272         uint16_t nmeta = htons(meta.second);
273         memcpy(ext + 24, (char*)&nmeta, sizeof(nmeta));
274         pkt = createPacket(PROTOCOL_BINARY_CMD_ADD_WITH_META, vb, 0, ext, blen, key,
275                            keylen, val, vallen, datatype, meta.first, meta.second);
276         delete emd;
277     } else {
278         pkt = createPacket(PROTOCOL_BINARY_CMD_ADD_WITH_META, vb, 0, ext, blen, key,
279                            keylen, val, vallen, datatype);
280     }
281     check(h1->unknown_command(h, NULL, pkt, add_response) == ENGINE_SUCCESS,
282           "Expected to be able to store with meta");
283     delete[] ext;
284 }
285
286 void changeVBFilter(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1, std::string name,
287                     std::map<uint16_t, uint64_t> &filtermap) {
288     std::stringstream value;
289     uint16_t vbs = htons(filtermap.size());
290     std::map<uint16_t, uint64_t>::iterator it;
291
292     value.write((char*) &vbs, sizeof(uint16_t));
293     for (it = filtermap.begin(); it != filtermap.end(); ++it) {
294         uint16_t vb = htons(it->first);
295         uint64_t chkid = htonll(it->second);
296         value.write((char*) &vb, sizeof(uint16_t));
297         value.write((char*) &chkid, sizeof(uint64_t));
298     }
299
300     protocol_binary_request_header *request;
301     request = createPacket(PROTOCOL_BINARY_CMD_CHANGE_VB_FILTER, 0, 0, NULL, 0, name.c_str(),
302                        name.length(), value.str().data(), value.str().length());
303     check(h1->unknown_command(h, NULL, request, add_response) == ENGINE_SUCCESS,
304           "Failed to change the TAP VB filter.");
305     free(request);
306 }
307
308 void createCheckpoint(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1) {
309     protocol_binary_request_header *request = createPacket(PROTOCOL_BINARY_CMD_CREATE_CHECKPOINT);
310     check(h1->unknown_command(h, NULL, request, add_response) == ENGINE_SUCCESS,
311           "Failed to create a new checkpoint.");
312     free(request);
313 }
314
315 ENGINE_ERROR_CODE del(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1, const char *key,
316                       uint64_t cas, uint16_t vbucket, const void* cookie) {
317     mutation_descr_t mut_info;
318     return h1->remove(h, cookie, key, strlen(key), &cas, vbucket, &mut_info);
319 }
320
321 void del_with_meta(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1, const char *key,
322                    const size_t keylen, const uint32_t vb,
323                    ItemMetaData *itemMeta, uint64_t cas_for_delete,
324                    bool skipConflictResolution, bool includeExtMeta,
325                    int64_t adjustedTime, uint8_t conflictResMode,
326                    const void *cookie) {
327     int blen = 0;
328     char *ext;
329     ExtendedMetaData *emd = NULL;
330     if (!includeExtMeta) {
331         blen = skipConflictResolution ? 28 : 24;
332         ext = new char[blen];
333         encodeWithMetaExt(ext, itemMeta);
334
335         if (skipConflictResolution) {
336             uint32_t flag = SKIP_CONFLICT_RESOLUTION_FLAG;
337             flag = htonl(flag);
338             memcpy(ext + 24, (char*)&flag, sizeof(flag));
339         }
340     } else {
341         blen = 26;
342         ext = new char[blen];
343         encodeWithMetaExt(ext, itemMeta);
344         emd = new ExtendedMetaData(adjustedTime, conflictResMode);
345         // nmeta added to ext below
346     }
347
348     protocol_binary_request_header *pkt;
349     if (emd) {
350         std::pair<const char*, uint16_t> meta = emd->getExtMeta();
351         uint16_t nmeta = htons(meta.second);
352         memcpy(ext + 24, (char*)&nmeta, sizeof(nmeta));
353         pkt = createPacket(PROTOCOL_BINARY_CMD_DEL_WITH_META, vb, cas_for_delete,
354                            ext, blen, key, keylen, NULL, 0, 0x00, meta.first,
355                            meta.second);
356         delete emd;
357     } else {
358         pkt = createPacket(PROTOCOL_BINARY_CMD_DEL_WITH_META, vb, cas_for_delete,
359                            ext, blen, key, keylen, NULL, 0, 0x00);
360     }
361     check(h1->unknown_command(h, cookie, pkt, add_response_set_del_meta) == ENGINE_SUCCESS,
362           "Expected to be able to delete with meta");
363     delete[] ext;
364 }
365
366 void evict_key(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1, const char *key,
367                uint16_t vbucketId, const char *msg, bool expectError) {
368     int nonResidentItems = get_int_stat(h, h1, "ep_num_non_resident");
369     int numEjectedItems = get_int_stat(h, h1, "ep_num_value_ejects");
370     protocol_binary_request_header *pkt = createPacket(PROTOCOL_BINARY_CMD_EVICT_KEY, 0, 0,
371                                                        NULL, 0, key, strlen(key));
372     pkt->request.vbucket = htons(vbucketId);
373
374     check(h1->unknown_command(h, NULL, pkt, add_response) == ENGINE_SUCCESS,
375           "Failed to evict key.");
376
377     if (expectError) {
378         check(last_status == PROTOCOL_BINARY_RESPONSE_KEY_EEXISTS,
379               "Expected exists when evicting key.");
380     } else {
381         if (last_body != "Already ejected.") {
382             nonResidentItems++;
383             numEjectedItems++;
384         }
385         check(last_status == PROTOCOL_BINARY_RESPONSE_SUCCESS,
386               "Expected success evicting key.");
387     }
388
389     check(get_int_stat(h, h1, "ep_num_non_resident") == nonResidentItems,
390           "Incorrect number of non-resident items");
391     check(get_int_stat(h, h1, "ep_num_value_ejects") == numEjectedItems,
392           "Incorrect number of ejected items");
393
394     if (msg != NULL && last_body != msg) {
395         fprintf(stderr, "Expected evict to return ``%s'', but it returned ``%s''\n",
396                 msg, last_body.c_str());
397         abort();
398     }
399 }
400
401 size_t estimateVBucketMove(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1,
402                          uint16_t vbid, const char* tap_name) {
403     std::stringstream ss;
404     ss << "tap-vbtakeover " << vbid;
405     if (tap_name) {
406       ss << " " << tap_name;
407     }
408     return get_int_stat(h, h1, "estimate", ss.str().c_str());
409 }
410
411 ENGINE_ERROR_CODE checkpointPersistence(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1,
412                                         uint64_t checkpoint_id, uint16_t vb) {
413     checkpoint_id = htonll(checkpoint_id);
414     protocol_binary_request_header *request;
415     request = createPacket(PROTOCOL_BINARY_CMD_CHECKPOINT_PERSISTENCE, vb, 0, NULL, 0, NULL, 0,
416                            (const char *)&checkpoint_id, sizeof(uint64_t));
417     ENGINE_ERROR_CODE rv = h1->unknown_command(h, NULL, request, add_response);
418     free(request);
419     return rv;
420 }
421
422 ENGINE_ERROR_CODE seqnoPersistence(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1,
423                                    uint16_t vbucket, uint64_t seqno) {
424     seqno = htonll(seqno);
425     char buffer[8];
426     memcpy(buffer, &seqno, sizeof(uint64_t));
427     protocol_binary_request_header* request =
428         createPacket(PROTOCOL_BINARY_CMD_SEQNO_PERSISTENCE, vbucket, 0, buffer, 8);
429
430     ENGINE_ERROR_CODE rv = h1->unknown_command(h, NULL, request, add_response);
431     free(request);
432     return rv;
433 }
434
435 void gat(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1, const char* key,
436          uint16_t vb, uint32_t exp, bool quiet) {
437     char ext[4];
438     uint8_t opcode = quiet ? PROTOCOL_BINARY_CMD_GATQ : PROTOCOL_BINARY_CMD_GAT;
439     uint32_t keylen = key ? strlen(key) : 0;
440     protocol_binary_request_header *request;
441     encodeExt(ext, exp);
442     request = createPacket(opcode, vb, 0, ext, 4, key, keylen);
443
444     check(h1->unknown_command(h, NULL, request, add_response) == ENGINE_SUCCESS,
445           "Failed to call gat");
446     free(request);
447 }
448
449 bool get_item_info(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1, item_info *info,
450                    const char* key, uint16_t vb) {
451     item *i = NULL;
452     if (h1->get(h, NULL, &i, key, strlen(key), vb) != ENGINE_SUCCESS) {
453         return false;
454     }
455     info->nvalue = 1;
456     if (!h1->get_item_info(h, NULL, i, info)) {
457         h1->release(h, NULL, i);
458         fprintf(stderr, "get_item_info failed\n");
459         return false;
460     }
461
462     h1->release(h, NULL, i);
463     return true;
464 }
465
466 bool get_key(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1, item *i,
467              std::string &key) {
468
469     item_info info;
470     info.nvalue = 1;
471     if (!h1->get_item_info(h, NULL, i, &info)) {
472         fprintf(stderr, "get_item_info failed\n");
473         return false;
474     }
475
476     key.assign((const char*)info.key, info.nkey);
477     return true;
478 }
479
480 void getl(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1, const char* key,
481           uint16_t vb, uint32_t lock_timeout) {
482     char ext[4];
483     uint32_t keylen = key ? strlen(key) : 0;
484     protocol_binary_request_header *request;
485     encodeExt(ext, lock_timeout);
486     request = createPacket(PROTOCOL_BINARY_CMD_GET_LOCKED, vb, 0, ext, 4, key, keylen);
487
488     check(h1->unknown_command(h, NULL, request, add_response) == ENGINE_SUCCESS,
489           "Failed to call getl");
490     free(request);
491 }
492
493 bool get_meta(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1, const char* key,
494               bool reqExtMeta) {
495
496     protocol_binary_request_header *req;
497     if (reqExtMeta) {
498         uint8_t ext = 0x01;
499         req = createPacket(PROTOCOL_BINARY_CMD_GET_META, 0, 0,
500                            (char*)&ext, sizeof(ext), key, strlen(key));
501     } else {
502         req = createPacket(PROTOCOL_BINARY_CMD_GET_META, 0, 0,
503                            NULL, 0, key, strlen(key));
504     }
505
506     ENGINE_ERROR_CODE ret = h1->unknown_command(h, NULL, req,
507                                                 add_response_get_meta);
508     check(ret == ENGINE_SUCCESS, "Expected get_meta call to be successful");
509     if (last_status == PROTOCOL_BINARY_RESPONSE_SUCCESS) {
510         return true;
511     }
512     return false;
513 }
514
515 void observe(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1,
516              std::map<std::string, uint16_t> obskeys) {
517     std::stringstream value;
518     std::map<std::string, uint16_t>::iterator it;
519     for (it = obskeys.begin(); it != obskeys.end(); ++it) {
520         uint16_t vb = htons(it->second);
521         uint16_t keylen = htons(it->first.length());
522         value.write((char*) &vb, sizeof(uint16_t));
523         value.write((char*) &keylen, sizeof(uint16_t));
524         value.write(it->first.c_str(), it->first.length());
525     }
526
527     protocol_binary_request_header *request;
528     request = createPacket(PROTOCOL_BINARY_CMD_OBSERVE, 0, 0, NULL, 0, NULL, 0,
529                            value.str().data(), value.str().length());
530     check(h1->unknown_command(h, NULL, request, add_response) == ENGINE_SUCCESS,
531           "Observe call failed");
532     free(request);
533 }
534
535 void observe_seqno(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1,
536                    uint16_t vb_id, uint64_t uuid) {
537     protocol_binary_request_header *request;
538     uint64_t vb_uuid = htonll(uuid);
539     std::stringstream data;
540     data.write((char *) &vb_uuid, sizeof(uint64_t));
541
542     request = createPacket(PROTOCOL_BINARY_CMD_OBSERVE_SEQNO, vb_id, 0, NULL, 0,
543                            NULL, 0, data.str().data(), data.str().length());
544     check(h1->unknown_command(h, NULL, request, add_response) == ENGINE_SUCCESS,
545           "Observe_seqno call failed");
546     free(request);
547 }
548
549 void get_replica(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1, const char* key,
550                  uint16_t vbid) {
551     protocol_binary_request_header *pkt;
552     pkt = createPacket(PROTOCOL_BINARY_CMD_GET_REPLICA, vbid, 0, NULL, 0, key, strlen(key));
553     check(h1->unknown_command(h, NULL, pkt, add_response) == ENGINE_SUCCESS,
554                               "Get Replica Failed");
555     free(pkt);
556 }
557
558 protocol_binary_request_header* prepare_get_replica(ENGINE_HANDLE *h,
559                                                     ENGINE_HANDLE_V1 *h1,
560                                                     vbucket_state_t state,
561                                                     bool makeinvalidkey) {
562     uint16_t id = 0;
563     const char *key = "k0";
564     protocol_binary_request_header *pkt;
565     pkt = createPacket(PROTOCOL_BINARY_CMD_GET_REPLICA, id, 0, NULL, 0, key, strlen(key));
566
567     if (!makeinvalidkey) {
568         item *i = NULL;
569         check(store(h, h1, NULL, OPERATION_SET, key, "replicadata", &i, 0, id)
570               == ENGINE_SUCCESS, "Get Replica Failed");
571         h1->release(h, NULL, i);
572
573         check(set_vbucket_state(h, h1, id, state),
574               "Failed to set vbucket active state, Get Replica Failed");
575     }
576
577     return pkt;
578 }
579
580 bool set_param(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1, protocol_binary_engine_param_t paramtype,
581                const char *param, const char *val) {
582     char ext[4];
583     protocol_binary_request_header *pkt;
584     encodeExt(ext, static_cast<uint32_t>(paramtype));
585     pkt = createPacket(PROTOCOL_BINARY_CMD_SET_PARAM, 0, 0, ext, sizeof(protocol_binary_engine_param_t), param,
586                        strlen(param), val, strlen(val));
587
588     if (h1->unknown_command(h, NULL, pkt, add_response) != ENGINE_SUCCESS) {
589         return false;
590     }
591
592     return last_status == PROTOCOL_BINARY_RESPONSE_SUCCESS;
593 }
594
595 bool set_vbucket_state(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1,
596                        uint16_t vb, vbucket_state_t state) {
597
598     char ext[4];
599     protocol_binary_request_header *pkt;
600     encodeExt(ext, static_cast<uint32_t>(state));
601     pkt = createPacket(PROTOCOL_BINARY_CMD_SET_VBUCKET, vb, 0, ext, 4);
602
603     if (h1->unknown_command(h, NULL, pkt, add_response) != ENGINE_SUCCESS) {
604         return false;
605     }
606
607     free(pkt);
608     return last_status == PROTOCOL_BINARY_RESPONSE_SUCCESS;
609 }
610
611 bool get_all_vb_seqnos(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1,
612                        vbucket_state_t state, const void *cookie) {
613     protocol_binary_request_header *pkt;
614     if (state) {
615         char ext[sizeof(vbucket_state_t)];
616         encodeExt(ext, static_cast<uint32_t>(state));
617         pkt = createPacket(PROTOCOL_BINARY_CMD_GET_ALL_VB_SEQNOS, 0, 0, ext,
618                            sizeof(vbucket_state_t));
619     } else {
620         pkt = createPacket(PROTOCOL_BINARY_CMD_GET_ALL_VB_SEQNOS);
621     }
622
623     check(h1->unknown_command(h, cookie, pkt, add_response) ==
624           ENGINE_SUCCESS, "Error in getting all vb info");
625
626     free(pkt);
627     return last_status == PROTOCOL_BINARY_RESPONSE_SUCCESS;
628 }
629
630 void verify_all_vb_seqnos(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1,
631                           uint16_t vb_start, uint16_t vb_end) {
632     const int per_vb_resp_size = sizeof(uint16_t) + sizeof(uint64_t);
633     const int high_seqno_offset = sizeof(uint16_t);
634
635     std::string seqno_body = last_body;
636
637     /* Check if the total response length is as expected. We expect 10 bytes
638        (2 for vb_id + 8 for seqno) */
639     checkeq((size_t)((vb_end - vb_start + 1) * per_vb_resp_size),
640             last_body.size(), "Failed to get all vb info.");
641     /* Check if the contents are correct */
642     for (int i = 0; i < (vb_end - vb_start + 1); i++) {
643         /* Check for correct vb_id */
644         checkeq(static_cast<const uint16_t>(vb_start + i),
645                 ntohs(*(reinterpret_cast<const uint16_t*>(seqno_body.data() +
646                                                           per_vb_resp_size * i))),
647                 "vb_id mismatch");
648         /* Check for correct high_seqno */
649         std::string vb_stat_seqno("vb_" + std::to_string(vb_start + i) +
650                                   ":high_seqno");
651         uint64_t high_seqno_vb = get_ull_stat(h, h1, vb_stat_seqno.c_str(),
652                                               "vbucket-seqno");
653         checkeq(high_seqno_vb,
654                 ntohll(*(reinterpret_cast<const uint64_t*>(seqno_body.data() +
655                                                            per_vb_resp_size * i +
656                                                            high_seqno_offset))),
657               "high_seqno mismatch");
658     }
659 }
660
661 void set_with_meta(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1, const char *key,
662                    const size_t keylen, const char *val, const size_t vallen,
663                    const uint32_t vb, ItemMetaData *itemMeta,
664                    uint64_t cas_for_set, bool skipConflictResolution,
665                    uint8_t datatype, bool includeExtMeta,
666                    int64_t adjustedTime, uint8_t conflictResMode,
667                    const void *cookie) {
668     int blen = 0;
669     char *ext;
670     ExtendedMetaData *emd = NULL;
671     if (!includeExtMeta) {
672         blen = skipConflictResolution ? 28 : 24;
673         ext = new char[blen];
674         encodeWithMetaExt(ext, itemMeta);
675
676         if (skipConflictResolution) {
677             uint32_t flag = SKIP_CONFLICT_RESOLUTION_FLAG;
678             flag = htonl(flag);
679             memcpy(ext + 24, (char*)&flag, sizeof(flag));
680         }
681     } else {
682         blen = 26;
683         ext = new char[blen];
684         encodeWithMetaExt(ext, itemMeta);
685         emd = new ExtendedMetaData(adjustedTime, conflictResMode);
686         // nmeta added to ext below
687     }
688
689     protocol_binary_request_header *pkt;
690     if (emd) {
691         std::pair<const char*, uint16_t> meta = emd->getExtMeta();
692         uint16_t nmeta = htons(meta.second);
693         memcpy(ext + 24, (char*)&nmeta, sizeof(nmeta));
694         pkt = createPacket(PROTOCOL_BINARY_CMD_SET_WITH_META, vb, cas_for_set, ext,
695                            blen, key, keylen, val, vallen, datatype, meta.first,
696                            meta.second);
697         delete emd;
698     } else {
699         pkt = createPacket(PROTOCOL_BINARY_CMD_SET_WITH_META, vb, cas_for_set, ext,
700                            blen, key, keylen, val, vallen, datatype);
701     }
702
703     check(h1->unknown_command(h, cookie, pkt, add_response_set_del_meta) == ENGINE_SUCCESS,
704           "Expected to be able to store with meta");
705     delete[] ext;
706 }
707
708 void return_meta(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1, const char *key,
709                  const size_t keylen, const char *val, const size_t vallen,
710                  const uint32_t vb, const uint64_t cas, const uint32_t flags,
711                  const uint32_t exp, const uint32_t type, uint8_t datatype,
712                  const void *cookie) {
713     char ext[12];
714     encodeExt(ext, type);
715     encodeExt(ext + 4, flags);
716     encodeExt(ext + 8, exp);
717     protocol_binary_request_header *pkt;
718     pkt = createPacket(PROTOCOL_BINARY_CMD_RETURN_META, vb, cas, ext, 12, key, keylen, val,
719                        vallen, datatype);
720     check(h1->unknown_command(h, cookie, pkt, add_response_ret_meta)
721               == ENGINE_SUCCESS, "Expected to be able to store ret meta");
722     free(pkt);
723 }
724
725 void set_ret_meta(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1, const char *key,
726                   const size_t keylen, const char *val, const size_t vallen,
727                   const uint32_t vb, const uint64_t cas, const uint32_t flags,
728                   const uint32_t exp, uint8_t datatype, const void *cookie) {
729     return_meta(h, h1, key, keylen, val, vallen, vb, cas, flags, exp,
730                 SET_RET_META, datatype, cookie);
731 }
732
733 void add_ret_meta(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1, const char *key,
734                   const size_t keylen, const char *val, const size_t vallen,
735                   const uint32_t vb, const uint64_t cas, const uint32_t flags,
736                   const uint32_t exp, uint8_t datatype, const void *cookie) {
737     return_meta(h, h1, key, keylen, val, vallen, vb, cas, flags, exp,
738                 ADD_RET_META, datatype, cookie);
739 }
740
741 void del_ret_meta(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1, const char *key,
742                   const size_t keylen, const uint32_t vb, const uint64_t cas,
743                   const void *cookie) {
744     return_meta(h, h1, key, keylen, NULL, 0, vb, cas, 0, 0,
745                 DEL_RET_META, 0x00, cookie);
746 }
747
748 void disable_traffic(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1) {
749     protocol_binary_request_header *pkt = createPacket(PROTOCOL_BINARY_CMD_DISABLE_TRAFFIC);
750     check(h1->unknown_command(h, NULL, pkt, add_response) == ENGINE_SUCCESS,
751           "Failed to send data traffic command to the server");
752     check(last_status == PROTOCOL_BINARY_RESPONSE_SUCCESS,
753           "Failed to disable data traffic");
754     free(pkt);
755 }
756
757 void enable_traffic(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1) {
758     protocol_binary_request_header *pkt = createPacket(PROTOCOL_BINARY_CMD_ENABLE_TRAFFIC);
759     check(h1->unknown_command(h, NULL, pkt, add_response) == ENGINE_SUCCESS,
760           "Failed to send data traffic command to the server");
761     check(last_status == PROTOCOL_BINARY_RESPONSE_SUCCESS,
762           "Failed to enable data traffic");
763     free(pkt);
764 }
765
766 void start_persistence(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1) {
767     protocol_binary_request_header *pkt = createPacket(PROTOCOL_BINARY_CMD_START_PERSISTENCE);
768     check(h1->unknown_command(h, NULL, pkt, add_response) == ENGINE_SUCCESS,
769           "Failed to stop persistence.");
770     check(last_status == PROTOCOL_BINARY_RESPONSE_SUCCESS,
771           "Error starting persistence.");
772     free(pkt);
773 }
774
775 void stop_persistence(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1) {
776     useconds_t sleepTime = 128;
777     while (true) {
778         if (get_str_stat(h, h1, "ep_flusher_state", 0) == "running") {
779             break;
780         }
781         decayingSleep(&sleepTime);
782     }
783
784     protocol_binary_request_header *pkt = createPacket(PROTOCOL_BINARY_CMD_STOP_PERSISTENCE);
785     check(h1->unknown_command(h, NULL, pkt, add_response) == ENGINE_SUCCESS,
786           "Failed to stop persistence.");
787     check(last_status == PROTOCOL_BINARY_RESPONSE_SUCCESS,
788           "Error stopping persistence.");
789     free(pkt);
790 }
791
792 ENGINE_ERROR_CODE store(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1,
793                         const void *cookie, ENGINE_STORE_OPERATION op,
794                         const char *key, const char *value, item **outitem,
795                         uint64_t casIn, uint16_t vb, uint32_t exp,
796                         uint8_t datatype) {
797     return storeCasVb11(h, h1, cookie, op, key, value, strlen(value),
798                         9258, outitem, casIn, vb, exp, datatype);
799 }
800
801 ENGINE_ERROR_CODE storeCasVb11(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1,
802                                const void *cookie, ENGINE_STORE_OPERATION op,
803                                const char *key, const char *value, size_t vlen,
804                                uint32_t flags, item **outitem, uint64_t casIn,
805                                uint16_t vb, uint32_t exp, uint8_t datatype) {
806     item *it = NULL;
807     uint64_t cas = 0;
808
809     ENGINE_ERROR_CODE rv = h1->allocate(h, cookie, &it,
810                                         key, strlen(key),
811                                         vlen, flags, exp,
812                                         datatype);
813     if (rv != ENGINE_SUCCESS) {
814         return rv;
815     }
816
817     item_info info;
818     info.nvalue = 1;
819     if (!h1->get_item_info(h, cookie, it, &info)) {
820         abort();
821     }
822
823     cb_assert(info.value[0].iov_len == vlen);
824     memcpy(info.value[0].iov_base, value, vlen);
825     h1->item_set_cas(h, cookie, it, casIn);
826
827     rv = h1->store(h, cookie, it, &cas, op, vb);
828
829     if (outitem) {
830         *outitem = it;
831     } else {
832         h1->release(h, NULL, it);
833     }
834
835     return rv;
836 }
837
838 void touch(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1, const char* key,
839            uint16_t vb, uint32_t exp) {
840     char ext[4];
841     uint32_t keylen = key ? strlen(key) : 0;
842     protocol_binary_request_header *request;
843     encodeExt(ext, exp);
844     request = createPacket(PROTOCOL_BINARY_CMD_TOUCH, vb, 0, ext, 4, key, keylen);
845
846     check(h1->unknown_command(h, NULL, request, add_response) == ENGINE_SUCCESS,
847           "Failed to call touch");
848     free(request);
849 }
850
851 void unl(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1, const char* key,
852          uint16_t vb, uint64_t cas) {
853     uint32_t keylen = key ? strlen(key) : 0;
854     protocol_binary_request_header *request;
855     request = createPacket(PROTOCOL_BINARY_CMD_UNLOCK_KEY, vb, cas, NULL, 0, key, keylen);
856
857     check(h1->unknown_command(h, NULL, request, add_response) == ENGINE_SUCCESS,
858           "Failed to call unl");
859     free(request);
860 }
861
862 void compact_db(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1,
863                      const uint16_t vbucket_id,
864                      const uint64_t purge_before_ts,
865                      const uint64_t purge_before_seq,
866                      const uint8_t  drop_deletes) {
867     protocol_binary_request_compact_db req;
868     memset(&req, 0, sizeof(req));
869     req.message.body.purge_before_ts  = htonll(purge_before_ts);
870     req.message.body.purge_before_seq = htonll(purge_before_seq);
871     req.message.body.drop_deletes     = drop_deletes;
872
873     const char *args = (const char *)&(req.message.body);
874     uint32_t argslen = 24;
875
876     protocol_binary_request_header *pkt =
877         createPacket(PROTOCOL_BINARY_CMD_COMPACT_DB, vbucket_id, 0, args, argslen,  NULL, 0,
878                      NULL, 0);
879     check(h1->unknown_command(h, NULL, pkt, add_response) == ENGINE_SUCCESS,
880           "Failed to request compact vbucket");
881 }
882
883 void vbucketDelete(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1, uint16_t vb,
884                    const char* args) {
885     uint32_t argslen = args ? strlen(args) : 0;
886     protocol_binary_request_header *pkt =
887         createPacket(PROTOCOL_BINARY_CMD_DEL_VBUCKET, vb, 0, NULL, 0, NULL, 0,
888                      args, argslen);
889     check(h1->unknown_command(h, NULL, pkt, add_response) == ENGINE_SUCCESS,
890           "Failed to request delete bucket");
891 }
892
893 ENGINE_ERROR_CODE verify_key(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1,
894                              const char* key, uint16_t vbucket) {
895     item *i = NULL;
896     ENGINE_ERROR_CODE rv = h1->get(h, NULL, &i, key, strlen(key), vbucket);
897     if (rv == ENGINE_SUCCESS) {
898         h1->release(h, NULL, i);
899     }
900     return rv;
901 }
902
903 bool verify_vbucket_missing(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1,
904                             uint16_t vb) {
905     char vbid[8];
906     snprintf(vbid, sizeof(vbid), "vb_%d", vb);
907     std::string vbstr(vbid);
908
909     // Try up to three times to verify the bucket is missing.  Bucket
910     // state changes are async.
911     vals.clear();
912     check(h1->get_stats(h, NULL, NULL, 0, add_stats) == ENGINE_SUCCESS,
913           "Failed to get stats.");
914
915     if (vals.find(vbstr) == vals.end()) {
916         return true;
917     }
918
919     std::cerr << "Expected bucket missing, got " << vals[vbstr] << std::endl;
920
921     return false;
922 }
923
924 bool verify_vbucket_state(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1, uint16_t vb,
925                           vbucket_state_t expected, bool mute) {
926     protocol_binary_request_header *pkt;
927     pkt = createPacket(PROTOCOL_BINARY_CMD_GET_VBUCKET, vb, 0);
928
929     ENGINE_ERROR_CODE errcode = h1->unknown_command(h, NULL, pkt, add_response);
930     if (errcode != ENGINE_SUCCESS) {
931         if (!mute) {
932             fprintf(stderr, "Error code when getting vbucket %d\n", errcode);
933         }
934         return false;
935     }
936
937     if (last_status != PROTOCOL_BINARY_RESPONSE_SUCCESS) {
938         if (!mute) {
939             fprintf(stderr, "Last protocol status was %d (%s)\n",
940                     last_status.load(),
941                     last_body.size() > 0 ? last_body.c_str() : "unknown");
942         }
943         return false;
944     }
945
946     vbucket_state_t state;
947     memcpy(&state, last_body.data(), sizeof(state));
948     state = static_cast<vbucket_state_t>(ntohl(state));
949     return state == expected;
950 }
951
952 void sendDcpAck(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1,
953                 const void* cookie, protocol_binary_command opcode,
954                 protocol_binary_response_status status, uint32_t opaque) {
955     protocol_binary_response_header pkt;
956     pkt.response.magic = PROTOCOL_BINARY_RES;
957     pkt.response.opcode = opcode;
958     pkt.response.status = htons(status);
959     pkt.response.opaque = opaque;
960
961     check(h1->dcp.response_handler(h, cookie, &pkt) == ENGINE_SUCCESS,
962           "Expected success");
963 }
964
965 int get_int_stat(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1, const char *statname,
966                  const char *statkey) {
967     vals.clear();
968     check(h1->get_stats(h, NULL, statkey, statkey == NULL ? 0 : strlen(statkey),
969                         add_stats) == ENGINE_SUCCESS, "Failed to get stats.");
970     std::string s = vals[statname];
971     return atoi(s.c_str());
972 }
973
974 float get_float_stat(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1, const char *statname,
975                      const char *statkey) {
976     vals.clear();
977     check(h1->get_stats(h, NULL, statkey, statkey == NULL ? 0 : strlen(statkey),
978                         add_stats) == ENGINE_SUCCESS, "Failed to get stats.");
979     std::string s = vals[statname];
980     return atof(s.c_str());
981 }
982
983 uint64_t get_ull_stat(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1, const char *statname,
984                       const char *statkey) {
985     vals.clear();
986     check(h1->get_stats(h, NULL, statkey, statkey == NULL ? 0 : strlen(statkey),
987                         add_stats) == ENGINE_SUCCESS, "Failed to get stats.");
988     std::string s = vals[statname];
989     return strtoull(s.c_str(), NULL, 10);
990 }
991
992 std::string get_str_stat(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1,
993                          const char *statname, const char *statkey) {
994     vals.clear();
995     check(h1->get_stats(h, NULL, statkey, statkey == NULL ? 0 : strlen(statkey),
996                         add_stats) == ENGINE_SUCCESS, "Failed to get stats.");
997     std::string s = vals[statname];
998     return s;
999 }
1000
1001 void verify_curr_items(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1, int exp,
1002                        const char *msg) {
1003     int curr_items = get_int_stat(h, h1, "curr_items");
1004     if (curr_items != exp) {
1005         std::cerr << "Expected "<< exp << " curr_items after " << msg
1006                   << ", got " << curr_items << std::endl;
1007         abort();
1008     }
1009 }
1010
1011 /** Helper class used when waiting on statistics to reach a certain value -
1012  * aggregates how long we have been waiting and aborts if the maximum wait time
1013  * is exceeded.
1014  */
1015 template <typename T>
1016 class WaitTimeAccumulator
1017 {
1018 public:
1019     WaitTimeAccumulator(const char* compare_name,
1020                         const char* stat_, const char* stat_key,
1021                         const T final_, const time_t wait_time_in_secs)
1022     : compareName(compare_name),
1023       stat(stat_),
1024       statKey(stat_key),
1025       final(final_),
1026       maxWaitTime(wait_time_in_secs * 1000 * 1000),
1027       totalSleepTime(0) {}
1028
1029     void incrementAndAbortIfLimitReached(const useconds_t sleep_time)
1030     {
1031         totalSleepTime += sleep_time;
1032         if (totalSleepTime >= maxWaitTime) {
1033             std::cerr << "Exceeded maximum wait time of " << maxWaitTime
1034                       << "s waiting for stat '" << stat;
1035             if (statKey != NULL) {
1036                 std::cerr << "(" << statKey << ")";
1037             }
1038             std::cerr << "' " << compareName << " " << final << " - aborting."
1039                       << std::endl;
1040             abort();
1041         }
1042     }
1043
1044 private:
1045     const char* compareName;
1046     const char* stat;
1047     const char* statKey;
1048     const T final;
1049     const useconds_t maxWaitTime;
1050     useconds_t totalSleepTime;
1051 };
1052
1053
1054 void wait_for_stat_change(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1,
1055                           const char *stat, int initial,
1056                           const char *statkey, const time_t wait_time) {
1057     useconds_t sleepTime = 128;
1058     WaitTimeAccumulator<int> accumulator("to change from", stat, statkey,
1059                                          initial, wait_time);
1060     while (get_int_stat(h, h1, stat, statkey) == initial) {
1061         accumulator.incrementAndAbortIfLimitReached(sleepTime);
1062         decayingSleep(&sleepTime);
1063     }
1064 }
1065
1066 void wait_for_stat_to_be(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1,
1067                          const char *stat, int final, const char* stat_key,
1068                          const time_t wait_time) {
1069     useconds_t sleepTime = 128;
1070     WaitTimeAccumulator<int> accumulator("to be", stat, stat_key, final,
1071                                          wait_time);
1072     while (get_int_stat(h, h1, stat, stat_key) != final) {
1073         accumulator.incrementAndAbortIfLimitReached(sleepTime);
1074         decayingSleep(&sleepTime);
1075     }
1076 }
1077
1078 void wait_for_stat_to_be_gte(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1,
1079                              const char *stat, int final,
1080                              const char* stat_key, const time_t wait_time) {
1081     useconds_t sleepTime = 128;
1082     WaitTimeAccumulator<int> accumulator("to be greater or equal than", stat,
1083                                          stat_key, final, wait_time);
1084     while (get_int_stat(h, h1, stat, stat_key) < final) {
1085         accumulator.incrementAndAbortIfLimitReached(sleepTime);
1086         decayingSleep(&sleepTime);
1087     }
1088 }
1089
1090 void wait_for_stat_to_be_lte(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1,
1091                              const char *stat, int final,
1092                              const char* stat_key,
1093                              const time_t max_wait_time_in_secs) {
1094     useconds_t sleepTime = 128;
1095     WaitTimeAccumulator<int> accumulator("to be less than or equal to", stat,
1096                                          stat_key, final,
1097                                          max_wait_time_in_secs);
1098     while (get_int_stat(h, h1, stat, stat_key) > final) {
1099         accumulator.incrementAndAbortIfLimitReached(sleepTime);
1100         decayingSleep(&sleepTime);
1101     }
1102 }
1103
1104 void wait_for_str_stat_to_be(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1,
1105                              const char *stat, const char* final,
1106                              const char* stat_key, const time_t wait_time) {
1107     useconds_t sleepTime = 128;
1108     WaitTimeAccumulator<const char*> accumulator("to be", stat, stat_key,
1109                                                  final, wait_time);
1110     while (get_str_stat(h, h1, stat, stat_key).compare(final) != 0) {
1111         accumulator.incrementAndAbortIfLimitReached(sleepTime);
1112         decayingSleep(&sleepTime);
1113     }
1114 }
1115
1116 void wait_for_memory_usage_below(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1,
1117                                  int mem_threshold, const time_t wait_time) {
1118     useconds_t sleepTime = 128;
1119     WaitTimeAccumulator<int> accumulator("to be below", "mem_used", NULL,
1120                                          mem_threshold, wait_time);
1121     while (get_int_stat(h, h1, "mem_used") > mem_threshold) {
1122         accumulator.incrementAndAbortIfLimitReached(sleepTime);
1123         decayingSleep(&sleepTime);
1124     }
1125 }
1126
1127 bool wait_for_warmup_complete(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1) {
1128     useconds_t sleepTime = 128;
1129     while (h1->get_stats(h, NULL, "warmup", 6, add_stats) == ENGINE_SUCCESS) {
1130         std::string s = vals["ep_warmup_thread"];
1131         if (strcmp(s.c_str(), "complete") == 0) {
1132             break;
1133         }
1134         decayingSleep(&sleepTime);
1135         vals.clear();
1136     }
1137     return true;
1138 }
1139
1140 void wait_for_flusher_to_settle(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1) {
1141     useconds_t sleepTime = 128;
1142     while (get_int_stat(h, h1, "ep_queue_size") > 0) {
1143         decayingSleep(&sleepTime);
1144     }
1145 }
1146
1147 void wait_for_persisted_value(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1,
1148                               const char *key, const char *val,
1149                               uint16_t vbucketId) {
1150
1151     item *i = NULL;
1152     int commitNum = get_int_stat(h, h1, "ep_commit_num");
1153     check(ENGINE_SUCCESS == store(h, h1, NULL, OPERATION_SET, key, val, &i, 0,
1154                                   vbucketId),
1155             "Failed to store an item.");
1156
1157     // Wait for persistence...
1158     wait_for_flusher_to_settle(h, h1);
1159     wait_for_stat_change(h, h1, "ep_commit_num", commitNum);
1160     h1->release(h, NULL, i);
1161 }
1162
1163 void dcp_step(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1, const void* cookie) {
1164     struct dcp_message_producers* producers = get_dcp_producers();
1165     ENGINE_ERROR_CODE err = h1->dcp.step(h, cookie, producers);
1166     check(err == ENGINE_SUCCESS || err == ENGINE_WANT_MORE,
1167             "Expected success or engine_want_more");
1168     if (err == ENGINE_SUCCESS) {
1169         clear_dcp_data();
1170     }
1171     free(producers);
1172 }
1173
1174 void set_degraded_mode(ENGINE_HANDLE *h,
1175                        ENGINE_HANDLE_V1 *h1,
1176                        const void* cookie,
1177                        bool enable)
1178 {
1179     protocol_binary_request_header *pkt;
1180     if (enable) {
1181         pkt = createPacket(PROTOCOL_BINARY_CMD_DISABLE_TRAFFIC, 0, 0);
1182     } else {
1183         pkt = createPacket(PROTOCOL_BINARY_CMD_ENABLE_TRAFFIC, 0, 0);
1184     }
1185
1186     ENGINE_ERROR_CODE errcode = h1->unknown_command(h, NULL, pkt, add_response);
1187     if (errcode != ENGINE_SUCCESS) {
1188         std::cerr << "Failed to set degraded mode to " << enable
1189                   << ". api call return engine code: " << errcode << std::endl;
1190         cb_assert(false);
1191     }
1192
1193     if (last_status != PROTOCOL_BINARY_RESPONSE_SUCCESS) {
1194         std::cerr << "Failed to set degraded mode to " << enable
1195                   << ". protocol code: " << last_status << std::endl;
1196         if (last_body.size() > 0) {
1197             std::cerr << "\tBody: [" << last_body << "]" << std::endl;
1198         }
1199
1200         cb_assert(false);
1201     }
1202 }