6e44285f7d6fffff9b3a3c9a804e0bac60c0c53e
[ep-engine.git] / tests / ep_testsuite.cc
1 /* -*- MODE: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2 /*
3  *     Copyright 2010 Couchbase, Inc
4  *
5  *   Licensed under the Apache License, Version 2.0 (the "License");
6  *   you may not use this file except in compliance with the License.
7  *   You may obtain a copy of the License at
8  *
9  *       http://www.apache.org/licenses/LICENSE-2.0
10  *
11  *   Unless required by applicable law or agreed to in writing, software
12  *   distributed under the License is distributed on an "AS IS" BASIS,
13  *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  *   See the License for the specific language governing permissions and
15  *   limitations under the License.
16  */
17
18 // Usage: (to repeatedly run just a single test case)
19 // make engine_tests IS_LOOP=-L EP_TEST_NUM=3
20
21 #include "config.h"
22
23 #include <stdio.h>
24 #include <stdlib.h>
25 #include <string.h>
26 #include <sys/stat.h>
27 #ifdef _MSC_VER
28 #include <direct.h>
29 #define mkdir(a, b) _mkdir(a)
30 #else
31 #include <sys/wait.h>
32 #endif
33
34 #include <cstdlib>
35 #include <iostream>
36 #include <map>
37 #include <set>
38 #include <sstream>
39 #include <string>
40 #include <vector>
41
42 #include <platform/dirutils.h>
43
44 #include "atomic.h"
45 #include "ep-engine/command_ids.h"
46 #include "ep_test_apis.h"
47 #include "ep_testsuite.h"
48 #include "locks.h"
49 #include <libcouchstore/couch_db.h>
50 #include "mock/mock_dcp.h"
51 #include "mutex.h"
52
53 #include <snappy-c.h>
54 #include <JSON_checker.h>
55
56 #ifdef linux
57 /* /usr/include/netinet/in.h defines macros from ntohs() to _bswap_nn to
58  * optimize the conversion functions, but the prototypes generate warnings
59  * from gcc. The conversion methods isn't the bottleneck for my app, so
60  * just remove the warnings by undef'ing the optimization ..
61  */
62 #undef ntohs
63 #undef ntohl
64 #undef htons
65 #undef htonl
66 #endif
67
68 #undef THREAD_SANITIZER
69 #if __clang__
70 #   if defined(__has_feature) && __has_feature(thread_sanitizer)
71 #define THREAD_SANITIZER
72 #   endif
73 #endif
74
75 // ptr_fun don't like the extern "C" thing for unlock cookie.. cast it
76 // away ;)
77 typedef void (*UNLOCK_COOKIE_T)(const void *cookie);
78
79 extern "C" {
80
81 #define check(expr, msg) \
82     static_cast<void>((expr) ? 0 : abort_msg(#expr, msg, __LINE__))
83
84 #define WHITESPACE_DB "whitespace sucks.db"
85 #define MULTI_DISPATCHER_CONFIG \
86     "ht_size=129;ht_locks=3;chk_remover_stime=1;chk_period=60"
87
88 // Exists in platform.h upstream, local copy here to enable test backports
89 #ifdef WIN32
90 #define DIRECTORY_SEPARATOR_CHARACTER '\\'
91 #else
92 #define DIRECTORY_SEPARATOR_CHARACTER '/'
93 #endif
94
95 struct test_harness testHarness;
96
97 class ThreadData {
98 public:
99     ThreadData(ENGINE_HANDLE *eh, ENGINE_HANDLE_V1 *ehv1,
100                int e=0) : h(eh), h1(ehv1), extra(e) {}
101     ENGINE_HANDLE    *h;
102     ENGINE_HANDLE_V1 *h1;
103     int               extra;
104 };
105
106 bool abort_msg(const char *expr, const char *msg, int line) {
107     fprintf(stderr, "%s:%d Test failed: `%s' (%s)\n",
108             __FILE__, line, msg, expr);
109     abort();
110     // UNREACHABLE
111     return false;
112 }
113
114 static const char *dbname_env;
115 static enum test_result rmdb(void)
116 {
117     const char *files[] = { WHITESPACE_DB,
118                             "/tmp/test",
119                             "/tmp/mutation.log",
120                             dbname_env,
121                             NULL };
122     int ii = 0;
123     while (files[ii] != NULL) {
124         CouchbaseDirectoryUtilities::rmrf(files[ii]);
125         if (access(files[ii], F_OK) != -1) {
126             std::cerr << "Failed to remove: " << files[ii] << " " << std::endl;
127             return FAIL;
128         }
129         ++ii;
130     }
131
132     return SUCCESS;
133 }
134
135 static enum test_result skipped_test_function(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1) {
136     (void) h;
137     (void) h1;
138     return SKIPPED;
139 }
140
141 static bool teardown(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1) {
142     (void)h; (void)h1;
143     vals.clear();
144     return true;
145 }
146
147 static const void* createTapConn(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1,
148                                  const char *name) {
149     const void *cookie = testHarness.create_cookie();
150     testHarness.lock_cookie(cookie);
151     TAP_ITERATOR iter = h1->get_tap_iterator(h, cookie, name,
152                                              strlen(name),
153                                              TAP_CONNECT_FLAG_DUMP, NULL,
154                                              0);
155     check(iter != NULL, "Failed to create a tap iterator");
156     return cookie;
157 }
158
159 static void check_key_value(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1,
160                             const char* key, const char* val, size_t vlen,
161                             uint16_t vbucket = 0) {
162     item_info info;
163     check(get_item_info(h, h1, &info, key, vbucket), "checking key and value");
164     check(info.nvalue == 1, "info.nvalue != 1");
165     check(vlen == info.value[0].iov_len, "Value length mismatch");
166     check(memcmp(info.value[0].iov_base, val, vlen) == 0, "Data mismatch");
167 }
168
169 static bool test_setup(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1) {
170     wait_for_warmup_complete(h, h1);
171
172     check(h1->get_stats(h, NULL, "prev-vbucket", 12, add_stats) == ENGINE_SUCCESS,
173           "Failed to get the previous state of vbuckets");
174     if (vals.find("vb_0") == vals.end()) {
175         check(set_vbucket_state(h, h1, 0, vbucket_state_active),
176               "Failed to set VB0 state.");
177     }
178
179     wait_for_stat_change(h, h1, "ep_vb_snapshot_total", 0);
180
181     // warmup is complete, notify ep engine that it must now enable
182     // data traffic
183     protocol_binary_request_header *pkt = createPacket(PROTOCOL_BINARY_CMD_ENABLE_TRAFFIC);
184     check(h1->unknown_command(h, NULL, pkt, add_response) == ENGINE_SUCCESS,
185           "Failed to enable data traffic");
186     free(pkt);
187
188     return true;
189 }
190
191 static enum test_result test_getl(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1) {
192     const char *key = "k1";
193     uint16_t vbucketId = 0;
194     uint32_t expiration = 25;
195
196     getl(h, h1, key, vbucketId, expiration);
197     check(last_status == PROTOCOL_BINARY_RESPONSE_KEY_ENOENT,
198           "expected the key to be missing...");
199     if (!last_body.empty() && last_body != "NOT_FOUND") {
200         fprintf(stderr, "Should have returned NOT_FOUND. Getl Failed");
201         abort();
202     }
203
204     item *i = NULL;
205     check(store(h, h1, NULL, OPERATION_SET, key, "{\"lock\":\"data\"}",
206                 &i, 0, vbucketId, 3600, PROTOCOL_BINARY_DATATYPE_JSON)
207           == ENGINE_SUCCESS, "Failed to store an item.");
208     h1->release(h, NULL, i);
209
210     /* retry getl, should succeed */
211     getl(h, h1, key, vbucketId, expiration);
212     check(last_status == PROTOCOL_BINARY_RESPONSE_SUCCESS,
213           "Expected to be able to getl on first try");
214     check(last_body == "{\"lock\":\"data\"}", "Body was malformed.");
215     check(last_datatype == PROTOCOL_BINARY_DATATYPE_JSON,
216             "Expected datatype to be JSON");
217
218     /* wait 16 seconds */
219     testHarness.time_travel(16);
220
221     /* lock's taken so this should fail */
222     getl(h, h1, key, vbucketId, expiration);
223     check(last_status == PROTOCOL_BINARY_RESPONSE_ETMPFAIL,
224           "Expected to fail getl on second try");
225
226     if (!last_body.empty() && last_body != "LOCK_ERROR") {
227         fprintf(stderr, "Should have returned LOCK_ERROR. Getl Failed");
228         abort();
229     }
230
231     check(store(h, h1, NULL, OPERATION_SET, key, "lockdata2", &i, 0, vbucketId)
232           != ENGINE_SUCCESS, "Should have failed to store an item.");
233     h1->release(h, NULL, i);
234
235     /* wait another 10 seconds */
236     testHarness.time_travel(10);
237
238     /* retry set, should succeed */
239     check(store(h, h1, NULL, OPERATION_SET, key, "lockdata", &i, 0, vbucketId)
240           == ENGINE_SUCCESS, "Failed to store an item.");
241     h1->release(h, NULL, i);
242
243     /* point to wrong vbucket, to test NOT_MY_VB response */
244     getl(h, h1, key, 10, expiration);
245     check(last_status == PROTOCOL_BINARY_RESPONSE_NOT_MY_VBUCKET,
246           "Should have received not my vbucket response");
247
248     /* acquire lock, should succeed */
249     getl(h, h1, key, vbucketId, expiration);
250     check(last_status == PROTOCOL_BINARY_RESPONSE_SUCCESS,
251           "Aquire lock should have succeeded");
252     check(last_datatype == PROTOCOL_BINARY_RAW_BYTES,
253             "Expected datatype to be RAW BYTES");
254
255     /* try an incr operation followed by a delete, both of which should fail */
256     uint64_t cas = 0;
257     uint64_t result = 0;
258
259     check(h1->arithmetic(h, NULL, key, 2, true, false, 1, 1, 0,
260                          &cas, PROTOCOL_BINARY_RAW_BYTES, &result,
261                          0)  == ENGINE_TMPFAIL, "Incr failed");
262
263
264     check(del(h, h1, key, 0, 0) == ENGINE_TMPFAIL, "Delete failed");
265
266
267     /* bug MB 2699 append after getl should fail with ENGINE_TMPFAIL */
268
269     testHarness.time_travel(26);
270
271     char binaryData1[] = "abcdefg\0gfedcba";
272     char binaryData2[] = "abzdefg\0gfedcba";
273
274     check(storeCasVb11(h, h1, NULL, OPERATION_SET, key,
275                        binaryData1, sizeof(binaryData1) - 1, 82758, &i, 0, 0)
276           == ENGINE_SUCCESS,
277           "Failed set.");
278     h1->release(h, NULL, i);
279
280     /* acquire lock, should succeed */
281     getl(h, h1, key, vbucketId, expiration);
282     check(last_status == PROTOCOL_BINARY_RESPONSE_SUCCESS,
283           "Aquire lock should have succeeded");
284
285     /* append should fail */
286     check(storeCasVb11(h, h1, NULL, OPERATION_APPEND, key,
287                        binaryData2, sizeof(binaryData2) - 1, 82758, &i, 0, 0)
288           == ENGINE_TMPFAIL,
289           "Append should fail.");
290     h1->release(h, NULL, i);
291
292     /* bug MB 3252 & MB 3354.
293      * 1. Set a key with an expiry value.
294      * 2. Take a lock on the item before it expires
295      * 3. Wait for the item to expire
296      * 4. Perform a CAS operation, should fail
297      * 5. Perform a set operation, should succeed
298      */
299     const char *ekey = "test_expiry";
300     const char *edata = "some test data here.";
301
302     item *it = NULL;
303
304     check(h1->allocate(h, NULL, &it, ekey, strlen(ekey), strlen(edata), 0, 2,
305           PROTOCOL_BINARY_RAW_BYTES) == ENGINE_SUCCESS, "Allocation Failed");
306
307     item_info info;
308     info.nvalue = 1;
309     if (!h1->get_item_info(h, NULL, it, &info)) {
310         abort();
311     }
312     memcpy(info.value[0].iov_base, edata, strlen(edata));
313
314     check(h1->store(h, NULL, it, &cas, OPERATION_SET, 0) ==
315         ENGINE_SUCCESS, "Failed to Store item");
316     check_key_value(h, h1, ekey, edata, strlen(edata));
317     h1->release(h, NULL, it);
318
319     testHarness.time_travel(3);
320     cas = last_cas;
321
322     /* cas should fail */
323     check(storeCasVb11(h, h1, NULL, OPERATION_CAS, ekey,
324                        binaryData1, sizeof(binaryData1) - 1, 82758, &i, cas, 0)
325           != ENGINE_SUCCESS,
326           "CAS succeeded.");
327     h1->release(h, NULL, i);
328
329     /* but a simple store should succeed */
330     check(store(h, h1, NULL, OPERATION_SET, ekey, edata, &i, 0, vbucketId)
331           == ENGINE_SUCCESS, "Failed to store an item.");
332     h1->release(h, NULL, i);
333     return SUCCESS;
334 }
335
336 static enum test_result test_unl(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1) {
337
338     const char *key = "k2";
339     uint16_t vbucketId = 0;
340
341     unl(h, h1, key, vbucketId);
342     check(last_status != PROTOCOL_BINARY_RESPONSE_SUCCESS,
343           "expected the key to be missing...");
344
345     item *i = NULL;
346     check(store(h, h1, NULL, OPERATION_SET, key, "lockdata", &i, 0, vbucketId)
347           == ENGINE_SUCCESS, "Failed to store an item.");
348     h1->release(h, NULL, i);
349
350     /* getl, should succeed */
351     getl(h, h1, key, vbucketId, 0);
352     check(last_status == PROTOCOL_BINARY_RESPONSE_SUCCESS,
353           "Expected to be able to getl on first try");
354
355     /* save the returned cas value for later */
356     uint64_t cas = last_cas;
357
358     /* lock's taken unlocking with a random cas value should fail */
359     unl(h, h1, key, vbucketId);
360     check(last_status == PROTOCOL_BINARY_RESPONSE_ETMPFAIL,
361           "Expected to fail getl on second try");
362
363     if (!last_body.empty() && last_body != "UNLOCK_ERROR") {
364         fprintf(stderr, "Should have returned UNLOCK_ERROR. Unl Failed");
365         abort();
366     }
367
368     unl(h, h1, key, vbucketId, cas);
369     check(last_status == PROTOCOL_BINARY_RESPONSE_SUCCESS,
370           "Expected to succed unl with correct cas");
371
372     /* acquire lock, should succeed */
373     getl(h, h1, key, vbucketId, 0);
374
375     /* wait 16 seconds */
376     testHarness.time_travel(16);
377
378     /* lock has expired, unl should fail */
379     unl(h, h1, key, vbucketId, last_cas);
380     check(last_status == PROTOCOL_BINARY_RESPONSE_ETMPFAIL,
381           "Expected to fail unl on lock timeout");
382
383     return SUCCESS;
384 }
385
386 static enum test_result test_wrong_vb_mutation(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1,
387                                                ENGINE_STORE_OPERATION op) {
388     item *i = NULL;
389     int numNotMyVBucket = get_int_stat(h, h1, "ep_num_not_my_vbuckets");
390     uint64_t cas = 11;
391     if (op == OPERATION_ADD) {
392         // Add operation with cas != 0 doesn't make sense
393         cas = 0;
394     }
395     check(store(h, h1, NULL, op,
396                 "key", "somevalue", &i, cas, 1) == ENGINE_NOT_MY_VBUCKET,
397         "Expected not_my_vbucket");
398     h1->release(h, NULL, i);
399     wait_for_stat_change(h, h1, "ep_num_not_my_vbuckets", numNotMyVBucket);
400     return SUCCESS;
401 }
402
403 static enum test_result test_pending_vb_mutation(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1,
404                                                  ENGINE_STORE_OPERATION op) {
405     const void *cookie = testHarness.create_cookie();
406     testHarness.set_ewouldblock_handling(cookie, false);
407     item *i = NULL;
408     check(set_vbucket_state(h, h1, 1, vbucket_state_pending), "Failed to set vbucket state.");
409     check(verify_vbucket_state(h, h1, 1, vbucket_state_pending), "Bucket state was not set to pending.");
410     uint64_t cas = 11;
411     if (op == OPERATION_ADD) {
412         // Add operation with cas != 0 doesn't make sense..
413         cas = 0;
414     }
415     check(store(h, h1, cookie, op,
416                 "key", "somevalue", &i, cas, 1) == ENGINE_EWOULDBLOCK,
417         "Expected woodblock");
418     h1->release(h, NULL, i);
419     testHarness.destroy_cookie(cookie);
420     return SUCCESS;
421 }
422
423 static enum test_result test_replica_vb_mutation(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1,
424                                                  ENGINE_STORE_OPERATION op) {
425     item *i = NULL;
426     check(set_vbucket_state(h, h1, 1, vbucket_state_replica), "Failed to set vbucket state.");
427     check(verify_vbucket_state(h, h1, 1, vbucket_state_replica), "Bucket state was not set to replica.");
428     int numNotMyVBucket = get_int_stat(h, h1, "ep_num_not_my_vbuckets");
429
430     uint64_t cas = 11;
431     if (op == OPERATION_ADD) {
432         // performing add with a CAS != 0 doesn't make sense...
433         cas = 0;
434     }
435     check(store(h, h1, NULL, op,
436                 "key", "somevalue", &i, cas, 1) == ENGINE_NOT_MY_VBUCKET,
437         "Expected not my vbucket");
438     wait_for_stat_change(h, h1, "ep_num_not_my_vbuckets", numNotMyVBucket);
439     h1->release(h, NULL, i);
440     return SUCCESS;
441 }
442
443 //
444 // ----------------------------------------------------------------------
445 // The actual tests are below.
446 // ----------------------------------------------------------------------
447 //
448
449 static enum test_result test_get_miss(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1) {
450     check(verify_key(h, h1, "k") == ENGINE_KEY_ENOENT, "Expected miss.");
451     return SUCCESS;
452 }
453
454 static enum test_result test_set(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1) {
455     item *i = NULL;
456     const int num_sets = 5, num_keys = 4;
457
458     std::string key_arr[num_keys] = { "dummy_key",
459                                       "checkpoint_start",
460                                       "checkpoint_end",
461                                       "key" };
462
463
464     for (int k = 0; k < num_keys; k++) {
465         for (int j = 0; j < num_sets; j++) {
466             std::string err_string("Error setting " + key_arr[k]);
467             checkeq(ENGINE_SUCCESS,
468                     store(h, h1, NULL, OPERATION_SET, key_arr[k].c_str(),
469                           "somevalue", &i),
470                     err_string.c_str());
471             h1->release(h, NULL, i);
472         }
473     }
474
475     wait_for_flusher_to_settle(h, h1);
476     checkeq(num_keys, get_int_stat(h, h1, "ep_total_persisted"),
477             "Expected ep_total_persisted equals 4");
478     return SUCCESS;
479 }
480
481 struct handle_pair {
482     ENGINE_HANDLE *h;
483     ENGINE_HANDLE_V1 *h1;
484 };
485
486 extern "C" {
487     static void conc_del_set_thread(void *arg) {
488         struct handle_pair *hp = static_cast<handle_pair *>(arg);
489         item *it = NULL;
490
491         for (int i = 0; i < 5000; ++i) {
492             store(hp->h, hp->h1, NULL, OPERATION_ADD,
493                   "key", "somevalue", &it);
494             hp->h1->release(hp->h, NULL, it);
495             usleep(10);
496             checkeq(ENGINE_SUCCESS,
497                     store(hp->h, hp->h1, NULL, OPERATION_SET,
498                           "key", "somevalue", &it),
499                     "Error setting.");
500             hp->h1->release(hp->h, NULL, it);
501             usleep(10);
502             // Ignoring the result here -- we're racing.
503             del(hp->h, hp->h1, "key", 0, 0);
504             usleep(10);
505         }
506     }
507
508     static void conc_incr_thread(void *arg) {
509         struct handle_pair *hp = static_cast<handle_pair *>(arg);
510         uint64_t cas = 0, result = 0;
511
512         for (int i = 0; i < 10; i++) {
513             check(hp->h1->arithmetic(hp->h, NULL, "key", 3, true, true, 1, 1, 0,
514                                      &cas, PROTOCOL_BINARY_RAW_BYTES, &result,
515                                      0) == ENGINE_SUCCESS,
516                                      "Failed arithmetic operation");
517         }
518     }
519 }
520
521 static enum test_result test_conc_set(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1) {
522
523     const int n_threads = 8;
524     cb_thread_t threads[n_threads];
525     struct handle_pair hp = {h, h1};
526
527     wait_for_persisted_value(h, h1, "key", "value1");
528
529     for (int i = 0; i < n_threads; i++) {
530         int r = cb_create_thread(&threads[i], conc_del_set_thread, &hp, 0);
531         cb_assert(r == 0);
532     }
533
534     for (int i = 0; i < n_threads; i++) {
535         int r = cb_join_thread(threads[i]);
536         cb_assert(r == 0);
537     }
538
539     wait_for_flusher_to_settle(h, h1);
540
541     testHarness.reload_engine(&h, &h1,
542                               testHarness.engine_path,
543                               testHarness.get_current_testcase()->cfg,
544                               true, false);
545     wait_for_warmup_complete(h, h1);
546
547     cb_assert(0 == get_int_stat(h, h1, "ep_warmed_dups"));
548
549     return SUCCESS;
550 }
551
552 static enum test_result test_conc_incr(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1) {
553     const int n_threads = 10;
554     cb_thread_t threads[n_threads];
555     struct handle_pair hp = {h, h1};
556     item *i = NULL;
557     check(store(h, h1, NULL, OPERATION_SET, "key", "0", &i) == ENGINE_SUCCESS,
558           "store failure");
559     h1->release(h, NULL, i);
560
561     for (int i = 0; i < n_threads; i++) {
562         int r = cb_create_thread(&threads[i], conc_incr_thread, &hp, 0);
563         cb_assert(r == 0);
564     }
565
566     for (int i = 0; i < n_threads; i++) {
567         int r = cb_join_thread(threads[i]);
568         cb_assert(r == 0);
569     }
570
571     check_key_value(h, h1, "key", "100", 3);
572
573     return SUCCESS;
574 }
575
576
577 static enum test_result test_set_get_hit(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1) {
578     item *i = NULL;
579     check(store(h, h1, NULL, OPERATION_SET, "key", "somevalue", &i) == ENGINE_SUCCESS,
580           "store failure");
581     check_key_value(h, h1, "key", "somevalue", 9);
582     h1->release(h, NULL, i);
583     return SUCCESS;
584 }
585
586 static enum test_result test_set_get_hit_bin(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1) {
587     char binaryData[] = "abcdefg\0gfedcba";
588     cb_assert(sizeof(binaryData) != strlen(binaryData));
589
590     item *i = NULL;
591     check(ENGINE_SUCCESS ==
592           storeCasVb11(h, h1, NULL, OPERATION_SET, "key",
593                        binaryData, sizeof(binaryData), 82758, &i, 0, 0),
594           "Failed to set.");
595     h1->release(h, NULL, i);
596     check_key_value(h, h1, "key", binaryData, sizeof(binaryData));
597     return SUCCESS;
598 }
599
600 static enum test_result test_set_with_cas_non_existent(ENGINE_HANDLE *h,
601                                                        ENGINE_HANDLE_V1 *h1) {
602     const char *key = "test_expiry_flush";
603     item *i = NULL;
604
605     check(h1->allocate(h, NULL, &i, key, strlen(key), 10, 0, 0,
606           PROTOCOL_BINARY_RAW_BYTES) == ENGINE_SUCCESS, "Allocation failed.");
607
608     Item *it = reinterpret_cast<Item*>(i);
609     it->setCas(1234);
610
611     uint64_t cas = 0;
612     check(h1->store(h, NULL, i, &cas, OPERATION_SET, 0) == ENGINE_KEY_ENOENT,
613           "Expected not found");
614     h1->release(h, NULL, i);
615
616     return SUCCESS;
617 }
618
619 static enum test_result test_set_change_flags(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1) {
620     item *i = NULL;
621     check(store(h, h1, NULL, OPERATION_SET, "key", "somevalue", &i) == ENGINE_SUCCESS,
622           "Failed to set.");
623     h1->release(h, NULL, i);
624
625     item_info info;
626     uint32_t flags = 828258;
627     check(get_item_info(h, h1, &info, "key"), "Failed to get value.");
628     cb_assert(info.flags != flags);
629
630     check(storeCasVb11(h, h1, NULL, OPERATION_SET, "key",
631                        "newvalue", strlen("newvalue"), flags, &i, 0, 0) == ENGINE_SUCCESS,
632           "Failed to set again.");
633     h1->release(h, NULL, i);
634
635     check(get_item_info(h, h1, &info, "key"), "Failed to get value.");
636
637     return info.flags == flags ? SUCCESS : FAIL;
638 }
639
640 static enum test_result test_cas(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1) {
641     item *i = NULL;
642     check(store(h, h1, NULL, OPERATION_SET, "key", "somevalue", &i) == ENGINE_SUCCESS,
643           "Failed to do initial set.");
644     h1->release(h, NULL, i);
645     check(store(h, h1, NULL, OPERATION_CAS, "key", "failcas", &i) != ENGINE_SUCCESS,
646           "Failed to fail initial CAS.");
647     h1->release(h, NULL, i);
648     check_key_value(h, h1, "key", "somevalue", 9);
649
650     check(h1->get(h, NULL, &i, "key", 3, 0) == ENGINE_SUCCESS,
651           "Failed to get value.");
652
653     item_info info;
654     info.nvalue = 1;
655     check(h1->get_item_info(h, NULL, i, &info), "Failed to get item info.");
656     h1->release(h, NULL, i);
657
658     check(store(h, h1, NULL, OPERATION_CAS, "key", "winCas", &i,
659                 info.cas) == ENGINE_SUCCESS,
660           "Failed to store CAS");
661     h1->release(h, NULL, i);
662     check_key_value(h, h1, "key", "winCas", 6);
663
664     uint64_t cval = 99999;
665     check(store(h, h1, NULL, OPERATION_CAS, "non-existing", "winCas", &i,
666                 cval) == ENGINE_KEY_ENOENT,
667           "CAS for non-existing key returned the wrong error code");
668     h1->release(h, NULL, i);
669     return SUCCESS;
670 }
671
672 static enum test_result test_add(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1) {
673     item *i = NULL;
674     check(store(h, h1, NULL, OPERATION_ADD,"key", "somevalue", &i) == ENGINE_SUCCESS,
675           "Failed to add value.");
676     h1->release(h, NULL, i);
677     check(store(h, h1, NULL, OPERATION_ADD,"key", "somevalue", &i) == ENGINE_NOT_STORED,
678           "Failed to fail to re-add value.");
679     h1->release(h, NULL, i);
680
681     // This aborts on failure.
682     check_key_value(h, h1, "key", "somevalue", 9);
683
684     // Expiration above was an hour, so let's go to The Future
685     testHarness.time_travel(3800);
686
687     check(store(h, h1, NULL, OPERATION_ADD,"key", "newvalue", &i) == ENGINE_SUCCESS,
688           "Failed to add value again.");
689     h1->release(h, NULL, i);
690     check_key_value(h, h1, "key", "newvalue", 8);
691     return SUCCESS;
692 }
693
694 static enum test_result test_add_add_with_cas(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1) {
695     item *i = NULL;
696     check(store(h, h1, NULL, OPERATION_ADD, "key",
697                 "somevalue", &i) == ENGINE_SUCCESS,
698           "Failed set.");
699     check_key_value(h, h1, "key", "somevalue", 9);
700     item_info info;
701     info.nvalue = 1;
702     info.nvalue = 1;
703     check(h1->get_item_info(h, NULL, i, &info) == true,
704           "Should be able to get info");
705
706     item *i2 = NULL;
707     ENGINE_ERROR_CODE ret;
708     check((ret = store(h, h1, NULL, OPERATION_ADD, "key",
709                        "somevalue", &i2, info.cas)) == ENGINE_KEY_EEXISTS,
710           "Should not be able to add the key two times");
711
712     h1->release(h, NULL, i);
713     h1->release(h, NULL, i2);
714     return SUCCESS;
715 }
716
717 static enum test_result test_replace(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1) {
718     item *i = NULL;
719     check(store(h, h1, NULL, OPERATION_REPLACE,"key", "somevalue", &i) != ENGINE_SUCCESS,
720           "Failed to fail to replace non-existing value.");
721     h1->release(h, NULL, i);
722     check(store(h, h1, NULL, OPERATION_SET,"key", "somevalue", &i) == ENGINE_SUCCESS,
723           "Failed to set value.");
724     h1->release(h, NULL, i);
725     check(store(h, h1, NULL, OPERATION_REPLACE,"key", "somevalue", &i) == ENGINE_SUCCESS,
726           "Failed to replace existing value.");
727     h1->release(h, NULL, i);
728     check_key_value(h, h1, "key", "somevalue", 9);
729     return SUCCESS;
730 }
731
732 static enum test_result test_replace_with_eviction(ENGINE_HANDLE *h,
733                                                    ENGINE_HANDLE_V1 *h1) {
734     item *i = NULL;
735     check(store(h, h1, NULL, OPERATION_SET,"key", "somevalue", &i) == ENGINE_SUCCESS,
736           "Failed to set value.");
737     h1->release(h, NULL, i);
738     wait_for_flusher_to_settle(h, h1);
739     evict_key(h, h1, "key");
740     int numBgFetched = get_int_stat(h, h1, "ep_bg_fetched");
741
742     check(store(h, h1, NULL, OPERATION_REPLACE,"key", "somevalue1", &i) == ENGINE_SUCCESS,
743           "Failed to replace existing value.");
744
745     check(h1->get_stats(h, NULL, NULL, 0, add_stats) == ENGINE_SUCCESS,
746           "Failed to get stats.");
747     std::string eviction_policy = vals.find("ep_item_eviction_policy")->second;
748     if (eviction_policy == "full_eviction") {
749         numBgFetched++;
750     }
751
752     check(get_int_stat(h, h1, "ep_bg_fetched") == numBgFetched,
753           "Bg fetched value didn't match");
754
755     h1->release(h, NULL, i);
756     check_key_value(h, h1, "key", "somevalue1", 10);
757     return SUCCESS;
758 }
759
760 static enum test_result test_incr_miss(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1) {
761     uint64_t cas = 0, result = 0;
762     h1->arithmetic(h, NULL, "key", 3, true, false, 1, 0, 0,
763                    &cas, PROTOCOL_BINARY_RAW_BYTES, &result,
764                    0);
765     check(ENGINE_KEY_ENOENT == verify_key(h, h1, "key"), "Expected to not find key");
766     return SUCCESS;
767 }
768
769 static enum test_result test_incr_default(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1) {
770     uint64_t cas = 0, result = 0;
771     check(h1->arithmetic(h, NULL, "key", 3, true, true, 1, 1, 0,
772                          &cas, PROTOCOL_BINARY_RAW_BYTES, &result,
773                          0) == ENGINE_SUCCESS,
774           "Failed first arith");
775     check(result == 1, "Failed result verification.");
776
777     check(h1->arithmetic(h, NULL, "key", 3, true, false, 1, 1, 0,
778                          &cas, PROTOCOL_BINARY_RAW_BYTES, &result,
779                          0) == ENGINE_SUCCESS,
780           "Failed second arith.");
781     check(result == 2, "Failed second result verification.");
782
783     check(h1->arithmetic(h, NULL, "key", 3, true, true, 1, 1, 0,
784                          &cas, PROTOCOL_BINARY_RAW_BYTES, &result,
785                          0) == ENGINE_SUCCESS,
786           "Failed third arith.");
787     check(result == 3, "Failed third result verification.");
788
789     check_key_value(h, h1, "key", "3", 1);
790     return SUCCESS;
791 }
792
793 static enum test_result test_append(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1) {
794     item *i = NULL;
795
796     // MB-11332: append on non-existing key should return NOT_STORED
797     check(storeCasVb11(h, h1, NULL, OPERATION_APPEND, "key",
798                        "foo\r\n", 5, 82758, &i, 0, 0)
799           == ENGINE_NOT_STORED,
800           "MB-11332: Failed append.");
801
802     check(storeCasVb11(h, h1, NULL, OPERATION_SET, "key",
803                        "\r\n", 2, 82758, &i, 0, 0)
804           == ENGINE_SUCCESS,
805           "Failed set.");
806     h1->release(h, NULL, i);
807
808     check(storeCasVb11(h, h1, NULL, OPERATION_APPEND, "key",
809                        "foo\r\n", 5, 82758, &i, 0, 0)
810           == ENGINE_SUCCESS,
811           "Failed append.");
812     h1->release(h, NULL, i);
813
814     check_key_value(h, h1, "key", "\r\nfoo\r\n", 7);
815
816     char binaryData1[] = "abcdefg\0gfedcba\r\n";
817     char binaryData2[] = "abzdefg\0gfedcba\r\n";
818     size_t dataSize = 20*1024*1024;
819     char *bigBinaryData3 = new char[dataSize];
820     memset(bigBinaryData3, '\0', dataSize);
821
822     check(storeCasVb11(h, h1, NULL, OPERATION_SET, "key",
823                        binaryData1, sizeof(binaryData1) - 1, 82758, &i, 0, 0)
824           == ENGINE_SUCCESS,
825           "Failed set.");
826     h1->release(h, NULL, i);
827
828     check(storeCasVb11(h, h1, NULL, OPERATION_APPEND, "key",
829                        bigBinaryData3, dataSize, 82758, &i, 0, 0)
830           == ENGINE_E2BIG,
831           "Expected append failure.");
832     h1->release(h, NULL, i);
833     delete[] bigBinaryData3;
834
835     check(storeCasVb11(h, h1, NULL, OPERATION_APPEND, "key",
836                        binaryData2, sizeof(binaryData2) - 1, 82758, &i, 0, 0)
837           == ENGINE_SUCCESS,
838           "Failed append.");
839     h1->release(h, NULL, i);
840
841     std::string expected;
842     expected.append(binaryData1, sizeof(binaryData1) - 1);
843     expected.append(binaryData2, sizeof(binaryData2) - 1);
844
845     check_key_value(h, h1, "key", expected.data(), expected.length());
846     return SUCCESS;
847 }
848
849 static enum test_result test_prepend(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1) {
850     item *i = NULL;
851
852     // MB-11332: prepend on non-existing key should return NOT_STORED
853     check(storeCasVb11(h, h1, NULL, OPERATION_PREPEND, "key",
854                        "foo\r\n", 5, 82758, &i, 0, 0)
855           == ENGINE_NOT_STORED,
856           "MB-11332: Failed prepend.");
857
858
859     check(storeCasVb11(h, h1, NULL, OPERATION_SET, "key",
860                        "\r\n", 2, 82758, &i, 0, 0)
861           == ENGINE_SUCCESS,
862           "Failed set.");
863     h1->release(h, NULL, i);
864
865     check(storeCasVb11(h, h1, NULL, OPERATION_PREPEND, "key",
866                        "foo\r\n", 5, 82758, &i, 0, 0)
867           == ENGINE_SUCCESS,
868           "Failed append.");
869     h1->release(h, NULL, i);
870
871     check_key_value(h, h1, "key", "foo\r\n\r\n", 7);
872
873     char binaryData1[] = "abcdefg\0gfedcba\r\n";
874     char binaryData2[] = "abzdefg\0gfedcba\r\n";
875     size_t dataSize = 20*1024*1024;
876     char *bigBinaryData3 = new char[dataSize];
877     memset(bigBinaryData3, '\0', dataSize);
878
879     check(storeCasVb11(h, h1, NULL, OPERATION_SET, "key",
880                        binaryData1, sizeof(binaryData1) - 1, 82758, &i, 0, 0)
881           == ENGINE_SUCCESS,
882           "Failed set.");
883     h1->release(h, NULL, i);
884
885     check(storeCasVb11(h, h1, NULL, OPERATION_PREPEND, "key",
886                        bigBinaryData3, dataSize, 82758, &i, 0, 0)
887           == ENGINE_E2BIG,
888           "Expected prepend failure.");
889     h1->release(h, NULL, i);
890     delete[] bigBinaryData3;
891
892     check(storeCasVb11(h, h1, NULL, OPERATION_PREPEND, "key",
893                        binaryData2, sizeof(binaryData2) - 1, 82758, &i, 0, 0)
894           == ENGINE_SUCCESS,
895           "Failed append.");
896     h1->release(h, NULL, i);
897
898     std::string expected;
899     expected.append(binaryData2, sizeof(binaryData2) - 1);
900     expected.append(binaryData1, sizeof(binaryData1) - 1);
901
902     check_key_value(h, h1, "key", expected.data(), expected.length());
903     return SUCCESS;
904 }
905
906 static enum test_result test_append_compressed(ENGINE_HANDLE *h,
907                                                ENGINE_HANDLE_V1 *h1) {
908
909     item *i = NULL;
910
911     size_t len = snappy_max_compressed_length(2);
912     char *newBuf = (char *) malloc (len);
913     snappy_compress("\r\n", 2, newBuf, &len);
914     check(storeCasVb11(h, h1, NULL, OPERATION_SET, "key1",
915                        (const char *)newBuf, len, 82758, &i, 0, 0, 3600,
916                        PROTOCOL_BINARY_DATATYPE_COMPRESSED)
917           == ENGINE_SUCCESS, "Failed set.");
918     h1->release(h, NULL, i);
919     free (newBuf);
920
921     check(storeCasVb11(h, h1, NULL, OPERATION_APPEND, "key1",
922                        "foo\r\n", 5, 82758, &i, 0, 0)
923           == ENGINE_SUCCESS,
924           "Failed append uncompressed to compressed.");
925     h1->release(h, NULL, i);
926
927     size_t len1 = snappy_max_compressed_length(7);
928     char *newBuf1 = (char *) malloc (len1);
929     snappy_compress("\r\nfoo\r\n", 7, newBuf1, &len1);
930
931     item_info info;
932     check(get_item_info(h, h1, &info, "key1", 0), "checking key and value");
933     check(info.nvalue == 1, "info.nvalue != 1");
934     check(len1 == info.value[0].iov_len, "Value length mismatch");
935     check(memcmp(info.value[0].iov_base, newBuf1, len1) == 0, "Data mismatch");
936     check(info.datatype == 0x02, "Datatype mismatch");
937     free (newBuf1);
938
939     len = snappy_max_compressed_length(3);
940     newBuf = (char *) malloc (len);
941     snappy_compress("bar", 3, newBuf, &len);
942     check(storeCasVb11(h, h1, NULL, OPERATION_APPEND, "key1",
943                        (const char*)newBuf, len, 82758, &i, 0, 0, 3600,
944                        PROTOCOL_BINARY_DATATYPE_COMPRESSED)
945             == ENGINE_SUCCESS,
946             "Failed append compressed to compressed.");
947     h1->release(h, NULL, i);
948     free (newBuf);
949
950     len1 = snappy_max_compressed_length(10);
951     newBuf1 = (char *) malloc (len1);
952     snappy_compress("\r\nfoo\r\nbar", 10, newBuf1, &len1);
953
954     check(get_item_info(h, h1, &info, "key1", 0), "checking key and value");
955     check(info.nvalue == 1, "info.nvalue != 1");
956     check(len1 == info.value[0].iov_len, "Value length mismatch");
957     check(memcmp(info.value[0].iov_base, newBuf1, len1) == 0, "Data mismatch");
958     check(info.datatype == 0x02, "Datatype mismatch");
959     free (newBuf1);
960
961     check(storeCasVb11(h, h1, NULL, OPERATION_SET, "key2",
962                        "foo", 3, 82758, &i, 0, 0, 3600,
963                        PROTOCOL_BINARY_RAW_BYTES)
964           == ENGINE_SUCCESS, "Failed set.");
965     h1->release(h, NULL, i);
966
967     len = snappy_max_compressed_length(3);
968     newBuf = (char *) malloc (len);
969     snappy_compress("bar", 3, newBuf, &len);
970     check(storeCasVb11(h, h1, NULL, OPERATION_APPEND, "key2",
971                        newBuf, len, 82758, &i, 0, 0, 3600,
972                        PROTOCOL_BINARY_DATATYPE_COMPRESSED)
973             == ENGINE_SUCCESS,
974             "Failed append compressed to uncompressed.");
975     h1->release(h, NULL, i);
976     free (newBuf);
977
978     check(get_item_info(h, h1, &info, "key2", 0), "checking key and value");
979     check(info.nvalue == 1, "info.nvalue != 1");
980     check(info.value[0].iov_len == 6, "Value length mismatch");
981     check(memcmp(info.value[0].iov_base, "foobar", 6) == 0, "Data mismatch");
982     check(info.datatype == 0x00, "Datatype mismatch");
983
984     return SUCCESS;
985 }
986
987 static enum test_result test_prepend_compressed(ENGINE_HANDLE *h,
988                                                ENGINE_HANDLE_V1 *h1) {
989
990     item *i = NULL;
991
992     size_t len = snappy_max_compressed_length(2);
993     char *newBuf = (char *) malloc (len);
994     snappy_compress("\r\n", 2, newBuf, &len);
995     check(storeCasVb11(h, h1, NULL, OPERATION_SET, "key1",
996                        (const char *)newBuf, len, 82758, &i, 0, 0, 3600,
997                        PROTOCOL_BINARY_DATATYPE_COMPRESSED)
998           == ENGINE_SUCCESS, "Failed set.");
999     h1->release(h, NULL, i);
1000     free (newBuf);
1001
1002     check(storeCasVb11(h, h1, NULL, OPERATION_PREPEND, "key1",
1003                        "foo\r\n", 5, 82758, &i, 0, 0)
1004           == ENGINE_SUCCESS,
1005           "Failed prepend uncompressed to compressed.");
1006     h1->release(h, NULL, i);
1007
1008     size_t len1 = snappy_max_compressed_length(7);
1009     char *newBuf1 = (char *) malloc (len1);
1010     snappy_compress("foo\r\n\r\n", 7, newBuf1, &len1);
1011
1012     item_info info;
1013     check(get_item_info(h, h1, &info, "key1", 0), "checking key and value");
1014     check(info.nvalue == 1, "info.nvalue != 1");
1015     check(len1 == info.value[0].iov_len, "Value length mismatch");
1016     check(memcmp(info.value[0].iov_base, newBuf1, len1) == 0, "Data mismatch");
1017     check(info.datatype == 0x02, "Datatype mismatch");
1018     free (newBuf1);
1019
1020     len = snappy_max_compressed_length(3);
1021     newBuf = (char *) malloc (len);
1022     snappy_compress("bar", 3, newBuf, &len);
1023     check(storeCasVb11(h, h1, NULL, OPERATION_PREPEND, "key1",
1024                        (const char*)newBuf, len, 82758, &i, 0, 0, 3600,
1025                        PROTOCOL_BINARY_DATATYPE_COMPRESSED)
1026             == ENGINE_SUCCESS,
1027             "Failed prepend compressed to compressed.");
1028     h1->release(h, NULL, i);
1029     free (newBuf);
1030
1031     len1 = snappy_max_compressed_length(10);
1032     newBuf1 = (char *) malloc (len1);
1033     snappy_compress("barfoo\r\n\r\n", 10, newBuf1, &len1);
1034
1035     check(get_item_info(h, h1, &info, "key1", 0), "checking key and value");
1036     check(info.nvalue == 1, "info.nvalue != 1");
1037     check(len1 == info.value[0].iov_len, "Value length mismatch");
1038     check(memcmp(info.value[0].iov_base, newBuf1, len1) == 0, "Data mismatch");
1039     check(info.datatype == 0x02, "Datatype mismatch");
1040     free (newBuf1);
1041
1042     check(storeCasVb11(h, h1, NULL, OPERATION_SET, "key2",
1043                        "foo", 3, 82758, &i, 0, 0, 3600,
1044                        PROTOCOL_BINARY_RAW_BYTES)
1045           == ENGINE_SUCCESS, "Failed set.");
1046     h1->release(h, NULL, i);
1047
1048     len = snappy_max_compressed_length(3);
1049     newBuf = (char *) malloc (len);
1050     snappy_compress("bar", 3, newBuf, &len);
1051     check(storeCasVb11(h, h1, NULL, OPERATION_PREPEND, "key2",
1052                        newBuf, len, 82758, &i, 0, 0, 3600,
1053                        PROTOCOL_BINARY_DATATYPE_COMPRESSED)
1054             == ENGINE_SUCCESS,
1055             "Failed prepend compressed to uncompressed.");
1056     h1->release(h, NULL, i);
1057     free (newBuf);
1058
1059     check(get_item_info(h, h1, &info, "key2", 0), "checking key and value");
1060     check(info.nvalue == 1, "info.nvalue != 1");
1061     check(info.value[0].iov_len == 6, "Value length mismatch");
1062     check(memcmp(info.value[0].iov_base, "barfoo", 6) == 0, "Data mismatch");
1063     check(info.datatype == 0x00, "Datatype mismatch");
1064
1065     return SUCCESS;
1066 }
1067
1068 static enum test_result test_append_prepend_to_json(ENGINE_HANDLE *h,
1069                                                     ENGINE_HANDLE_V1 *h1) {
1070     item *i = NULL;
1071     item_info info;
1072
1073     const char* key1 = "foo1";
1074     const char* key2 = "foo2";
1075     const char* value1 = "{\"foo1\":\"bar1\"}";
1076     const char* value2 = "{\"foo2\":\"bar2\"}";
1077
1078     // APPEND
1079     check(storeCasVb11(h, h1, NULL, OPERATION_SET, key1,
1080                        value1, strlen(value1), 82758, &i, 0, 0,
1081                        3600, PROTOCOL_BINARY_DATATYPE_JSON)
1082           == ENGINE_SUCCESS, "Failed set.");
1083     h1->release(h, NULL, i);
1084
1085     check(h1->get(h, NULL, &i, key1, strlen(key1), 0) == ENGINE_SUCCESS,
1086             "Unable to get stored item");
1087     info.nvalue = 1;
1088     h1->get_item_info(h, NULL, i, &info);
1089     check(checkUTF8JSON((const unsigned char*)info.value[0].iov_base,
1090                         (int)info.value[0].iov_len) == 1, "Expected JSON");
1091     check(info.datatype == PROTOCOL_BINARY_DATATYPE_JSON, "Invalid datatype");
1092     h1->release(h, NULL, i);
1093
1094     check(storeCasVb11(h, h1, NULL, OPERATION_APPEND, key1,
1095                        value2, strlen(value2), 82758, &i, 0, 0)
1096           == ENGINE_SUCCESS,
1097           "Failed append.");
1098     h1->release(h, NULL, i);
1099
1100     check(h1->get(h, NULL, &i, key1, strlen(key1), 0) == ENGINE_SUCCESS,
1101             "Unable to get stored item");
1102     info.nvalue = 1;
1103     h1->get_item_info(h, NULL, i, &info);
1104     check(checkUTF8JSON((const unsigned char*)info.value[0].iov_base,
1105                         (int)info.value[0].iov_len) == 0, "Expected Binary");
1106     check(info.datatype == PROTOCOL_BINARY_RAW_BYTES,
1107                 "Invalid datatype after append");
1108     h1->release(h, NULL, i);
1109
1110     // PREPEND
1111     check(storeCasVb11(h, h1, NULL, OPERATION_SET, key2,
1112                        value1, strlen(value1), 82758, &i, 0, 0,
1113                        3600, PROTOCOL_BINARY_DATATYPE_JSON)
1114           == ENGINE_SUCCESS, "Failed set.");
1115     h1->release(h, NULL, i);
1116
1117     check(h1->get(h, NULL, &i, key2, strlen(key2), 0) == ENGINE_SUCCESS,
1118             "Unable to get stored item");
1119     info.nvalue = 1;
1120     h1->get_item_info(h, NULL, i, &info);
1121     check(checkUTF8JSON((const unsigned char*)info.value[0].iov_base,
1122                         (int)info.value[0].iov_len) == 1, "Expected JSON");
1123     check(info.datatype == PROTOCOL_BINARY_DATATYPE_JSON, "Invalid datatype");
1124     h1->release(h, NULL, i);
1125
1126     check(storeCasVb11(h, h1, NULL, OPERATION_PREPEND, key2,
1127                        value2, strlen(value2), 82758, &i, 0, 0)
1128           == ENGINE_SUCCESS,
1129           "Failed prepend.");
1130     h1->release(h, NULL, i);
1131
1132     check(h1->get(h, NULL, &i, key2, strlen(key2), 0) == ENGINE_SUCCESS,
1133             "Unable to get stored item");
1134     info.nvalue = 1;
1135     h1->get_item_info(h, NULL, i, &info);
1136     check(checkUTF8JSON((const unsigned char*)info.value[0].iov_base,
1137                         (int)info.value[0].iov_len) == 0, "Expected Binary");
1138     check(info.datatype == PROTOCOL_BINARY_RAW_BYTES,
1139                 "Invalid datatype after prepend");
1140     h1->release(h, NULL, i);
1141
1142     return SUCCESS;
1143 }
1144
1145 static enum test_result test_incr(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1) {
1146     uint64_t cas = 0, result = 0;
1147     item *i = NULL;
1148     check(store(h, h1, NULL, OPERATION_ADD,"key", "1", &i) == ENGINE_SUCCESS,
1149           "Failed to add value.");
1150     h1->release(h, NULL, i);
1151
1152     check(h1->arithmetic(h, NULL, "key", 3, true, false, 1, 1, 0,
1153                          &cas, PROTOCOL_BINARY_RAW_BYTES, &result,
1154                          0) == ENGINE_SUCCESS,
1155           "Failed to incr value.");
1156
1157     check_key_value(h, h1, "key", "2", 1);
1158     return SUCCESS;
1159 }
1160
1161 static enum test_result test_bug2799(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1) {
1162     uint64_t cas = 0, result = 0;
1163     item *i = NULL;
1164     check(store(h, h1, NULL, OPERATION_ADD, "key", "1", &i) == ENGINE_SUCCESS,
1165           "Failed to add value.");
1166     h1->release(h, NULL, i);
1167
1168     check(h1->arithmetic(h, NULL, "key", 3, true, false, 1, 1, 0,
1169                          &cas, PROTOCOL_BINARY_RAW_BYTES, &result,
1170                          0) == ENGINE_SUCCESS,
1171           "Failed to incr value.");
1172
1173     check_key_value(h, h1, "key", "2", 1);
1174
1175     testHarness.time_travel(3617);
1176
1177     check(ENGINE_KEY_ENOENT == verify_key(h, h1, "key"), "Expected missing key");
1178     return SUCCESS;
1179 }
1180
1181 static enum test_result test_flush(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1) {
1182     item *i = NULL;
1183     // First try to delete something we know to not be there.
1184     check(del(h, h1, "key", 0, 0) == ENGINE_KEY_ENOENT, "Failed to fail initial delete.");
1185     check(store(h, h1, NULL, OPERATION_SET, "key", "somevalue", &i) == ENGINE_SUCCESS,
1186           "Failed set.");
1187     h1->release(h, NULL, i);
1188     check_key_value(h, h1, "key", "somevalue", 9);
1189     check(h1->flush(h, NULL, 0) == ENGINE_SUCCESS,
1190           "Failed to flush");
1191     check(ENGINE_KEY_ENOENT == verify_key(h, h1, "key"), "Expected missing key");
1192
1193     check(store(h, h1, NULL, OPERATION_SET, "key", "somevalue", &i) == ENGINE_SUCCESS,
1194           "Failed post-flush set.");
1195     h1->release(h, NULL, i);
1196     check_key_value(h, h1, "key", "somevalue", 9);
1197
1198     return SUCCESS;
1199 }
1200
1201 static enum test_result test_flush_disabled(ENGINE_HANDLE *h,
1202                                             ENGINE_HANDLE_V1 *h1) {
1203     item *i = NULL;
1204     // start an engine with disabled flush, the flush() should be noop and
1205     // we expect to see the key after flush()
1206
1207     // store a key and check its existence
1208     check(store(h, h1, NULL, OPERATION_SET, "key", "somevalue", &i) == ENGINE_SUCCESS,
1209           "Failed set.");
1210     h1->release(h, NULL, i);
1211     check_key_value(h, h1, "key", "somevalue", 9);
1212     // expect error msg engine does not support operation
1213     check(h1->flush(h, NULL, 0) == ENGINE_ENOTSUP, "Flush should be disabled");
1214     //check the key
1215     check(ENGINE_SUCCESS == verify_key(h, h1, "key"), "Expected key");
1216
1217     // restart engine with flush enabled and redo the test, we expect flush to succeed
1218     std::string param = "flushall_enabled=false";
1219     std::string config = testHarness.get_current_testcase()->cfg;
1220     size_t found = config.find(param);
1221     if(found != config.npos) {
1222         config.replace(found, param.size(), "flushall_enabled=true");
1223     }
1224     testHarness.reload_engine(&h, &h1,
1225                               testHarness.engine_path,
1226                               config.c_str(),
1227                               true, false);
1228     wait_for_warmup_complete(h, h1);
1229
1230     check(h1->flush(h, NULL, 0) == ENGINE_SUCCESS, "Flush should be enabled");
1231
1232     //expect missing key
1233     check(ENGINE_KEY_ENOENT == verify_key(h, h1, "key"), "Expected missing key");
1234
1235     return SUCCESS;
1236 }
1237
1238 static enum test_result test_flush_stats(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1) {
1239     item *i = NULL;
1240     int mem_used = get_int_stat(h, h1, "mem_used");
1241     int overhead = get_int_stat(h, h1, "ep_overhead");
1242     int cacheSize = get_int_stat(h, h1, "ep_total_cache_size");
1243     int nonResident = get_int_stat(h, h1, "ep_num_non_resident");
1244
1245     int itemsRemoved = get_int_stat(h, h1, "ep_items_rm_from_checkpoints");
1246     check(store(h, h1, NULL, OPERATION_SET, "key", "somevalue", &i) == ENGINE_SUCCESS,
1247           "Failed set.");
1248     h1->release(h, NULL, i);
1249     check(store(h, h1, NULL, OPERATION_SET, "key2", "somevalue", &i) == ENGINE_SUCCESS,
1250           "Failed set.");
1251     h1->release(h, NULL, i);
1252     testHarness.time_travel(65);
1253     wait_for_stat_change(h, h1, "ep_items_rm_from_checkpoints", itemsRemoved);
1254
1255     check(ENGINE_SUCCESS == verify_key(h, h1, "key"), "Expected key");
1256     check(ENGINE_SUCCESS == verify_key(h, h1, "key2"), "Expected key2");
1257
1258     check_key_value(h, h1, "key", "somevalue", 9);
1259     check_key_value(h, h1, "key2", "somevalue", 9);
1260
1261     int mem_used2 = get_int_stat(h, h1, "mem_used");
1262     int overhead2 = get_int_stat(h, h1, "ep_overhead");
1263     int cacheSize2 = get_int_stat(h, h1, "ep_total_cache_size");
1264
1265     cb_assert(mem_used2 > mem_used);
1266     // "mem_used2 - overhead2" (i.e., ep_kv_size) should be greater than the hashtable cache size
1267     // due to the checkpoint overhead
1268     cb_assert(mem_used2 - overhead2 > cacheSize2);
1269
1270     check(h1->flush(h, NULL, 0) == ENGINE_SUCCESS, "Failed to flush");
1271     check(ENGINE_KEY_ENOENT == verify_key(h, h1, "key"), "Expected missing key");
1272     check(ENGINE_KEY_ENOENT == verify_key(h, h1, "key2"), "Expected missing key");
1273
1274     wait_for_flusher_to_settle(h, h1);
1275
1276     mem_used2 = get_int_stat(h, h1, "mem_used");
1277     overhead2 = get_int_stat(h, h1, "ep_overhead");
1278     cacheSize2 = get_int_stat(h, h1, "ep_total_cache_size");
1279     int nonResident2 = get_int_stat(h, h1, "ep_num_non_resident");
1280
1281     cb_assert(mem_used2 == mem_used);
1282     cb_assert(overhead2 == overhead);
1283     cb_assert(nonResident2 == nonResident);
1284     cb_assert(cacheSize2 == cacheSize);
1285
1286     return SUCCESS;
1287 }
1288
1289 static enum test_result test_flush_multiv(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1) {
1290     item *i = NULL;
1291     check(set_vbucket_state(h, h1, 2, vbucket_state_active), "Failed to set vbucket state.");
1292     check(store(h, h1, NULL, OPERATION_SET, "key", "somevalue", &i) == ENGINE_SUCCESS,
1293           "Failed set.");
1294     h1->release(h, NULL, i);
1295     check(store(h, h1, NULL, OPERATION_SET, "key2", "somevalue", &i,
1296                 0, 2) == ENGINE_SUCCESS,
1297           "Failed set in vb2.");
1298     h1->release(h, NULL, i);
1299
1300     check(ENGINE_SUCCESS == verify_key(h, h1, "key"), "Expected key");
1301     check(ENGINE_SUCCESS == verify_key(h, h1, "key2", 2), "Expected key2");
1302
1303     check_key_value(h, h1, "key", "somevalue", 9);
1304     check_key_value(h, h1, "key2", "somevalue", 9, 2);
1305
1306     check(h1->flush(h, NULL, 0) == ENGINE_SUCCESS, "Failed to flush");
1307
1308     vals.clear();
1309     check(h1->get_stats(h, NULL, NULL, 0, add_stats) == ENGINE_SUCCESS,
1310           "Failed to get stats.");
1311     check(vals.find("ep_flush_all") != vals.end(), "Failed to get the status of flush_all");
1312
1313     check(ENGINE_KEY_ENOENT == verify_key(h, h1, "key"), "Expected missing key");
1314     check(ENGINE_KEY_ENOENT == verify_key(h, h1, "key2", 2), "Expected missing key");
1315
1316     return SUCCESS;
1317 }
1318
1319 static int checkCurrItemsAfterShutdown(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1,
1320                                        int numItems2Load, bool shutdownForce) {
1321     std::vector<std::string> keys;
1322     for (int index = 0; index < numItems2Load; ++index) {
1323         std::stringstream s;
1324         s << "keys_2_load-" << index;
1325         std::string key(s.str());
1326         keys.push_back(key);
1327     }
1328
1329     check(get_int_stat(h, h1, "ep_total_persisted") == 0,
1330           "Expected ep_total_persisted equals 0");
1331     check(get_int_stat(h, h1, "curr_items") == 0,
1332           "Expected curr_items equals 0");
1333
1334     // stop flusher before loading new items
1335     protocol_binary_request_header *pkt = createPacket(PROTOCOL_BINARY_CMD_STOP_PERSISTENCE);
1336     check(h1->unknown_command(h, NULL, pkt, add_response) == ENGINE_SUCCESS,
1337           "CMD_STOP_PERSISTENCE failed!");
1338     check(last_status == PROTOCOL_BINARY_RESPONSE_SUCCESS,
1339           "Failed to stop persistence!");
1340     free(pkt);
1341
1342     std::vector<std::string>::iterator itr;
1343     for (itr = keys.begin(); itr != keys.end(); ++itr) {
1344         item *i;
1345         check(store(h, h1, NULL, OPERATION_SET, itr->c_str(), "oracle", &i, 0, 0)
1346               == ENGINE_SUCCESS, "Failed to store a value");
1347         h1->release(h, NULL, i);
1348     }
1349
1350     check(get_int_stat(h, h1, "ep_total_persisted") == 0,
1351           "Incorrect ep_total_persisted, expected 0");
1352     std::stringstream ss;
1353     ss << "Incorrect curr_items, expected " << numItems2Load;
1354     std::string errmsg(ss.str());
1355     check(get_int_stat(h, h1, "curr_items") == numItems2Load,
1356           errmsg.c_str());
1357
1358     // resume flusher before shutdown + warmup
1359     pkt = createPacket(PROTOCOL_BINARY_CMD_START_PERSISTENCE);
1360     check(h1->unknown_command(h, NULL, pkt, add_response) == ENGINE_SUCCESS,
1361           "CMD_START_PERSISTENCE failed!");
1362     check(last_status == PROTOCOL_BINARY_RESPONSE_SUCCESS,
1363           "Failed to start persistence!");
1364     free(pkt);
1365
1366     // shutdown engine force and restart
1367     testHarness.reload_engine(&h, &h1,
1368                               testHarness.engine_path,
1369                               testHarness.get_current_testcase()->cfg,
1370                               true, shutdownForce);
1371     wait_for_warmup_complete(h, h1);
1372     return get_int_stat(h, h1, "curr_items");
1373 }
1374
1375 static enum test_result test_flush_shutdown_force(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1) {
1376     int numItems2load = 3000;
1377     bool shutdownForce = true;
1378     int currItems = checkCurrItemsAfterShutdown(h, h1, numItems2load, shutdownForce);
1379     check (currItems <= numItems2load,
1380            "Number of curr items should be <= 3000, unless previous "
1381            "shutdown force had to wait for the flusher");
1382     return SUCCESS;
1383 }
1384
1385 static enum test_result test_flush_shutdown_noforce(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1) {
1386     int numItems2load = 3000;
1387     bool shutdownForce = false;
1388     int currItems = checkCurrItemsAfterShutdown(h, h1, numItems2load, shutdownForce);
1389     check (currItems == numItems2load,
1390            "Number of curr items should be equal to 3000, unless previous "
1391            "shutdown did not wait for the flusher");
1392     return SUCCESS;
1393 }
1394
1395 static enum test_result test_flush_restart(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1) {
1396     item *i = NULL;
1397     // First try to delete something we know to not be there.
1398     check(del(h, h1, "key", 0, 0) == ENGINE_KEY_ENOENT, "Failed to fail initial delete.");
1399     check(store(h, h1, NULL, OPERATION_SET, "key", "somevalue", &i) == ENGINE_SUCCESS,
1400           "Failed set.");
1401     h1->release(h, NULL, i);
1402     check_key_value(h, h1, "key", "somevalue", 9);
1403
1404     // Restart once to ensure written to disk.
1405     testHarness.reload_engine(&h, &h1,
1406                               testHarness.engine_path,
1407                               testHarness.get_current_testcase()->cfg,
1408                               true, false);
1409     wait_for_warmup_complete(h, h1);
1410
1411     // Read value from disk.
1412     check_key_value(h, h1, "key", "somevalue", 9);
1413
1414     // Flush
1415     check(h1->flush(h, NULL, 0) == ENGINE_SUCCESS,
1416           "Failed to flush");
1417
1418     check(store(h, h1, NULL, OPERATION_SET, "key2", "somevalue", &i) == ENGINE_SUCCESS,
1419           "Failed post-flush set.");
1420     h1->release(h, NULL, i);
1421     check_key_value(h, h1, "key2", "somevalue", 9);
1422
1423     // Restart again, ensure written to disk.
1424     testHarness.reload_engine(&h, &h1,
1425                               testHarness.engine_path,
1426                               testHarness.get_current_testcase()->cfg,
1427                               true, false);
1428     wait_for_warmup_complete(h, h1);
1429
1430     check(store(h, h1, NULL, OPERATION_SET, "key3", "somevalue", &i) == ENGINE_SUCCESS,
1431           "Failed post-flush, post-restart set.");
1432     h1->release(h, NULL, i);
1433     check_key_value(h, h1, "key3", "somevalue", 9);
1434
1435     // Read value again, should not be there.
1436     check(ENGINE_KEY_ENOENT == verify_key(h, h1, "key"), "Expected missing key");
1437     return SUCCESS;
1438 }
1439
1440 static enum test_result test_flush_multiv_restart(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1) {
1441     item *i = NULL;
1442     check(set_vbucket_state(h, h1, 2, vbucket_state_active), "Failed to set vbucket state.");
1443     check(store(h, h1, NULL, OPERATION_SET, "key", "somevalue", &i) == ENGINE_SUCCESS,
1444           "Failed set.");
1445     h1->release(h, NULL, i);
1446     check(store(h, h1, NULL, OPERATION_SET, "key2", "somevalue", &i,
1447                 0, 2) == ENGINE_SUCCESS,
1448           "Failed set in vb2.");
1449     h1->release(h, NULL, i);
1450
1451     // Restart once to ensure written to disk.
1452     testHarness.reload_engine(&h, &h1,
1453                               testHarness.engine_path,
1454                               testHarness.get_current_testcase()->cfg,
1455                               true, false);
1456     wait_for_warmup_complete(h, h1);
1457
1458     // Read value from disk.
1459     check_key_value(h, h1, "key", "somevalue", 9);
1460
1461     // Flush
1462     check(h1->flush(h, NULL, 0) == ENGINE_SUCCESS,
1463           "Failed to flush");
1464
1465     // Restart again, ensure written to disk.
1466     testHarness.reload_engine(&h, &h1,
1467                               testHarness.engine_path,
1468                               testHarness.get_current_testcase()->cfg,
1469                               true, false);
1470     wait_for_warmup_complete(h, h1);
1471
1472     // Read value again, should not be there.
1473     check(ENGINE_KEY_ENOENT == verify_key(h, h1, "key"), "Expected missing key");
1474     check(verify_vbucket_missing(h, h1, 2), "Bucket 2 came back.");
1475     return SUCCESS;
1476 }
1477
1478 static enum test_result test_delete(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1) {
1479     item *i = NULL;
1480     // First try to delete something we know to not be there.
1481     check(del(h, h1, "key", 0, 0) == ENGINE_KEY_ENOENT, "Failed to fail initial delete.");
1482     check(store(h, h1, NULL, OPERATION_SET, "key", "somevalue", &i) == ENGINE_SUCCESS,
1483           "Failed set.");
1484     Item *it = reinterpret_cast<Item*>(i);
1485     uint64_t orig_cas = it->getCas();
1486     h1->release(h, NULL, i);
1487     check_key_value(h, h1, "key", "somevalue", 9);
1488
1489     uint64_t cas = 0;
1490     check(h1->remove(h, NULL, "key", 3, &cas, 0) == ENGINE_SUCCESS,
1491           "Failed remove with value.");
1492     check(orig_cas + 1 == cas, "Cas mismatch on delete");
1493     check(ENGINE_KEY_ENOENT == verify_key(h, h1, "key"), "Expected missing key");
1494
1495     // Can I time travel to an expired object and delete it?
1496     checkeq(ENGINE_SUCCESS, store(h, h1, NULL, OPERATION_SET, "key", "somevalue", &i),
1497             "Failed set.");
1498     h1->release(h, NULL, i);
1499     testHarness.time_travel(3617);
1500     checkeq(ENGINE_KEY_ENOENT, del(h, h1, "key", 0, 0),
1501             "Did not get ENOENT removing an expired object.");
1502     checkeq(ENGINE_KEY_ENOENT, verify_key(h, h1, "key"), "Expected missing key");
1503
1504     return SUCCESS;
1505 }
1506
1507 static enum test_result test_set_delete(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1) {
1508     item *i = NULL;
1509     checkeq(ENGINE_SUCCESS, store(h, h1, NULL, OPERATION_SET, "key", "somevalue", &i),
1510             "Failed set.");
1511     h1->release(h, NULL, i);
1512     check_key_value(h, h1, "key", "somevalue", 9);
1513     checkeq(ENGINE_SUCCESS, del(h, h1, "key", 0, 0),
1514             "Failed remove with value.");
1515     checkeq(ENGINE_KEY_ENOENT, verify_key(h, h1, "key"), "Expected missing key");
1516     wait_for_flusher_to_settle(h, h1);
1517     wait_for_stat_to_be(h, h1, "curr_items", 0);
1518     return SUCCESS;
1519 }
1520
1521 static enum test_result test_set_delete_invalid_cas(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1) {
1522     item *i = NULL;
1523     check(store(h, h1, NULL, OPERATION_SET, "key",
1524                 "somevalue", &i) == ENGINE_SUCCESS,
1525           "Failed set.");
1526     check_key_value(h, h1, "key", "somevalue", 9);
1527     item_info info;
1528     info.nvalue = 1;
1529     check(h1->get_item_info(h, NULL, i, &info) == true,
1530           "Should be able to get info");
1531     h1->release(h, NULL, i);
1532
1533     check(del(h, h1, "key", info.cas + 1, 0) == ENGINE_KEY_EEXISTS,
1534           "Didn't expect to be able to remove the item with wrong cas");
1535     return SUCCESS;
1536 }
1537
1538 static enum test_result test_bug2509(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1) {
1539     for (int j = 0; j < 10000; ++j) {
1540         item *itm = NULL;
1541         checkeq(ENGINE_SUCCESS,
1542                 store(h, h1, NULL, OPERATION_SET, "key", "somevalue", &itm),
1543                 "Failed set.");
1544         h1->release(h, NULL, itm);
1545         usleep(10);
1546         checkeq(ENGINE_SUCCESS, del(h, h1, "key", 0, 0), "Failed remove with value.");
1547         usleep(10);
1548     }
1549
1550     // Restart again, to verify we don't have any duplicates.
1551     testHarness.reload_engine(&h, &h1,
1552                               testHarness.engine_path,
1553                               testHarness.get_current_testcase()->cfg,
1554                               true, false);
1555     wait_for_warmup_complete(h, h1);
1556
1557     return get_int_stat(h, h1, "ep_warmup_dups") == 0 ? SUCCESS : FAIL;
1558 }
1559
1560 static enum test_result test_bug7023(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1) {
1561     std::vector<std::string> keys;
1562     // Make a vbucket mess.
1563     for (int j = 0; j < 10000; ++j) {
1564         std::stringstream ss;
1565         ss << "key" << j;
1566         std::string key(ss.str());
1567         keys.push_back(key);
1568     }
1569
1570     std::vector<std::string>::iterator it;
1571     for (int j = 0; j < 5; ++j) {
1572         check(set_vbucket_state(h, h1, 0, vbucket_state_dead), "Failed set set vbucket 0 dead.");
1573         vbucketDelete(h, h1, 0);
1574         check(last_status == PROTOCOL_BINARY_RESPONSE_SUCCESS,
1575               "Expected vbucket deletion to work.");
1576         check(set_vbucket_state(h, h1, 0, vbucket_state_active), "Failed set set vbucket 0 active.");
1577         for (it = keys.begin(); it != keys.end(); ++it) {
1578             item *i;
1579             check(store(h, h1, NULL, OPERATION_SET, it->c_str(), it->c_str(), &i)
1580                   == ENGINE_SUCCESS, "Failed to store a value");
1581         }
1582     }
1583     wait_for_flusher_to_settle(h, h1);
1584
1585     // Restart again, to verify no data loss.
1586     testHarness.reload_engine(&h, &h1,
1587                               testHarness.engine_path,
1588                               testHarness.get_current_testcase()->cfg,
1589                               true, false);
1590     wait_for_warmup_complete(h, h1);
1591     return get_int_stat(h, h1, "ep_warmup_value_count", "warmup") == 10000 ? SUCCESS : FAIL;
1592 }
1593
1594 static enum test_result test_delete_set(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1) {
1595     wait_for_persisted_value(h, h1, "key", "value1");
1596
1597     check(del(h, h1, "key", 0, 0) == ENGINE_SUCCESS, "Failed remove with value.");
1598
1599     wait_for_persisted_value(h, h1, "key", "value2");
1600
1601     testHarness.reload_engine(&h, &h1,
1602                               testHarness.engine_path,
1603                               testHarness.get_current_testcase()->cfg,
1604                               true, false);
1605     wait_for_warmup_complete(h, h1);
1606
1607     check_key_value(h, h1, "key", "value2", 6);
1608     check(del(h, h1, "key", 0, 0) == ENGINE_SUCCESS, "Failed remove with value.");
1609     wait_for_flusher_to_settle(h, h1);
1610
1611     testHarness.reload_engine(&h, &h1,
1612                               testHarness.engine_path,
1613                               testHarness.get_current_testcase()->cfg,
1614                               true, false);
1615     wait_for_warmup_complete(h, h1);
1616
1617     check(ENGINE_KEY_ENOENT == verify_key(h, h1, "key"), "Expected missing key");
1618
1619     return SUCCESS;
1620 }
1621
1622 static enum test_result test_get_delete_missing_file(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1) {
1623     const char *key = "key";
1624     wait_for_persisted_value(h, h1, key, "value2delete");
1625
1626     // whack the db file and directory where the key is stored
1627     rmdb();
1628
1629     item *i = NULL;
1630     int errorCode = h1->get(h, NULL, &i, key, strlen(key), 0);
1631     h1->release(h, NULL, i);
1632
1633     // ep engine must be unaware of well-being of the db file as long as
1634     // the item is still in the memory
1635     check(errorCode == ENGINE_SUCCESS, "Expected success for get");
1636
1637     i = NULL;
1638     evict_key(h, h1, key);
1639     errorCode = h1->get(h, NULL, &i, key, strlen(key), 0);
1640     h1->release(h, NULL, i);
1641
1642     // ep engine must be now aware of the ill-fated db file where
1643     // the item is supposedly stored
1644     check(errorCode == ENGINE_TMPFAIL, "Expected tmp fail for get");
1645
1646     return SUCCESS;
1647 }
1648
1649
1650 static enum test_result test_restart(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1) {
1651     item *i = NULL;
1652     static const char val[] = "somevalue";
1653     ENGINE_ERROR_CODE ret;
1654     check((ret = store(h, h1, NULL, OPERATION_SET, "key", val, &i)) == ENGINE_SUCCESS,
1655           "Failed set.");
1656     h1->release(h, NULL, i);
1657
1658     testHarness.reload_engine(&h, &h1,
1659                               testHarness.engine_path,
1660                               testHarness.get_current_testcase()->cfg,
1661                               true, false);
1662     wait_for_warmup_complete(h, h1);
1663     check_key_value(h, h1, "key", val, strlen(val));
1664     return SUCCESS;
1665 }
1666
1667 static enum test_result test_restart_session_stats(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1) {
1668     createTapConn(h, h1, "tap_client_thread");
1669
1670     testHarness.reload_engine(&h, &h1,
1671                               testHarness.engine_path,
1672                               testHarness.get_current_testcase()->cfg,
1673                               true, false);
1674     wait_for_warmup_complete(h, h1);
1675     createTapConn(h, h1, "tap_client_thread");
1676
1677     check(h1->get_stats(h, NULL, "tap", 3, add_stats) == ENGINE_SUCCESS,
1678           "Failed to get stats.");
1679     std::string val = vals["eq_tapq:tap_client_thread:backfill_completed"];
1680     check(strcmp(val.c_str(), "true") == 0, "Don't expect the backfill upon restart");
1681     return SUCCESS;
1682 }
1683
1684 static enum test_result test_specialKeys(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1) {
1685     item *i = NULL;
1686     ENGINE_ERROR_CODE ret;
1687
1688     // Simplified Chinese "Couchbase"
1689     static const char key0[] = "沙发数据库";
1690     static const char val0[] = "some Chinese value";
1691     check((ret = store(h, h1, NULL, OPERATION_SET, key0, val0, &i)) == ENGINE_SUCCESS,
1692           "Failed set Chinese key");
1693     check_key_value(h, h1, key0, val0, strlen(val0));
1694     h1->release(h, NULL, i);
1695     // Traditional Chinese "Couchbase"
1696     static const char key1[] = "沙發數據庫";
1697     static const char val1[] = "some Traditional Chinese value";
1698     check((ret = store(h, h1, NULL, OPERATION_SET, key1, val1, &i)) == ENGINE_SUCCESS,
1699           "Failed set Traditional Chinese key");
1700     h1->release(h, NULL, i);
1701     // Korean "couch potato"
1702     static const char key2[] = "쇼파감자";
1703     static const char val2[] = "some Korean value";
1704     check((ret = store(h, h1, NULL, OPERATION_SET, key2, val2, &i)) == ENGINE_SUCCESS,
1705           "Failed set Korean key");
1706     h1->release(h, NULL, i);
1707     // Russian "couch potato"
1708     static const char key3[] = "лодырь, лентяй";
1709     static const char val3[] = "some Russian value";
1710     check((ret = store(h, h1, NULL, OPERATION_SET, key3, val3, &i)) == ENGINE_SUCCESS,
1711           "Failed set Russian key");
1712     h1->release(h, NULL, i);
1713     // Japanese "couch potato"
1714     static const char key4[] = "カウチポテト";
1715     static const char val4[] = "some Japanese value";
1716     check((ret = store(h, h1, NULL, OPERATION_SET, key4, val4, &i)) == ENGINE_SUCCESS,
1717           "Failed set Japanese key");
1718     h1->release(h, NULL, i);
1719     // Indian char key, and no idea what it is
1720     static const char key5[] = "हरियानवी";
1721     static const char val5[] = "some Indian value";
1722     check((ret = store(h, h1, NULL, OPERATION_SET, key5, val5, &i)) == ENGINE_SUCCESS,
1723           "Failed set Indian key");
1724     h1->release(h, NULL, i);
1725     // Portuguese translation "couch potato"
1726     static const char key6[] = "sedentário";
1727     static const char val6[] = "some Portuguese value";
1728     check((ret = store(h, h1, NULL, OPERATION_SET, key6, val6, &i)) == ENGINE_SUCCESS,
1729           "Failed set Portuguese key");
1730     h1->release(h, NULL, i);
1731     // Arabic translation "couch potato"
1732     static const char key7[] = "الحافلةالبطاطة";
1733     static const char val7[] = "some Arabic value";
1734     check((ret = store(h, h1, NULL, OPERATION_SET, key7, val7, &i)) == ENGINE_SUCCESS,
1735           "Failed set Arabic key");
1736     h1->release(h, NULL, i);
1737
1738     testHarness.reload_engine(&h, &h1,
1739                               testHarness.engine_path,
1740                               testHarness.get_current_testcase()->cfg,
1741                               true, false);
1742     wait_for_warmup_complete(h, h1);
1743     check_key_value(h, h1, key0, val0, strlen(val0));
1744     check_key_value(h, h1, key1, val1, strlen(val1));
1745     check_key_value(h, h1, key2, val2, strlen(val2));
1746     check_key_value(h, h1, key3, val3, strlen(val3));
1747     check_key_value(h, h1, key4, val4, strlen(val4));
1748     check_key_value(h, h1, key5, val5, strlen(val5));
1749     check_key_value(h, h1, key6, val6, strlen(val6));
1750     check_key_value(h, h1, key7, val7, strlen(val7));
1751     return SUCCESS;
1752 }
1753
1754 static enum test_result test_binKeys(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1) {
1755     item *i = NULL;
1756     ENGINE_ERROR_CODE ret;
1757
1758     // binary key with char values beyond 0x7F
1759     static const char key0[] = "\xe0\xed\xf1\x6f\x7f\xf8\xfa";
1760     static const char val0[] = "some value val8";
1761     check((ret = store(h, h1, NULL, OPERATION_SET, key0, val0, &i)) == ENGINE_SUCCESS,
1762           "Failed set binary key0");
1763     check_key_value(h, h1, key0, val0, strlen(val0));
1764     h1->release(h, NULL, i);
1765     // binary keys with char values beyond 0x7F
1766     static const char key1[] = "\xf1\xfd\xfe\xff\xf0\xf8\xef";
1767     static const char val1[] = "some value val9";
1768     check((ret = store(h, h1, NULL, OPERATION_SET, key1, val1, &i)) == ENGINE_SUCCESS,
1769           "Failed set binary key1");
1770     check_key_value(h, h1, key1, val1, strlen(val1));
1771     h1->release(h, NULL, i);
1772     // binary keys with special utf-8 BOM (Byte Order Mark) values 0xBB 0xBF 0xEF
1773     static const char key2[] = "\xff\xfe\xbb\xbf\xef";
1774     static const char val2[] = "some utf-8 bom value";
1775     check((ret = store(h, h1, NULL, OPERATION_SET, key2, val2, &i)) == ENGINE_SUCCESS,
1776           "Failed set binary utf-8 bom key");
1777     check_key_value(h, h1, key2, val2, strlen(val2));
1778     h1->release(h, NULL, i);
1779     // binary keys with special utf-16BE BOM values "U+FEFF"
1780     static const char key3[] = "U+\xfe\xff\xefU+\xff\xfe";
1781     static const char val3[] = "some utf-16 bom value";
1782     check((ret = store(h, h1, NULL, OPERATION_SET, key3, val3, &i)) == ENGINE_SUCCESS,
1783           "Failed set binary utf-16 bom key");
1784     check_key_value(h, h1, key3, val3, strlen(val3));
1785     h1->release(h, NULL, i);
1786
1787     testHarness.reload_engine(&h, &h1,
1788                               testHarness.engine_path,
1789                               testHarness.get_current_testcase()->cfg,
1790                               true, false);
1791     wait_for_warmup_complete(h, h1);
1792     check_key_value(h, h1, key0, val0, strlen(val0));
1793     check_key_value(h, h1, key1, val1, strlen(val1));
1794     check_key_value(h, h1, key2, val2, strlen(val2));
1795     check_key_value(h, h1, key3, val3, strlen(val3));
1796     return SUCCESS;
1797 }
1798
1799 static enum test_result test_restart_bin_val(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1) {
1800
1801
1802
1803     char binaryData[] = "abcdefg\0gfedcba";
1804     cb_assert(sizeof(binaryData) != strlen(binaryData));
1805
1806     item *i = NULL;
1807     check(storeCasVb11(h, h1, NULL, OPERATION_SET, "key",
1808                        binaryData, sizeof(binaryData), 82758, &i, 0, 0)
1809           == ENGINE_SUCCESS,
1810           "Failed set.");
1811     h1->release(h, NULL, i);
1812
1813     testHarness.reload_engine(&h, &h1,
1814                               testHarness.engine_path,
1815                               testHarness.get_current_testcase()->cfg,
1816                               true, false);
1817     wait_for_warmup_complete(h, h1);
1818
1819     check_key_value(h, h1, "key", binaryData, sizeof(binaryData));
1820     return SUCCESS;
1821 }
1822
1823 static enum test_result test_wrong_vb_get(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1) {
1824     int numNotMyVBucket = get_int_stat(h, h1, "ep_num_not_my_vbuckets");
1825     check(ENGINE_NOT_MY_VBUCKET == verify_key(h, h1, "key", 1),
1826           "Expected wrong bucket.");
1827     wait_for_stat_change(h, h1, "ep_num_not_my_vbuckets", numNotMyVBucket);
1828     return SUCCESS;
1829 }
1830
1831 static enum test_result test_vb_get_pending(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1) {
1832     check(set_vbucket_state(h, h1, 1, vbucket_state_pending), "Failed to set vbucket state.");
1833     const void *cookie = testHarness.create_cookie();
1834     testHarness.set_ewouldblock_handling(cookie, false);
1835
1836     item *i = NULL;
1837     check(ENGINE_EWOULDBLOCK == h1->get(h, cookie, &i, "key", strlen("key"), 1),
1838           "Expected woodblock.");
1839     h1->release(h, NULL, i);
1840
1841     testHarness.destroy_cookie(cookie);
1842     return SUCCESS;
1843 }
1844
1845 static enum test_result test_vb_get_replica(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1) {
1846     check(set_vbucket_state(h, h1, 1, vbucket_state_replica), "Failed to set vbucket state.");
1847     int numNotMyVBucket = get_int_stat(h, h1, "ep_num_not_my_vbuckets");
1848     check(ENGINE_NOT_MY_VBUCKET == verify_key(h, h1, "key", 1),
1849           "Expected not my bucket.");
1850     wait_for_stat_change(h, h1, "ep_num_not_my_vbuckets", numNotMyVBucket);
1851     return SUCCESS;
1852 }
1853
1854 static enum test_result test_wrong_vb_incr(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1) {
1855     uint64_t cas, result;
1856     int numNotMyVBucket = get_int_stat(h, h1, "ep_num_not_my_vbuckets");
1857     check(h1->arithmetic(h, NULL, "key", 3, true, false, 1, 1, 0,
1858                          &cas, PROTOCOL_BINARY_RAW_BYTES, &result,
1859                          1) == ENGINE_NOT_MY_VBUCKET,
1860           "Expected not my vbucket.");
1861     wait_for_stat_change(h, h1, "ep_num_not_my_vbuckets", numNotMyVBucket);
1862     return SUCCESS;
1863 }
1864
1865 static enum test_result test_vb_incr_pending(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1) {
1866     const void *cookie = testHarness.create_cookie();
1867     testHarness.set_ewouldblock_handling(cookie, false);
1868     uint64_t cas, result;
1869     check(set_vbucket_state(h, h1, 1, vbucket_state_pending), "Failed to set vbucket state.");
1870     check(h1->arithmetic(h, cookie, "key", 3, true, false, 1, 1, 0,
1871                          &cas, PROTOCOL_BINARY_RAW_BYTES, &result,
1872                          1) == ENGINE_EWOULDBLOCK,
1873           "Expected woodblock.");
1874     testHarness.destroy_cookie(cookie);
1875     return SUCCESS;
1876 }
1877
1878 static enum test_result test_vb_incr_replica(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1) {
1879     uint64_t cas, result;
1880     check(set_vbucket_state(h, h1, 1, vbucket_state_replica), "Failed to set vbucket state.");
1881     int numNotMyVBucket = get_int_stat(h, h1, "ep_num_not_my_vbuckets");
1882     check(h1->arithmetic(h, NULL, "key", 3, true, false, 1, 1, 0,
1883                          &cas, PROTOCOL_BINARY_RAW_BYTES, &result,
1884                          1) == ENGINE_NOT_MY_VBUCKET,
1885           "Expected not my bucket.");
1886     wait_for_stat_change(h, h1, "ep_num_not_my_vbuckets", numNotMyVBucket);
1887     return SUCCESS;
1888 }
1889
1890 static enum test_result test_wrong_vb_set(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1) {
1891     return test_wrong_vb_mutation(h, h1, OPERATION_SET);
1892 }
1893
1894 static enum test_result test_wrong_vb_cas(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1) {
1895     return test_wrong_vb_mutation(h, h1, OPERATION_CAS);
1896 }
1897
1898 static enum test_result test_wrong_vb_add(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1) {
1899     return test_wrong_vb_mutation(h, h1, OPERATION_ADD);
1900 }
1901
1902 static enum test_result test_wrong_vb_replace(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1) {
1903     return test_wrong_vb_mutation(h, h1, OPERATION_REPLACE);
1904 }
1905
1906 static enum test_result test_wrong_vb_append(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1) {
1907     return test_wrong_vb_mutation(h, h1, OPERATION_APPEND);
1908 }
1909
1910 static enum test_result test_wrong_vb_prepend(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1) {
1911     return test_wrong_vb_mutation(h, h1, OPERATION_PREPEND);
1912 }
1913
1914 static enum test_result test_wrong_vb_del(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1) {
1915     int numNotMyVBucket = get_int_stat(h, h1, "ep_num_not_my_vbuckets");
1916     check(ENGINE_NOT_MY_VBUCKET == del(h, h1, "key", 0, 1), "Expected wrong bucket.");
1917     wait_for_stat_change(h, h1, "ep_num_not_my_vbuckets", numNotMyVBucket);
1918     return SUCCESS;
1919 }
1920
1921 static enum test_result test_expiry(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1) {
1922     const char *key = "test_expiry";
1923     const char *data = "some test data here.";
1924
1925     item *it = NULL;
1926
1927     ENGINE_ERROR_CODE rv;
1928     rv = h1->allocate(h, NULL, &it, key, strlen(key), strlen(data), 0, 2,
1929                       PROTOCOL_BINARY_RAW_BYTES);
1930     check(rv == ENGINE_SUCCESS, "Allocation failed.");
1931
1932     item_info info;
1933     info.nvalue = 1;
1934     if (!h1->get_item_info(h, NULL, it, &info)) {
1935         abort();
1936     }
1937     memcpy(info.value[0].iov_base, data, strlen(data));
1938
1939     uint64_t cas = 0;
1940     rv = h1->store(h, NULL, it, &cas, OPERATION_SET, 0);
1941     check(rv == ENGINE_SUCCESS, "Set failed.");
1942     check_key_value(h, h1, key, data, strlen(data));
1943     h1->release(h, NULL, it);
1944
1945     testHarness.time_travel(5);
1946     check(h1->get(h, NULL, &it, key, strlen(key), 0) == ENGINE_KEY_ENOENT,
1947           "Item didn't expire");
1948
1949     int expired_access = get_int_stat(h, h1, "ep_expired_access");
1950     int expired_pager = get_int_stat(h, h1, "ep_expired_pager");
1951     int active_expired = get_int_stat(h, h1, "vb_active_expired");
1952     check(expired_pager == 0, "Expected zero expired item by pager");
1953     check(expired_access == 1, "Expected an expired item on access");
1954     check(active_expired == 1, "Expected an expired active item");
1955     checkeq(ENGINE_SUCCESS, store(h, h1, NULL, OPERATION_SET, key, data, &it),
1956             "Failed set.");
1957     h1->release(h, NULL, it);
1958
1959     std::stringstream ss;
1960     ss << "curr_items stat should be still 1 after ";
1961     ss << "overwriting the key that was expired, but not purged yet";
1962     checkeq(1, get_int_stat(h, h1, "curr_items"), ss.str().c_str());
1963
1964     return SUCCESS;
1965 }
1966
1967 static enum test_result test_expiry_loader(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1) {
1968     const char *key = "test_expiry_loader";
1969     const char *data = "some test data here.";
1970
1971     item *it = NULL;
1972
1973     ENGINE_ERROR_CODE rv;
1974     rv = h1->allocate(h, NULL, &it, key, strlen(key), strlen(data), 0, 2,
1975                       PROTOCOL_BINARY_RAW_BYTES);
1976     check(rv == ENGINE_SUCCESS, "Allocation failed.");
1977
1978     item_info info;
1979     info.nvalue = 1;
1980     if (!h1->get_item_info(h, NULL, it, &info)) {
1981         abort();
1982     }
1983     memcpy(info.value[0].iov_base, data, strlen(data));
1984
1985     uint64_t cas = 0;
1986     rv = h1->store(h, NULL, it, &cas, OPERATION_SET, 0);
1987     check(rv == ENGINE_SUCCESS, "Set failed.");
1988     check_key_value(h, h1, key, data, strlen(data));
1989     h1->release(h, NULL, it);
1990
1991     testHarness.time_travel(3);
1992
1993     check(h1->get(h, NULL, &it, key, strlen(key), 0) == ENGINE_KEY_ENOENT,
1994           "Item didn't expire");
1995
1996     // Restart the engine to ensure the above expired item is not loaded
1997     testHarness.reload_engine(&h, &h1,
1998                               testHarness.engine_path,
1999                               testHarness.get_current_testcase()->cfg,
2000                               true, false);
2001     wait_for_warmup_complete(h, h1);
2002     cb_assert(0 == get_int_stat(h, h1, "ep_warmup_value_count", "warmup"));
2003
2004     return SUCCESS;
2005 }
2006
2007 static enum test_result test_expiration_on_warmup(ENGINE_HANDLE *h,
2008                                                   ENGINE_HANDLE_V1 *h1) {
2009     const char *key = "KEY";
2010     const char *data = "VALUE";
2011
2012     item *it = NULL;
2013
2014     ENGINE_ERROR_CODE rv;
2015     rv = h1->allocate(h, NULL, &it, key, strlen(key), strlen(data), 0, 3,
2016                       PROTOCOL_BINARY_RAW_BYTES);
2017     check(rv == ENGINE_SUCCESS, "Allocation failed.");
2018
2019     item_info info;
2020     info.nvalue = 1;
2021     if (!h1->get_item_info(h, NULL, it, &info)) {
2022         abort();
2023     }
2024     memcpy(info.value[0].iov_base, data, strlen(data));
2025
2026     uint64_t cas = 0;
2027     rv = h1->store(h, NULL, it, &cas, OPERATION_SET, 0);
2028     check(rv == ENGINE_SUCCESS, "Set failed.");
2029     check_key_value(h, h1, key, data, strlen(data));
2030     h1->release(h, NULL, it);
2031     wait_for_flusher_to_settle(h, h1);
2032
2033     check(get_int_stat(h, h1, "curr_items") == 1, "Failed store item");
2034     testHarness.time_travel(5);
2035
2036     // Restart the engine to ensure the above item is expired
2037     testHarness.reload_engine(&h, &h1,
2038                               testHarness.engine_path,
2039                               testHarness.get_current_testcase()->cfg,
2040                               true, false);
2041     wait_for_warmup_complete(h, h1);
2042     int pager_runs = get_int_stat(h, h1, "ep_num_expiry_pager_runs");
2043     wait_for_stat_change(h, h1, "ep_num_expiry_pager_runs", pager_runs);
2044     wait_for_flusher_to_settle(h, h1);
2045     check(get_int_stat(h, h1, "curr_items") == 0,
2046             "The item should have been expired.");
2047
2048     return SUCCESS;
2049
2050 }
2051
2052 static enum test_result test_bug3454(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1) {
2053     const char *key = "test_expiry_duplicate_warmup";
2054     const char *data = "some test data here.";
2055
2056     item *it = NULL;
2057
2058     ENGINE_ERROR_CODE rv;
2059     rv = h1->allocate(h, NULL, &it, key, strlen(key), strlen(data), 0, 5,
2060                       PROTOCOL_BINARY_RAW_BYTES);
2061     check(rv == ENGINE_SUCCESS, "Allocation failed.");
2062
2063     item_info info;
2064     info.nvalue = 1;
2065     if (!h1->get_item_info(h, NULL, it, &info)) {
2066         abort();
2067     }
2068     memcpy(info.value[0].iov_base, data, strlen(data));
2069
2070     uint64_t cas = 0;
2071     rv = h1->store(h, NULL, it, &cas, OPERATION_SET, 0);
2072     check(rv == ENGINE_SUCCESS, "Set failed.");
2073     check_key_value(h, h1, key, data, strlen(data));
2074     h1->release(h, NULL, it);
2075     wait_for_flusher_to_settle(h, h1);
2076
2077     // Advance the ep_engine time by 10 sec for the above item to be expired.
2078     testHarness.time_travel(10);
2079     check(h1->get(h, NULL, &it, key, strlen(key), 0) == ENGINE_KEY_ENOENT,
2080           "Item didn't expire");
2081
2082     rv = h1->allocate(h, NULL, &it, key, strlen(key), strlen(data), 0, 0,
2083                       PROTOCOL_BINARY_RAW_BYTES);
2084     check(rv == ENGINE_SUCCESS, "Allocation failed.");
2085
2086     info.nvalue = 1;
2087     if (!h1->get_item_info(h, NULL, it, &info)) {
2088         abort();
2089     }
2090     memcpy(info.value[0].iov_base, data, strlen(data));
2091
2092     cas = 0;
2093     // Add a new item with the same key.
2094     rv = h1->store(h, NULL, it, &cas, OPERATION_ADD, 0);
2095     check(rv == ENGINE_SUCCESS, "Add failed.");
2096     check_key_value(h, h1, key, data, strlen(data));
2097     h1->release(h, NULL, it);
2098
2099     check(h1->get(h, NULL, &it, key, strlen(key), 0) == ENGINE_SUCCESS,
2100           "Item shouldn't expire");
2101     h1->release(h, NULL, it);
2102
2103     // Restart the engine to ensure the above unexpired new item is loaded
2104     testHarness.reload_engine(&h, &h1,
2105                               testHarness.engine_path,
2106                               testHarness.get_current_testcase()->cfg,
2107                               true, false);
2108     wait_for_warmup_complete(h, h1);
2109     cb_assert(1 == get_int_stat(h, h1, "ep_warmup_value_count", "warmup"));
2110     cb_assert(0 == get_int_stat(h, h1, "ep_warmup_dups", "warmup"));
2111
2112     return SUCCESS;
2113 }
2114
2115 static enum test_result test_bug3522(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1) {
2116     const char *key = "test_expiry_no_items_warmup";
2117     const char *data = "some test data here.";
2118
2119     item *it = NULL;
2120
2121     ENGINE_ERROR_CODE rv;
2122     rv = h1->allocate(h, NULL, &it, key, strlen(key), strlen(data), 0, 0,
2123                       PROTOCOL_BINARY_RAW_BYTES);
2124     check(rv == ENGINE_SUCCESS, "Allocation failed.");
2125
2126     item_info info;
2127     info.nvalue = 1;
2128     if (!h1->get_item_info(h, NULL, it, &info)) {
2129         abort();
2130     }
2131     memcpy(info.value[0].iov_base, data, strlen(data));
2132
2133     uint64_t cas = 0;
2134     rv = h1->store(h, NULL, it, &cas, OPERATION_SET, 0);
2135     check(rv == ENGINE_SUCCESS, "Set failed.");
2136     check_key_value(h, h1, key, data, strlen(data));
2137     h1->release(h, NULL, it);
2138     wait_for_flusher_to_settle(h, h1);
2139
2140     // Add a new item with the same key and 2 sec of expiration.
2141     const char *new_data = "new data here.";
2142     rv = h1->allocate(h, NULL, &it, key, strlen(key), strlen(new_data), 0, 2,
2143                       PROTOCOL_BINARY_RAW_BYTES);
2144     check(rv == ENGINE_SUCCESS, "Allocation failed.");
2145
2146     info.nvalue = 1;
2147     if (!h1->get_item_info(h, NULL, it, &info)) {
2148         abort();
2149     }
2150     memcpy(info.value[0].iov_base, new_data, strlen(new_data));
2151
2152     int pager_runs = get_int_stat(h, h1, "ep_num_expiry_pager_runs");
2153     cas = 0;
2154     rv = h1->store(h, NULL, it, &cas, OPERATION_SET, 0);
2155     check(rv == ENGINE_SUCCESS, "Set failed.");
2156     check_key_value(h, h1, key, new_data, strlen(new_data));
2157     h1->release(h, NULL, it);
2158     testHarness.time_travel(3);
2159     wait_for_stat_change(h, h1, "ep_num_expiry_pager_runs", pager_runs);
2160     wait_for_flusher_to_settle(h, h1);
2161
2162     // Restart the engine.
2163     testHarness.reload_engine(&h, &h1,
2164                               testHarness.engine_path,
2165                               testHarness.get_current_testcase()->cfg,
2166                               true, false);
2167     wait_for_warmup_complete(h, h1);
2168     // TODO: modify this for a better test case
2169     cb_assert(0 == get_int_stat(h, h1, "ep_warmup_dups", "warmup"));
2170
2171     return SUCCESS;
2172 }
2173
2174 static enum test_result test_get_replica_active_state(ENGINE_HANDLE *h,
2175                                                       ENGINE_HANDLE_V1 *h1) {
2176     protocol_binary_request_header *pkt;
2177     pkt = prepare_get_replica(h, h1, vbucket_state_active);
2178     check(h1->unknown_command(h, NULL, pkt, add_response) ==
2179           ENGINE_SUCCESS, "Get Replica Failed");
2180     check(last_status == PROTOCOL_BINARY_RESPONSE_NOT_MY_VBUCKET,
2181           "Expected PROTOCOL_BINARY_RESPONSE_NOT_MY_VBUCKET response.");
2182
2183     return SUCCESS;
2184 }
2185
2186 static enum test_result test_get_replica_pending_state(ENGINE_HANDLE *h,
2187                                                        ENGINE_HANDLE_V1 *h1) {
2188     protocol_binary_request_header *pkt;
2189
2190     const void *cookie = testHarness.create_cookie();
2191     testHarness.set_ewouldblock_handling(cookie, false);
2192     pkt = prepare_get_replica(h, h1, vbucket_state_pending);
2193     check(h1->unknown_command(h, cookie, pkt, add_response) ==
2194           ENGINE_EWOULDBLOCK, "Should have returned error for pending state");
2195     testHarness.destroy_cookie(cookie);
2196     return SUCCESS;
2197 }
2198
2199 static enum test_result test_get_replica_dead_state(ENGINE_HANDLE *h,
2200                                                     ENGINE_HANDLE_V1 *h1) {
2201     protocol_binary_request_header *pkt;
2202     pkt = prepare_get_replica(h, h1, vbucket_state_dead);
2203     check(h1->unknown_command(h, NULL, pkt, add_response) ==
2204           ENGINE_SUCCESS, "Get Replica Failed");
2205     check(last_status == PROTOCOL_BINARY_RESPONSE_NOT_MY_VBUCKET,
2206           "Expected PROTOCOL_BINARY_RESPONSE_NOT_MY_VBUCKET response.");
2207
2208     return SUCCESS;
2209 }
2210
2211 static enum test_result test_get_replica(ENGINE_HANDLE *h,
2212                                          ENGINE_HANDLE_V1 *h1) {
2213     protocol_binary_request_header *pkt;
2214     pkt = prepare_get_replica(h, h1, vbucket_state_replica);
2215     check(h1->unknown_command(h, NULL, pkt, add_response) == ENGINE_SUCCESS,
2216                               "Get Replica Failed");
2217     check(last_status == PROTOCOL_BINARY_RESPONSE_SUCCESS,
2218           "Expected PROTOCOL_BINARY_RESPONSE_SUCCESS response.");
2219     check(last_body == "replicadata",
2220           "Should have returned identical value");
2221
2222     return SUCCESS;
2223 }
2224
2225 static enum test_result test_get_replica_non_resident(ENGINE_HANDLE *h,
2226                                                       ENGINE_HANDLE_V1 *h1) {
2227
2228     item *i = NULL;
2229     check(store(h, h1, NULL, OPERATION_SET, "key", "value", &i, 0, 0)
2230           == ENGINE_SUCCESS, "Store Failed");
2231     h1->release(h, NULL, i);
2232     wait_for_flusher_to_settle(h, h1);
2233     wait_for_stat_to_be(h, h1, "ep_total_persisted", 1);
2234
2235     evict_key(h, h1, "key", 0, "Ejected.");
2236     check(set_vbucket_state(h, h1, 0, vbucket_state_replica),
2237           "Failed to set vbucket to replica");
2238
2239     get_replica(h, h1, "key", 0);
2240     check(last_status == PROTOCOL_BINARY_RESPONSE_SUCCESS, "Expected success");
2241
2242     return SUCCESS;
2243 }
2244
2245 static enum test_result test_get_replica_invalid_key(ENGINE_HANDLE *h,
2246                                                      ENGINE_HANDLE_V1 *h1) {
2247     protocol_binary_request_header *pkt;
2248     bool makeinvalidkey = true;
2249     pkt = prepare_get_replica(h, h1, vbucket_state_replica, makeinvalidkey);
2250     check(h1->unknown_command(h, NULL, pkt, add_response) ==
2251           ENGINE_SUCCESS, "Get Replica Failed");
2252     check(last_status == PROTOCOL_BINARY_RESPONSE_NOT_MY_VBUCKET,
2253           "Expected PROTOCOL_BINARY_RESPONSE_NOT_MY_VBUCKET response.");
2254     return SUCCESS;
2255 }
2256
2257 static enum test_result test_vb_del_pending(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1) {
2258     const void *cookie = testHarness.create_cookie();
2259     testHarness.set_ewouldblock_handling(cookie, false);
2260     check(set_vbucket_state(h, h1, 1, vbucket_state_pending), "Failed to set vbucket state.");
2261     check(ENGINE_EWOULDBLOCK == del(h, h1, "key", 0, 1, cookie),
2262           "Expected woodblock.");
2263     testHarness.destroy_cookie(cookie);
2264     return SUCCESS;
2265 }
2266
2267 static enum test_result test_vb_del_replica(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1) {
2268     check(set_vbucket_state(h, h1, 1, vbucket_state_replica), "Failed to set vbucket state.");
2269     int numNotMyVBucket = get_int_stat(h, h1, "ep_num_not_my_vbuckets");
2270     check(ENGINE_NOT_MY_VBUCKET == del(h, h1, "key", 0, 1),
2271           "Expected not my vbucket.");
2272     wait_for_stat_change(h, h1, "ep_num_not_my_vbuckets", numNotMyVBucket);
2273     return SUCCESS;
2274 }
2275
2276 static enum test_result test_touch(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1) {
2277     // key is a mandatory field!
2278     touch(h, h1, NULL, 0, (time(NULL) + 10));
2279     check(last_status == PROTOCOL_BINARY_RESPONSE_EINVAL, "Testing invalid arguments");
2280
2281     // extlen is a mandatory field!
2282     protocol_binary_request_header *request;
2283     request = createPacket(PROTOCOL_BINARY_CMD_TOUCH, 0, 0, NULL, 0, "akey", 4);
2284     check(h1->unknown_command(h, NULL, request, add_response) == ENGINE_SUCCESS,
2285           "Failed to call touch");
2286     check(last_status == PROTOCOL_BINARY_RESPONSE_EINVAL, "Testing invalid arguments");
2287     free(request);
2288
2289     // Try to touch an unknown item...
2290     touch(h, h1, "mykey", 0, (time(NULL) + 10));
2291     check(last_status == PROTOCOL_BINARY_RESPONSE_KEY_ENOENT, "Testing unknown key");
2292
2293     // illegal vbucket
2294     touch(h, h1, "mykey", 5, (time(NULL) + 10));
2295     check(last_status == PROTOCOL_BINARY_RESPONSE_NOT_MY_VBUCKET, "Testing illegal vbucket");
2296
2297     // Store the item!
2298     item *itm = NULL;
2299     check(store(h, h1, NULL, OPERATION_SET, "mykey", "somevalue", &itm) == ENGINE_SUCCESS,
2300           "Failed set.");
2301     h1->release(h, NULL, itm);
2302
2303     check_key_value(h, h1, "mykey", "somevalue", strlen("somevalue"));
2304
2305     touch(h, h1, "mykey", 0, (time(NULL) + 10));
2306     check(last_status == PROTOCOL_BINARY_RESPONSE_SUCCESS, "touch mykey");
2307
2308     // time-travel 9 secs..
2309     testHarness.time_travel(9);
2310
2311     // The item should still exist
2312     check_key_value(h, h1, "mykey", "somevalue", 9);
2313
2314     // time-travel 2 secs..
2315     testHarness.time_travel(2);
2316
2317     // The item should have expired now...
2318     check(h1->get(h, NULL, &itm, "mykey", 5, 0) == ENGINE_KEY_ENOENT, "Item should be gone");
2319     return SUCCESS;
2320 }
2321
2322 static enum test_result test_touch_mb7342(ENGINE_HANDLE *h,
2323                                           ENGINE_HANDLE_V1 *h1) {
2324     const char *key = "MB-7342";
2325     // Store the item!
2326     item *itm = NULL;
2327     check(store(h, h1, NULL, OPERATION_SET, key, "v", &itm) == ENGINE_SUCCESS,
2328           "Failed set.");
2329     h1->release(h, NULL, itm);
2330
2331     touch(h, h1, key, 0, 0);
2332     check(last_status == PROTOCOL_BINARY_RESPONSE_SUCCESS, "touch key");
2333
2334     check_key_value(h, h1, key, "v", 1);
2335
2336     // Travel a loong time to see if the object is still there (the default
2337     // store sets an exp time of 3600
2338     testHarness.time_travel(3700);
2339
2340     check_key_value(h, h1, key, "v", 1);
2341
2342     return SUCCESS;
2343 }
2344
2345 static enum test_result test_touch_mb10277(ENGINE_HANDLE *h,
2346                                             ENGINE_HANDLE_V1 *h1) {
2347     const char *key = "MB-10277";
2348     // Store the item!
2349     item *itm = NULL;
2350     check(store(h, h1, NULL, OPERATION_SET, key, "v", &itm) == ENGINE_SUCCESS,
2351           "Failed set.");
2352     h1->release(h, NULL, itm);
2353     wait_for_flusher_to_settle(h, h1);
2354     evict_key(h, h1, key, 0, "Ejected.");
2355
2356     touch(h, h1, key, 0, 3600); // A new expiration time remains in the same.
2357     check(last_status == PROTOCOL_BINARY_RESPONSE_SUCCESS, "touch key");
2358
2359     return SUCCESS;
2360 }
2361
2362 static enum test_result test_gat(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1) {
2363     // key is a mandatory field!
2364     gat(h, h1, NULL, 0, 10);
2365     check(last_status == PROTOCOL_BINARY_RESPONSE_EINVAL, "Testing invalid arguments");
2366
2367     // extlen is a mandatory field!
2368     protocol_binary_request_header *request;
2369     request = createPacket(PROTOCOL_BINARY_CMD_GAT, 0, 0, NULL, 0, "akey", 4);
2370     check(h1->unknown_command(h, NULL, request, add_response) == ENGINE_SUCCESS,
2371           "Failed to call gat");
2372     check(last_status == PROTOCOL_BINARY_RESPONSE_EINVAL, "Testing invalid arguments");
2373     free(request);
2374
2375     // Try to gat an unknown item...
2376     gat(h, h1, "mykey", 0, 10);
2377     check(last_status == PROTOCOL_BINARY_RESPONSE_KEY_ENOENT, "Testing unknown key");
2378
2379     // illegal vbucket
2380     gat(h, h1, "mykey", 5, 10);
2381     check(last_status == PROTOCOL_BINARY_RESPONSE_NOT_MY_VBUCKET, "Testing illegal vbucket");
2382
2383     // Store the item!
2384     item *itm = NULL;
2385     check(store(h, h1, NULL, OPERATION_SET, "mykey", "{\"some\":\"value\"}",
2386                 &itm, 0, 0, 3600, PROTOCOL_BINARY_DATATYPE_JSON) == ENGINE_SUCCESS,
2387           "Failed set.");
2388     h1->release(h, NULL, itm);
2389
2390     check_key_value(h, h1, "mykey", "{\"some\":\"value\"}",
2391             strlen("{\"some\":\"value\"}"));
2392
2393     gat(h, h1, "mykey", 0, 10);
2394     check(last_status == PROTOCOL_BINARY_RESPONSE_SUCCESS, "gat mykey");
2395     check(last_datatype == PROTOCOL_BINARY_DATATYPE_JSON, "Expected datatype to be JSON");
2396     check(last_body.compare(0, sizeof("{\"some\":\"value\"}"),
2397                             "{\"some\":\"value\"}") == 0,
2398           "Invalid data returned");
2399
2400     // time-travel 9 secs..
2401     testHarness.time_travel(9);
2402
2403     // The item should still exist
2404     check_key_value(h, h1, "mykey", "{\"some\":\"value\"}",
2405                     strlen("{\"some\":\"value\"}"));
2406
2407     // time-travel 2 secs..
2408     testHarness.time_travel(2);
2409
2410     // The item should have expired now...
2411     check(h1->get(h, NULL, &itm, "mykey", 5, 0) == ENGINE_KEY_ENOENT, "Item should be gone");
2412     return SUCCESS;
2413 }
2414
2415 static enum test_result test_gatq(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1) {
2416     // key is a mandatory field!
2417     gat(h, h1, NULL, 0, 10, true);
2418     check(last_status == PROTOCOL_BINARY_RESPONSE_EINVAL, "Testing invalid arguments");
2419
2420     // extlen is a mandatory field!
2421     protocol_binary_request_header *request;
2422     request = createPacket(PROTOCOL_BINARY_CMD_GATQ, 0, 0, NULL, 0, "akey", 4);
2423     check(h1->unknown_command(h, NULL, request, add_response) == ENGINE_SUCCESS,
2424           "Failed to call gatq");
2425     check(last_status == PROTOCOL_BINARY_RESPONSE_EINVAL, "Testing invalid arguments");
2426     free(request);
2427
2428     // Try to gatq an unknown item...
2429     last_status = static_cast<protocol_binary_response_status>(0xffff);
2430     gat(h, h1, "mykey", 0, 10, true);
2431
2432     // We should not have sent any response!
2433     check(last_status == (protocol_binary_response_status)0xffff, "Testing unknown key");
2434
2435     // illegal vbucket
2436     gat(h, h1, "mykey", 5, 10, true);
2437     check(last_status == PROTOCOL_BINARY_RESPONSE_NOT_MY_VBUCKET,
2438           "Testing illegal vbucket");
2439
2440     // Store the item!
2441     item *itm = NULL;
2442     check(store(h, h1, NULL, OPERATION_SET, "mykey", "{\"some\":\"value\"}",
2443                 &itm, 0, 0, 3600, PROTOCOL_BINARY_DATATYPE_JSON) == ENGINE_SUCCESS,
2444           "Failed set.");
2445     h1->release(h, NULL, itm);
2446
2447     check_key_value(h, h1, "mykey", "{\"some\":\"value\"}",
2448                     strlen("{\"some\":\"value\"}"));
2449
2450     gat(h, h1, "mykey", 0, 10, true);
2451     check(last_status == PROTOCOL_BINARY_RESPONSE_SUCCESS, "gat mykey");
2452     check(last_datatype == PROTOCOL_BINARY_DATATYPE_JSON, "Expected datatype to be JSON");
2453     check(last_body.compare(0, sizeof("{\"some\":\"value\"}"),
2454                             "{\"some\":\"value\"}") == 0,
2455           "Invalid data returned");
2456
2457     // time-travel 9 secs..
2458     testHarness.time_travel(9);
2459
2460     // The item should still exist
2461     check_key_value(h, h1, "mykey", "{\"some\":\"value\"}",
2462                     strlen("{\"some\":\"value\"}"));
2463
2464     // time-travel 2 secs..
2465     testHarness.time_travel(2);
2466
2467     // The item should have expired now...
2468     check(h1->get(h, NULL, &itm, "mykey", 5, 0) == ENGINE_KEY_ENOENT, "Item should be gone");
2469     return SUCCESS;
2470 }
2471
2472 static enum test_result test_mb5215(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1) {
2473     item *itm = NULL;
2474     check(store(h, h1, NULL, OPERATION_SET, "coolkey", "cooler", &itm)
2475           == ENGINE_SUCCESS, "Failed set.");
2476     h1->release(h, NULL, itm);
2477
2478     check_key_value(h, h1, "coolkey", "cooler", strlen("cooler"));
2479
2480     // set new exptime to 111
2481     int expTime = time(NULL) + 111;
2482
2483     touch(h, h1, "coolkey", 0, expTime);
2484     check(last_status == PROTOCOL_BINARY_RESPONSE_SUCCESS, "touch coolkey");
2485
2486     //reload engine
2487     testHarness.reload_engine(&h, &h1,
2488                               testHarness.engine_path,
2489                               testHarness.get_current_testcase()->cfg,
2490                               true, false);
2491     wait_for_warmup_complete(h, h1);
2492
2493     //verify persisted expiration time
2494     const char *statkey = "key coolkey 0";
2495     int newExpTime;
2496     check(h1->get(h, NULL, &itm, "coolkey", 7, 0) == ENGINE_SUCCESS,
2497           "Missing key");
2498     h1->release(h, NULL, itm);
2499     newExpTime = get_int_stat(h, h1, "key_exptime", statkey);
2500     check(newExpTime == expTime, "Failed to persist new exptime");
2501
2502     // evict key, touch expiration time, and verify
2503     evict_key(h, h1, "coolkey", 0, "Ejected.");
2504
2505     expTime = time(NULL) + 222;
2506     touch(h, h1, "coolkey", 0, expTime);
2507     check(last_status == PROTOCOL_BINARY_RESPONSE_SUCCESS, "touch coolkey");
2508
2509     testHarness.reload_engine(&h, &h1,
2510                               testHarness.engine_path,
2511                               testHarness.get_current_testcase()->cfg,
2512                               true, false);
2513     wait_for_warmup_complete(h, h1);
2514
2515     check(h1->get(h, NULL, &itm, "coolkey", 7, 0) == ENGINE_SUCCESS,
2516           "Missing key");
2517     h1->release(h, NULL, itm);
2518     newExpTime = get_int_stat(h, h1, "key_exptime", statkey);
2519     check(newExpTime == expTime, "Failed to persist new exptime");
2520
2521     return SUCCESS;
2522 }
2523
2524 static enum test_result test_alloc_limit(ENGINE_HANDLE *h,
2525                                          ENGINE_HANDLE_V1 *h1) {
2526     item *it = NULL;
2527     ENGINE_ERROR_CODE rv;
2528
2529     rv = h1->allocate(h, NULL, &it, "key", 3, 20 * 1024 * 1024, 0, 0,
2530                       PROTOCOL_BINARY_RAW_BYTES);
2531     check(rv == ENGINE_SUCCESS, "Allocated 20MB item");
2532     h1->release(h, NULL, it);
2533
2534     rv = h1->allocate(h, NULL, &it, "key", 3, (20 * 1024 * 1024) + 1, 0, 0,
2535                       PROTOCOL_BINARY_RAW_BYTES);
2536     check(rv == ENGINE_E2BIG, "Object too big");
2537
2538     return SUCCESS;
2539 }
2540
2541 static enum test_result test_whitespace_db(ENGINE_HANDLE *h,
2542                                            ENGINE_HANDLE_V1 *h1) {
2543     vals.clear();
2544     check(h1->get_stats(h, NULL, NULL, 0, add_stats) == ENGINE_SUCCESS,
2545           "Failed to get stats.");
2546     if (vals["ep_dbname"] != std::string(WHITESPACE_DB)) {
2547         std::cerr << "Expected dbname = ``" << WHITESPACE_DB << "''"
2548                   << ", got ``" << vals["ep_dbname"] << "''" << std::endl;
2549         return FAIL;
2550     }
2551
2552     check(access(WHITESPACE_DB, F_OK) != -1, "I expected the whitespace db to exist");
2553     return SUCCESS;
2554 }
2555
2556 static enum test_result test_memory_limit(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1) {
2557     set_param(h, h1, protocol_binary_engine_param_flush, "mutation_mem_threshold", "95");
2558     int used = get_int_stat(h, h1, "mem_used");
2559     double mem_threshold =
2560         static_cast<double>(get_int_stat(h, h1, "ep_mutation_mem_threshold")) / 100;
2561     int max = static_cast<int>(get_int_stat(h, h1, "ep_max_size") * mem_threshold);
2562     check(get_int_stat(h, h1, "ep_oom_errors") == 0 &&
2563           get_int_stat(h, h1, "ep_tmp_oom_errors") == 0, "Expected no OOM errors.");
2564     cb_assert(used < max);
2565
2566     char data[8192];
2567     memset(data, 'x', sizeof(data));
2568     size_t vlen = max - used - 192;
2569     data[vlen] = 0x00;
2570
2571     item *i = NULL;
2572     // So if we add an item,
2573     check(store(h, h1, NULL, OPERATION_SET, "key", data, &i) == ENGINE_SUCCESS,
2574           "store failure");
2575     check_key_value(h, h1, "key", data, vlen);
2576     h1->release(h, NULL, i);
2577
2578     // There should be no room for another.
2579     ENGINE_ERROR_CODE second = store(h, h1, NULL, OPERATION_SET, "key2", data, &i);
2580     check(second == ENGINE_ENOMEM || second == ENGINE_TMPFAIL,
2581           "should have failed second set");
2582     h1->release(h, NULL, i);
2583     check(get_int_stat(h, h1, "ep_oom_errors") == 1 ||
2584           get_int_stat(h, h1, "ep_tmp_oom_errors") == 1, "Expected an OOM error.");
2585
2586     ENGINE_ERROR_CODE overwrite = store(h, h1, NULL, OPERATION_SET, "key", data, &i);
2587     check(overwrite == ENGINE_ENOMEM || overwrite == ENGINE_TMPFAIL,
2588           "should have failed second override");
2589     h1->release(h, NULL, i);
2590     check(get_int_stat(h, h1, "ep_oom_errors") == 2 ||
2591           get_int_stat(h, h1, "ep_tmp_oom_errors") == 2, "Expected another OOM error.");
2592     check_key_value(h, h1, "key", data, vlen);
2593     check(ENGINE_SUCCESS != verify_key(h, h1, "key2"), "Expected a failure in GET");
2594     int itemsRemoved = get_int_stat(h, h1, "ep_items_rm_from_checkpoints");
2595     // Until we remove that item
2596     check(del(h, h1, "key", 0, 0) == ENGINE_SUCCESS, "Failed remove with value.");
2597     check(ENGINE_KEY_ENOENT == verify_key(h, h1, "key"), "Expected missing key");
2598     testHarness.time_travel(65);
2599     wait_for_stat_change(h, h1, "ep_items_rm_from_checkpoints", itemsRemoved);
2600
2601     check(store(h, h1, NULL, OPERATION_SET, "key2", "somevalue2", &i) == ENGINE_SUCCESS,
2602           "should have succeded on the last set");
2603     check_key_value(h, h1, "key2", "somevalue2", 10);
2604     h1->release(h, NULL, i);
2605     return SUCCESS;
2606 }
2607
2608 static enum test_result test_vbucket_get_miss(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1) {
2609     return verify_vbucket_missing(h, h1, 1) ? SUCCESS : FAIL;
2610 }
2611
2612 static enum test_result test_vbucket_get(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1) {
2613     return verify_vbucket_state(h, h1, 0, vbucket_state_active) ? SUCCESS : FAIL;
2614 }
2615
2616 static enum test_result test_vbucket_create(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1) {
2617     if (!verify_vbucket_missing(h, h1, 1)) {
2618         fprintf(stderr, "vbucket wasn't missing.\n");
2619         return FAIL;
2620     }
2621
2622     if (!set_vbucket_state(h, h1, 1, vbucket_state_active)) {
2623         fprintf(stderr, "set state failed.\n");
2624         return FAIL;
2625     }
2626
2627     return verify_vbucket_state(h, h1, 1, vbucket_state_active) ? SUCCESS : FAIL;
2628 }
2629
2630 static enum test_result test_vbucket_compact(ENGINE_HANDLE *h,
2631                                              ENGINE_HANDLE_V1 *h1) {
2632     const char *key = "Carss";
2633     const char *value = "pollute";
2634     if (!verify_vbucket_missing(h, h1, 0)) {
2635         fprintf(stderr, "vbucket wasn't missing.\n");
2636         return FAIL;
2637     }
2638
2639     if (!set_vbucket_state(h, h1, 0, vbucket_state_active)) {
2640         fprintf(stderr, "set state failed.\n");
2641         return FAIL;
2642     }
2643
2644     check(verify_vbucket_state(h, h1, 0, vbucket_state_active),
2645             "VBucket state not active");
2646
2647     // Set two keys - one to be expired and other to remain...
2648     item *itm = NULL;
2649     check(store(h, h1, NULL, OPERATION_SET, key, value, &itm)
2650           == ENGINE_SUCCESS, "Failed set.");
2651     h1->release(h, NULL, itm);
2652
2653     check_key_value(h, h1, key, value, strlen(value));
2654
2655     // Set a non-expiring key...
2656     check(store(h, h1, NULL, OPERATION_SET, "trees", "cleanse", &itm)
2657           == ENGINE_SUCCESS, "Failed set.");
2658     h1->release(h, NULL, itm);
2659
2660     check_key_value(h, h1, "trees", "cleanse", strlen("cleanse"));
2661
2662     touch(h, h1, key, 0, 11);
2663     check(last_status == PROTOCOL_BINARY_RESPONSE_SUCCESS, "touch Carss");
2664
2665     testHarness.time_travel(12);
2666     wait_for_flusher_to_settle(h, h1);
2667
2668     // Store a dummy item since we do not purge the item with highest seqno
2669     check(ENGINE_SUCCESS ==
2670             store(h, h1, NULL, OPERATION_SET, "dummykey", "dummyvalue", &itm,
2671                 0, 0, 0), "Error setting.");
2672
2673     wait_for_flusher_to_settle(h, h1);
2674
2675     check(get_int_stat(h, h1, "vb_0:purge_seqno", "vbucket-seqno") == 0,
2676             "purge_seqno not found to be zero before compaction");
2677
2678     // Compaction on VBucket
2679     compact_db(h, h1, 0, 2, 3, 1);
2680
2681     check(get_int_stat(h, h1, "ep_pending_compactions") == 0,
2682     "ep_pending_compactions stat did not tick down after compaction command");
2683
2684     // the key tree and its value should be intact...
2685     check(verify_key(h, h1, "trees") == ENGINE_SUCCESS,
2686           "key trees should be found.");
2687     // the key Carrs should have disappeared...
2688     int val = verify_key(h, h1, "Carss");
2689     check(val == ENGINE_KEY_ENOENT, "Key Carss has not expired.");
2690
2691     check(get_int_stat(h, h1, "vb_0:purge_seqno", "vbucket-seqno") == 4,
2692         "purge_seqno didn't match expected value");
2693
2694     return SUCCESS;
2695 }
2696
2697 static enum test_result test_compaction_config(ENGINE_HANDLE *h,
2698                                                ENGINE_HANDLE_V1 *h1) {
2699
2700     check(get_int_stat(h, h1, "ep_compaction_write_queue_cap") == 10000,
2701             "Expected compaction queue cap to be 10000");
2702     set_param(h, h1, protocol_binary_engine_param_flush,
2703               "compaction_write_queue_cap", "100000");
2704     check(get_int_stat(h, h1, "ep_compaction_write_queue_cap") == 100000,
2705             "Expected compaction queue cap to be 100000");
2706     return SUCCESS;
2707 }
2708
2709 struct comp_thread_ctx {
2710     ENGINE_HANDLE *h;
2711     ENGINE_HANDLE_V1 *h1;
2712     uint16_t vbid;
2713 };
2714
2715 extern "C" {
2716     static void compaction_thread(void *arg) {
2717         struct comp_thread_ctx *ctx = static_cast<comp_thread_ctx *>(arg);
2718         compact_db(ctx->h, ctx->h1, ctx->vbid, 0, 0, 0);
2719     }
2720 }
2721
2722 static enum test_result test_multiple_vb_compactions(ENGINE_HANDLE *h,
2723                                                      ENGINE_HANDLE_V1 *h1) {
2724     for (uint16_t i = 0; i < 4; ++i) {
2725         if (!set_vbucket_state(h, h1, i, vbucket_state_active)) {
2726             fprintf(stderr, "set state failed for vbucket %d.\n", i);
2727             return FAIL;
2728         }
2729         check(verify_vbucket_state(h, h1, i, vbucket_state_active),
2730               "VBucket state not active");
2731     }
2732
2733     std::vector<std::string> keys;
2734     for (int j = 0; j < 20000; ++j) {
2735         std::stringstream ss;
2736         ss << "key" << j;
2737         std::string key(ss.str());
2738         keys.push_back(key);
2739     }
2740
2741     int count = 0;
2742     std::vector<std::string>::iterator it;
2743     for (it = keys.begin(); it != keys.end(); ++it) {
2744         uint16_t vbid = count % 4;
2745         item *i;
2746         check(store(h, h1, NULL, OPERATION_SET, it->c_str(), it->c_str(), &i, 0, vbid)
2747               == ENGINE_SUCCESS, "Failed to store a value");
2748         h1->release(h, NULL, i);
2749         ++count;
2750     }
2751
2752     // Compact multiple vbuckets.
2753     const int n_threads = 4;
2754     cb_thread_t threads[n_threads];
2755     struct comp_thread_ctx ctx[n_threads];
2756
2757     for (int i = 0; i < n_threads; i++) {
2758         ctx[i].h = h;
2759         ctx[i].h1 = h1;
2760         ctx[i].vbid = static_cast<uint16_t>(i);
2761         int r = cb_create_thread(&threads[i], compaction_thread, &ctx[i], 0);
2762         cb_assert(r == 0);
2763     }
2764
2765     for (int i = 0; i < n_threads; i++) {
2766         int r = cb_join_thread(threads[i]);
2767         cb_assert(r == 0);
2768     }
2769
2770     check(get_int_stat(h, h1, "ep_pending_compactions") == 0,
2771     "ep_pending_compactions stat did not tick down after compaction command");
2772
2773     return SUCCESS;
2774 }
2775
2776 static enum test_result
2777 test_multi_vb_compactions_with_workload(ENGINE_HANDLE *h,
2778                                         ENGINE_HANDLE_V1 *h1) {
2779     for (uint16_t i = 0; i < 4; ++i) {
2780         if (!set_vbucket_state(h, h1, i, vbucket_state_active)) {
2781             fprintf(stderr, "set state failed for vbucket %d.\n", i);
2782             return FAIL;
2783         }
2784         check(verify_vbucket_state(h, h1, i, vbucket_state_active),
2785               "VBucket state not active");
2786     }
2787
2788     std::vector<std::string> keys;
2789     for (int j = 0; j < 10000; ++j) {
2790         std::stringstream ss;
2791         ss << "key" << j;
2792         std::string key(ss.str());
2793         keys.push_back(key);
2794     }
2795
2796     int count = 0;
2797     std::vector<std::string>::iterator it;
2798     for (it = keys.begin(); it != keys.end(); ++it) {
2799         uint16_t vbid = count % 4;
2800         item *i;
2801         check(store(h, h1, NULL, OPERATION_SET, it->c_str(), it->c_str(), &i, 0, vbid)
2802               == ENGINE_SUCCESS, "Failed to store a value");
2803         h1->release(h, NULL, i);
2804         ++count;
2805     }
2806     wait_for_flusher_to_settle(h, h1);
2807
2808     for (int i = 0; i < 2; ++i) {
2809         count = 0;
2810         for (it = keys.begin(); it != keys.end(); ++it) {
2811             uint16_t vbid = count % 4;
2812             item *i = NULL;
2813             check(h1->get(h, NULL, &i, it->c_str(), strlen(it->c_str()), vbid) ==
2814                   ENGINE_SUCCESS, "Unable to get stored item");
2815             h1->release(h, NULL, i);
2816             ++count;
2817         }
2818     }
2819     wait_for_str_stat_to_be(h, h1, "ep_workload_pattern", "read_heavy", NULL);
2820
2821     // Compact multiple vbuckets.
2822     const int n_threads = 4;
2823     cb_thread_t threads[n_threads];
2824     struct comp_thread_ctx ctx[n_threads];
2825
2826     for (int i = 0; i < n_threads; i++) {
2827         ctx[i].h = h;
2828         ctx[i].h1 = h1;
2829         ctx[i].vbid = static_cast<uint16_t>(i);
2830         int r = cb_create_thread(&threads[i], compaction_thread, &ctx[i], 0);
2831         cb_assert(r == 0);
2832     }
2833
2834     for (int i = 0; i < n_threads; i++) {
2835         int r = cb_join_thread(threads[i]);
2836         cb_assert(r == 0);
2837     }
2838
2839     check(get_int_stat(h, h1, "ep_pending_compactions") == 0,
2840     "ep_pending_compactions stat did not tick down after compaction command");
2841
2842     return SUCCESS;
2843 }
2844
2845 static enum test_result vbucket_destroy(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1,
2846                                              const char* value = NULL) {
2847     check(set_vbucket_state(h, h1, 1, vbucket_state_active), "Failed to set vbucket state.");
2848
2849     vbucketDelete(h, h1, 2, value);
2850     check(last_status == PROTOCOL_BINARY_RESPONSE_NOT_MY_VBUCKET,
2851           "Expected failure deleting non-existent bucket.");
2852
2853     check(set_vbucket_state(h, h1, 1, vbucket_state_dead), "Failed set set vbucket 1 state.");
2854
2855     vbucketDelete(h, h1, 1, value);
2856     check(last_status == PROTOCOL_BINARY_RESPONSE_SUCCESS,
2857           "Expected failure deleting non-existent bucket.");
2858
2859     check(verify_vbucket_missing(h, h1, 1),
2860           "vbucket 0 was not missing after deleting it.");
2861
2862     return SUCCESS;
2863 }
2864
2865 static enum test_result test_vbucket_destroy_stats(ENGINE_HANDLE *h,
2866                                                    ENGINE_HANDLE_V1 *h1) {
2867
2868     int mem_used = get_int_stat(h, h1, "mem_used");
2869     int cacheSize = get_int_stat(h, h1, "ep_total_cache_size");
2870     int overhead = get_int_stat(h, h1, "ep_overhead");
2871     int nonResident = get_int_stat(h, h1, "ep_num_non_resident");
2872
2873     check(set_vbucket_state(h, h1, 1, vbucket_state_active), "Failed to set vbucket state.");
2874
2875     std::vector<std::string> keys;
2876     for (int j = 0; j < 2000; ++j) {
2877         std::stringstream ss;
2878         ss << "key" << j;
2879         std::string key(ss.str());
2880         keys.push_back(key);
2881     }
2882
2883     int itemsRemoved = get_int_stat(h, h1, "ep_items_rm_from_checkpoints");
2884     std::vector<std::string>::iterator it;
2885     for (it = keys.begin(); it != keys.end(); ++it) {
2886         item *i;
2887         check(store(h, h1, NULL, OPERATION_SET, it->c_str(), it->c_str(), &i, 0, 1)
2888               == ENGINE_SUCCESS, "Failed to store a value");
2889         h1->release(h, NULL, i);
2890     }
2891     wait_for_flusher_to_settle(h, h1);
2892     testHarness.time_travel(65);
2893     wait_for_stat_change(h, h1, "ep_items_rm_from_checkpoints", itemsRemoved);
2894
2895     check(set_vbucket_state(h, h1, 1, vbucket_state_dead), "Failed set set vbucket 1 state.");
2896
2897     int vbucketDel = get_int_stat(h, h1, "ep_vbucket_del");
2898     vbucketDelete(h, h1, 1);
2899     check(last_status == PROTOCOL_BINARY_RESPONSE_SUCCESS,
2900           "Expected failure deleting non-existent bucket.");
2901
2902     check(verify_vbucket_missing(h, h1, 1),
2903           "vbucket 1 was not missing after deleting it.");
2904
2905     wait_for_stat_change(h, h1, "ep_vbucket_del", vbucketDel);
2906
2907     wait_for_stat_to_be(h, h1, "mem_used", mem_used);
2908     wait_for_stat_to_be(h, h1, "ep_total_cache_size", cacheSize);
2909     wait_for_stat_to_be(h, h1, "ep_overhead", overhead);
2910     wait_for_stat_to_be(h, h1, "ep_num_non_resident", nonResident);
2911
2912     return SUCCESS;
2913 }
2914
2915 static enum test_result vbucket_destroy_restart(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1,
2916                                                 const char* value = NULL) {
2917     check(set_vbucket_state(h, h1, 1, vbucket_state_active), "Failed to set vbucket state.");
2918
2919     // Store a value so the restart will try to resurrect it.
2920     item *i = NULL;
2921     check(store(h, h1, NULL, OPERATION_SET, "key", "somevalue", &i, 0, 1)
2922           == ENGINE_SUCCESS, "Failed to set a value");
2923     check_key_value(h, h1, "key", "somevalue", 9, 1);
2924     h1->release(h, NULL, i);
2925
2926     // Reload to get a flush forced.
2927     testHarness.reload_engine(&h, &h1,
2928                               testHarness.engine_path,
2929                               testHarness.get_current_testcase()->cfg,
2930                               true, false);
2931     wait_for_warmup_complete(h, h1);
2932
2933     check(verify_vbucket_state(h, h1, 1, vbucket_state_active),
2934           "Bucket state was what it was initially, after restart.");
2935     check(set_vbucket_state(h, h1, 1, vbucket_state_active), "Failed to set vbucket state.");
2936     check_key_value(h, h1, "key", "somevalue", 9, 1);
2937
2938     check(set_vbucket_state(h, h1, 1, vbucket_state_dead), "Failed set set vbucket 1 state.");
2939
2940     vbucketDelete(h, h1, 1, value);
2941     check(last_status == PROTOCOL_BINARY_RESPONSE_SUCCESS,
2942           "Expected failure deleting non-existent bucket.");
2943
2944     check(verify_vbucket_missing(h, h1, 1),
2945           "vbucket 1 was not missing after deleting it.");
2946
2947     testHarness.reload_engine(&h, &h1,
2948                               testHarness.engine_path,
2949                               testHarness.get_current_testcase()->cfg,
2950                               true, false);
2951     wait_for_warmup_complete(h, h1);
2952
2953     if (verify_vbucket_state(h, h1, 1, vbucket_state_pending, true)) {
2954         std::cerr << "Bucket came up in pending state after delete." << std::endl;
2955         abort();
2956     }
2957
2958     check(verify_vbucket_missing(h, h1, 1),
2959           "vbucket 1 was not missing after restart.");
2960
2961     return SUCCESS;
2962 }
2963
2964 static enum test_result test_async_vbucket_destroy(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1) {
2965     return vbucket_destroy(h, h1);
2966 }
2967
2968 static enum test_result test_sync_vbucket_destroy(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1) {
2969     return vbucket_destroy(h, h1, "async=0");
2970 }
2971
2972 static enum test_result test_async_vbucket_destroy_restart(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1) {
2973     return vbucket_destroy_restart(h, h1);
2974 }
2975
2976 static enum test_result test_sync_vbucket_destroy_restart(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1) {
2977     return vbucket_destroy_restart(h, h1, "async=0");
2978 }
2979
2980 static enum test_result test_vb_set_pending(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1) {
2981     return test_pending_vb_mutation(h, h1, OPERATION_SET);
2982 }
2983
2984 static enum test_result test_vb_add_pending(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1) {
2985     return test_pending_vb_mutation(h, h1, OPERATION_ADD);
2986 }
2987
2988 static enum test_result test_vb_cas_pending(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1) {
2989     return test_pending_vb_mutation(h, h1, OPERATION_CAS);
2990 }
2991
2992 static enum test_result test_vb_append_pending(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1) {
2993     return test_pending_vb_mutation(h, h1, OPERATION_APPEND);
2994 }
2995
2996 static enum test_result test_vb_prepend_pending(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1) {
2997     return test_pending_vb_mutation(h, h1, OPERATION_PREPEND);
2998 }
2999
3000 static enum test_result test_vb_set_replica(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1) {
3001     return test_replica_vb_mutation(h, h1, OPERATION_SET);
3002 }
3003
3004 static enum test_result test_vb_replace_replica(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1) {
3005     return test_replica_vb_mutation(h, h1, OPERATION_REPLACE);
3006 }
3007
3008 static enum test_result test_vb_replace_pending(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1) {
3009     return test_pending_vb_mutation(h, h1, OPERATION_REPLACE);
3010 }
3011
3012 static enum test_result test_vb_add_replica(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1) {
3013     return test_replica_vb_mutation(h, h1, OPERATION_ADD);
3014 }
3015
3016 static enum test_result test_vb_cas_replica(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1) {
3017     return test_replica_vb_mutation(h, h1, OPERATION_CAS);
3018 }
3019
3020 static enum test_result test_vb_append_replica(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1) {
3021     return test_replica_vb_mutation(h, h1, OPERATION_APPEND);
3022 }
3023
3024 static enum test_result test_vb_prepend_replica(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1) {
3025     return test_replica_vb_mutation(h, h1, OPERATION_PREPEND);
3026 }
3027
3028 static enum test_result test_stats_seqno(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1) {
3029     check(set_vbucket_state(h, h1, 1, vbucket_state_active),
3030           "Failed to set vbucket state.");
3031
3032     int num_keys = 100;
3033     for (int ii = 0; ii < num_keys; ++ii) {
3034         std::stringstream ss;
3035         ss << "key" << ii;
3036         check(store(h, h1, NULL, OPERATION_SET, ss.str().c_str(),
3037                     "value", NULL, 0, 0) == ENGINE_SUCCESS,
3038               "Failed to store an item.");
3039     }
3040
3041     check(get_int_stat(h, h1, "vb_0:high_seqno", "vbucket-seqno") == 100,
3042           "Invalid seqno");
3043     check(get_int_stat(h, h1, "vb_1:high_seqno", "vbucket-seqno") == 0,
3044           "Invalid seqno");
3045     check(get_int_stat(h, h1, "vb_1:high_seqno", "vbucket-seqno 1") == 0,
3046           "Invalid seqno");
3047
3048     uint64_t vb_uuid = get_ull_stat(h, h1, "vb_1:0:id", "failovers");
3049     check(get_ull_stat(h, h1, "vb_1:uuid", "vbucket-seqno 1") == vb_uuid,
3050           "Invalid uuid");
3051     check(vals.size() == 4, "Expected four stats");
3052
3053     // Check invalid vbucket
3054     check(h1->get_stats(h, NULL, "vbucket-seqno 2", 15, add_stats)
3055           == ENGINE_NOT_MY_VBUCKET, "Expected not my vbucket");
3056
3057     // Check bad vbucket parameter (not numeric)
3058     check(h1->get_stats(h, NULL, "vbucket-seqno tt2", 17, add_stats)
3059           == ENGINE_EINVAL, "Expected invalid");
3060
3061     // Check extra spaces at the end
3062     check(h1->get_stats(h, NULL, "vbucket-seqno    ", 17, add_stats)
3063           == ENGINE_EINVAL, "Expected invalid");
3064
3065     return SUCCESS;
3066 }
3067
3068 static enum test_result test_stats_diskinfo(ENGINE_HANDLE *h,
3069                                             ENGINE_HANDLE_V1 *h1) {
3070     check(set_vbucket_state(h, h1, 1, vbucket_state_active),
3071           "Failed to set vbucket state.");
3072
3073     int num_keys = 100;
3074     for (int ii = 0; ii < num_keys; ++ii) {
3075         std::stringstream ss;
3076         ss << "key" << ii;
3077         check(store(h, h1, NULL, OPERATION_SET, ss.str().c_str(),
3078                     "value", NULL, 0, 1) == ENGINE_SUCCESS,
3079               "Failed to store an item.");
3080     }
3081     wait_for_flusher_to_settle(h, h1);
3082
3083     size_t file_size = get_int_stat(h, h1, "ep_db_file_size", "diskinfo");
3084     size_t data_size = get_int_stat(h, h1, "ep_db_data_size", "diskinfo");
3085     check(file_size > 0, "DB file size should be greater than 0");
3086     check(data_size > 0, "DB data size should be greater than 0");
3087     check(file_size >= data_size, "DB file size should be >= DB data size");
3088     check(get_int_stat(h, h1, "vb_1:data_size", "diskinfo detail") > 0,
3089           "VB 1 data size should be greater than 0");
3090
3091     check(h1->get_stats(h, NULL, "diskinfo ", 9, add_stats)
3092           == ENGINE_EINVAL, "Expected invalid");
3093
3094     check(h1->get_stats(h, NULL, "diskinfo detai", 14, add_stats)
3095           == ENGINE_EINVAL, "Expected invalid");
3096
3097     check(h1->get_stats(h, NULL, "diskinfo detaillll", 18, add_stats)
3098           == ENGINE_EINVAL, "Expected invalid");
3099
3100     return SUCCESS;
3101 }
3102
3103 static void notifier_request(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1,
3104                              const void* cookie, uint32_t opaque,
3105                              uint16_t vbucket, uint64_t start,
3106                              bool shouldSucceed) {
3107
3108     uint32_t flags = 0;
3109     uint64_t rollback = 0;
3110     uint64_t vb_uuid = get_ull_stat(h, h1, "vb_0:0:id", "failovers");
3111     uint64_t snap_start_seqno = get_ull_stat(h, h1, "vb_0:0:seq", "failovers");
3112     uint64_t snap_end_seqno = snap_start_seqno;
3113     ENGINE_ERROR_CODE err = h1->dcp.stream_req(h, cookie, flags, opaque,
3114                                                vbucket, start, 0, vb_uuid,
3115                                                snap_start_seqno, snap_end_seqno,
3116                                                &rollback,
3117                                                mock_dcp_add_failover_log);
3118     check(err == ENGINE_SUCCESS, "Failed to initiate stream request");
3119
3120     std::string type = get_str_stat(h, h1, "eq_dcpq:unittest:type", "dcp");
3121     check(type.compare("notifier") == 0, "Consumer not found");
3122
3123     check((uint32_t)get_int_stat(h, h1, "eq_dcpq:unittest:stream_0_flags", "dcp")
3124           == flags, "Flags didn't match");
3125     check((uint32_t)get_int_stat(h, h1, "eq_dcpq:unittest:stream_0_opaque", "dcp")
3126           == opaque, "Opaque didn't match");
3127     check((uint64_t)get_ull_stat(h, h1, "eq_dcpq:unittest:stream_0_start_seqno", "dcp")
3128           == start, "Start Seqno Didn't match");
3129     check((uint64_t)get_ull_stat(h, h1, "eq_dcpq:unittest:stream_0_end_seqno", "dcp")
3130           == 0, "End Seqno didn't match");
3131     check((uint64_t)get_ull_stat(h, h1, "eq_dcpq:unittest:stream_0_vb_uuid", "dcp")
3132           == vb_uuid, "VBucket UUID didn't match");
3133     check((uint64_t)get_ull_stat(h, h1, "eq_dcpq:unittest:stream_0_snap_start_seqno", "dcp")
3134           == snap_start_seqno, "snap start seqno didn't match");
3135 }
3136
3137 static enum test_result test_dcp_vbtakeover_no_stream(ENGINE_HANDLE *h,
3138                                                       ENGINE_HANDLE_V1 *h1) {
3139
3140     int num_items = 10;
3141     for (int j = 0; j < num_items; ++j) {
3142         item *i = NULL;
3143         std::stringstream ss;
3144         ss << "key" << j;
3145         check(store(h, h1, NULL, OPERATION_SET, ss.str().c_str(), "data", &i)
3146               == ENGINE_SUCCESS, "Failed to store a value");
3147         h1->release(h, NULL, i);
3148     }
3149
3150     int est = get_int_stat(h, h1, "estimate", "dcp-vbtakeover 0");
3151     check(est == 10, "Invalid estimate for non-existent stream");
3152
3153     check(h1->get_stats(h, NULL, "dcp-vbtakeover 1", strlen("dcp-vbtakeover 1"),
3154                         add_stats) == ENGINE_NOT_MY_VBUCKET,
3155                         "Expected not my vbucket");
3156
3157     return SUCCESS;
3158 }
3159
3160 static enum test_result test_dcp_notifier(ENGINE_HANDLE *h,
3161                                           ENGINE_HANDLE_V1 *h1) {
3162
3163     int num_items = 10;
3164     for (int j = 0; j < num_items; ++j) {
3165         item *i = NULL;
3166         std::stringstream ss;
3167         ss << "key" << j;
3168         check(store(h, h1, NULL, OPERATION_SET, ss.str().c_str(), "data", &i)
3169               == ENGINE_SUCCESS, "Failed to store a value");
3170         h1->release(h, NULL, i);
3171     }
3172
3173     const void *cookie = testHarness.create_cookie();
3174     uint32_t opaque = 0;
3175     uint32_t flags = DCP_OPEN_NOTIFIER;
3176     const char *name = "unittest";
3177     uint16_t nname = strlen(name);
3178
3179     check(h1->dcp.open(h, cookie, opaque, 0, flags, (void*)name, nname)
3180           == ENGINE_SUCCESS,
3181           "Failed dcp notifier open connection.");
3182
3183     // Get notification for an old item
3184     notifier_request(h, h1, cookie, ++opaque, 0, 0, true);
3185     dcp_step(h, h1, cookie);
3186     check(dcp_last_op == PROTOCOL_BINARY_CMD_DCP_STREAM_END,
3187           "Expected stream end");
3188
3189     // Get notification when we're slightly behind
3190     notifier_request(h, h1, cookie, ++opaque, 0, 9, true);
3191     dcp_step(h, h1, cookie);
3192     check(dcp_last_op == PROTOCOL_BINARY_CMD_DCP_STREAM_END,
3193           "Expected stream end");
3194
3195     // Wait for notification of a future item
3196     notifier_request(h, h1, cookie, ++opaque, 0, 20, true);
3197     dcp_step(h, h1, cookie);
3198
3199     for (int j = 0; j < 5; ++j) {
3200         item *i = NULL;
3201         std::stringstream ss;
3202         ss << "key" << j;
3203         check(store(h, h1, NULL, OPERATION_SET, ss.str().c_str(), "data", &i)
3204               == ENGINE_SUCCESS, "Failed to store a value");
3205         h1->release(h, NULL, i);
3206     }
3207
3208     // Shouldn't get a stream end yet
3209     dcp_step(h, h1, cookie);
3210     check(dcp_last_op != PROTOCOL_BINARY_CMD_DCP_STREAM_END,
3211           "Wasn't expecting a stream end");
3212
3213     for (int j = 0; j < 6; ++j) {
3214         item *i = NULL;
3215         std::stringstream ss;
3216         ss << "key" << j;
3217         check(store(h, h1, NULL, OPERATION_SET, ss.str().c_str(), "data", &i)
3218               == ENGINE_SUCCESS, "Failed to store a value");
3219         h1->release(h, NULL, i);
3220     }
3221
3222     // Should get a stream end
3223     dcp_step(h, h1, cookie);
3224     check(dcp_last_op == PROTOCOL_BINARY_CMD_DCP_STREAM_END,
3225           "Expected stream end");
3226
3227     testHarness.destroy_cookie(cookie);
3228
3229     return SUCCESS;
3230 }
3231
3232 static enum test_result test_dcp_consumer_open(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1) {
3233     const void *cookie1 = testHarness.create_cookie();
3234     uint32_t opaque = 0;
3235     uint32_t seqno = 0;
3236     uint32_t flags = 0;
3237     const char *name = "unittest";
3238     uint16_t nname = strlen(name);
3239
3240     check(h1->dcp.open(h, cookie1, opaque, seqno, flags, (void*)name, nname)
3241           == ENGINE_SUCCESS,
3242           "Failed dcp consumer open connection.");
3243
3244     std::string type = get_str_stat(h, h1, "eq_dcpq:unittest:type", "dcp");
3245     int created = get_int_stat(h, h1, "eq_dcpq:unittest:created", "dcp");
3246     check(type.compare("consumer") == 0, "Consumer not found");
3247     testHarness.destroy_cookie(cookie1);
3248
3249     testHarness.time_travel(600);
3250
3251     const void *cookie2 = testHarness.create_cookie();
3252     check(h1->dcp.open(h, cookie2, opaque, seqno, flags, (void*)name, nname)
3253           == ENGINE_SUCCESS,
3254           "Failed dcp consumer open connection.");
3255
3256     type = get_str_stat(h, h1, "eq_dcpq:unittest:type", "dcp");
3257     check(type.compare("consumer") == 0, "Consumer not found");
3258     check(get_int_stat(h, h1, "eq_dcpq:unittest:created", "dcp") > created,
3259           "New dcp stream is not newer");
3260     testHarness.destroy_cookie(cookie2);
3261
3262     return SUCCESS;
3263 }
3264
3265 static enum test_result test_dcp_producer_open(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1) {
3266     const void *cookie1 = testHarness.create_cookie();
3267     uint32_t opaque = 0;
3268     uint32_t seqno = 0;
3269     uint32_t flags = DCP_OPEN_PRODUCER;
3270     const char *name  = "unittest";
3271     uint16_t nname = strlen(name);
3272
3273     check(h1->dcp.open(h, cookie1, opaque, seqno, flags, (void*)name, nname)
3274           == ENGINE_SUCCESS,
3275           "Failed dcp producer open connection.");
3276
3277     std::string type = get_str_stat(h, h1, "eq_dcpq:unittest:type", "dcp");
3278     int created = get_int_stat(h, h1, "eq_dcpq:unittest:created", "dcp");
3279     check(type.compare("producer") == 0, "Producer not found");
3280     testHarness.destroy_cookie(cookie1);
3281
3282     testHarness.time_travel(600);
3283
3284     const void *cookie2 = testHarness.create_cookie();
3285     check(h1->dcp.open(h, cookie2, opaque, seqno, flags, (void*)name, nname)
3286           == ENGINE_SUCCESS,
3287           "Failed dcp producer open connection.");
3288
3289     type = get_str_stat(h, h1, "eq_dcpq:unittest:type", "dcp");
3290     check(type.compare("producer") == 0, "Producer not found");
3291     check(get_int_stat(h, h1, "eq_dcpq:unittest:created", "dcp") > created,
3292           "New dcp stream is not newer");
3293     testHarness.destroy_cookie(cookie2);
3294
3295     return SUCCESS;
3296 }
3297
3298 static enum test_result test_dcp_noop(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1) {
3299
3300     const void *cookie = testHarness.create_cookie();
3301     const char *name = "unittest";
3302     uint32_t opaque = 1;
3303
3304     check(h1->dcp.open(h, cookie, ++opaque, 0, DCP_OPEN_PRODUCER, (void*)name,
3305                        strlen(name)) == ENGINE_SUCCESS,
3306           "Failed dcp producer open connection.");
3307
3308     check(h1->dcp.control(h, cookie, ++opaque, "connection_buffer_size", 22,
3309                           "1024", 4) == ENGINE_SUCCESS,
3310           "Failed to establish connection buffer");
3311
3312     check(h1->dcp.control(h, cookie, ++opaque, "enable_noop", 11, "true", 4)
3313                 == ENGINE_SUCCESS,
3314           "Failed to enable no-ops");
3315
3316     testHarness.time_travel(201);
3317
3318     struct dcp_message_producers* producers = get_dcp_producers();
3319     bool done = false;
3320     while (!done) {
3321         ENGINE_ERROR_CODE err = h1->dcp.step(h, cookie, producers);
3322         if (err == ENGINE_DISCONNECT) {
3323             done = true;
3324         } else {
3325             if (dcp_last_op == PROTOCOL_BINARY_CMD_DCP_NOOP) {
3326                 done = true;
3327                 checkeq(1, get_int_stat(h, h1, "eq_dcpq:unittest:noop_wait", "dcp"),
3328                         "Didn't send noop");
3329                 sendDcpAck(h, h1, cookie, PROTOCOL_BINARY_CMD_DCP_NOOP,
3330                            PROTOCOL_BINARY_RESPONSE_SUCCESS, dcp_last_opaque);
3331                 checkeq(0, get_int_stat(h, h1, "eq_dcpq:unittest:noop_wait", "dcp"),
3332                         "Didn't send noop");
3333             } else if (dcp_last_op != 0) {
3334                 abort();
3335             }
3336
3337             dcp_last_op = 0;
3338         }
3339     }
3340
3341     free(producers);
3342     testHarness.destroy_cookie(cookie);
3343     return SUCCESS;
3344 }
3345
3346 static enum test_result test_dcp_noop_fail(ENGINE_HANDLE *h,
3347                                            ENGINE_HANDLE_V1 *h1) {
3348     const void *cookie = testHarness.create_cookie();
3349     const char *name = "unittest";
3350     uint32_t opaque = 1;
3351
3352     check(h1->dcp.open(h, cookie, ++opaque, 0, DCP_OPEN_PRODUCER, (void*)name,
3353                        strlen(name)) == ENGINE_SUCCESS,
3354           "Failed dcp producer open connection.");
3355
3356     check(h1->dcp.control(h, cookie, ++opaque, "connection_buffer_size", 22,
3357                           "1024", 4) == ENGINE_SUCCESS,
3358           "Failed to establish connection buffer");
3359
3360     check(h1->dcp.control(h, cookie, ++opaque, "enable_noop", 11, "true", 4)
3361                 == ENGINE_SUCCESS,
3362           "Failed to enable no-ops");
3363
3364     testHarness.time_travel(201);
3365
3366     struct dcp_message_producers* producers = get_dcp_producers();
3367     bool done = false;
3368     bool disconnected = false;
3369     while (!done) {
3370         ENGINE_ERROR_CODE err = h1->dcp.step(h, cookie, producers);
3371         if (err == ENGINE_DISCONNECT) {
3372             done = true;
3373             disconnected = true;
3374         } else {
3375             if (dcp_last_op == PROTOCOL_BINARY_CMD_DCP_NOOP) {
3376                 checkeq(1, get_int_stat(h, h1, "eq_dcpq:unittest:noop_wait", "dcp"),
3377                         "Didn't send noop");
3378                 testHarness.time_travel(201);
3379             } else if (dcp_last_op != 0) {
3380                 abort();
3381             }
3382
3383             dcp_last_op = 0;
3384         }
3385     }
3386
3387     check(disconnected, "Connection should have been disconnected");
3388
3389     free(producers);
3390     testHarness.destroy_cookie(cookie);
3391     return SUCCESS;
3392 }
3393
3394 static void dcp_stream(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1, const char *name,
3395                        const void *cookie, uint16_t vbucket, uint32_t flags,
3396                        uint64_t start, uint64_t end, uint64_t vb_uuid,
3397                        uint64_t snap_start_seqno, uint64_t snap_end_seqno,
3398                        int exp_mutations, int exp_deletions, int exp_markers,
3399                        int extra_takeover_ops, int exp_nru_value,
3400                        bool exp_disk_snapshot = false,
3401                        bool skipEstimateCheck = false,
3402                        uint64_t flow_control_buf_size = 1024,
3403                        bool disable_ack = false) {
3404     uint32_t opaque = 1;
3405     uint16_t nname = strlen(name);
3406
3407     check(h1->dcp.open(h, cookie, ++opaque, 0, DCP_OPEN_PRODUCER, (void*)name,
3408                        nname) == ENGINE_SUCCESS,
3409           "Failed dcp producer open connection.");
3410
3411     std::stringstream flow_control_buf_sz;
3412     flow_control_buf_sz << flow_control_buf_size;
3413     checkeq(ENGINE_SUCCESS,
3414             h1->dcp.control(h, cookie, ++opaque, "connection_buffer_size", 22,
3415                             flow_control_buf_sz.str().c_str(),
3416                             flow_control_buf_sz.str().length()),
3417             "Failed to establish connection buffer");
3418     char stats_buffer[50] = {0};
3419     if (flow_control_buf_size) {
3420         snprintf(stats_buffer, sizeof(stats_buffer),
3421                  "eq_dcpq:%s:max_buffer_bytes", name);
3422         checkeq(static_cast<int>(flow_control_buf_size),
3423                 get_int_stat(h, h1, stats_buffer, "dcp"),
3424                 "Buffer Size did not get set correctly");
3425     } else {
3426         snprintf(stats_buffer, sizeof(stats_buffer),
3427                  "eq_dcpq:%s:flow_control", name);
3428         std::string status = get_str_stat(h, h1, stats_buffer, "dcp");
3429         checkeq(0, status.compare("disabled"), "Flow control enabled!");
3430     }
3431
3432     uint64_t rollback = 0;
3433     check(h1->dcp.stream_req(h, cookie, flags, opaque, vbucket, start, end,
3434                              vb_uuid, snap_start_seqno, snap_end_seqno, &rollback,
3435                              mock_dcp_add_failover_log)
3436                 == ENGINE_SUCCESS,
3437           "Failed to initiate stream request");
3438
3439     if (flags == DCP_ADD_STREAM_FLAG_TAKEOVER) {
3440         end  = -1;
3441     } else if (flags == DCP_ADD_STREAM_FLAG_LATEST ||
3442                flags == DCP_ADD_STREAM_FLAG_DISKONLY) {
3443         end = get_int_stat(h, h1, "vb_0:high_seqno", "vbucket-seqno");
3444     }
3445
3446     char stats_flags[50];
3447     snprintf(stats_flags, sizeof(stats_flags),"eq_dcpq:%s:stream_0_flags", name);
3448     check((uint32_t)get_int_stat(h, h1, stats_flags, "dcp")
3449           == flags, "Flags didn't match");
3450
3451     char stats_opaque[50];
3452     snprintf(stats_opaque, sizeof(stats_opaque),"eq_dcpq:%s:stream_0_opaque", name);
3453     check((uint32_t)get_int_stat(h, h1, stats_opaque, "dcp")
3454           == opaque, "Opaque didn't match");
3455
3456     char stats_start_seqno[50];
3457     snprintf(stats_start_seqno, sizeof(stats_start_seqno),"eq_dcpq:%s:stream_0_start_seqno", name);
3458     check((uint64_t)get_ull_stat(h, h1, stats_start_seqno, "dcp")
3459           == start, "Start Seqno Didn't match");
3460
3461     char stats_end_seqno[50];
3462     snprintf(stats_end_seqno, sizeof(stats_end_seqno),"eq_dcpq:%s:stream_0_end_seqno", name);
3463     checkeq(end, (uint64_t)get_ull_stat(h, h1, stats_end_seqno, "dcp"),
3464             "End Seqno didn't match");
3465
3466     char stats_vb_uuid[50];
3467     snprintf(stats_vb_uuid, sizeof(stats_vb_uuid),"eq_dcpq:%s:stream_0_vb_uuid", name);
3468     check((uint64_t)get_ull_stat(h, h1, stats_vb_uuid, "dcp")
3469           == vb_uuid, "VBucket UUID didn't match");
3470
3471     char stats_snap_seqno[50];
3472     snprintf(stats_snap_seqno, sizeof(stats_snap_seqno),"eq_dcpq:%s:stream_0_snap_start_seqno", name);
3473     check((uint64_t)get_ull_stat(h, h1, stats_snap_seqno, "dcp")
3474           == snap_start_seqno, "snap start seqno didn't match");
3475
3476     struct dcp_message_producers* producers = get_dcp_producers();
3477
3478     if ((flags & DCP_ADD_STREAM_FLAG_TAKEOVER) == 0 &&
3479         (flags & DCP_ADD_STREAM_FLAG_DISKONLY) == 0 &&
3480         !skipEstimateCheck) {
3481         int est = end - start;
3482         std::stringstream stats_takeover;
3483         stats_takeover << "dcp-vbtakeover " << vbucket << " " << name;
3484         wait_for_stat_to_be_lte(h, h1, "estimate", est,
3485                                 stats_takeover.str().c_str());
3486     }
3487
3488     bool done = false;
3489     bool exp_all_items_streamed = true;
3490     int num_mutations = 0;
3491     int num_deletions = 0;
3492     int num_snapshot_marker = 0;
3493     int num_set_vbucket_pending = 0;
3494     int num_set_vbucket_active = 0;
3495
3496     bool pending_marker_ack = false;
3497     uint64_t marker_end = 0;
3498
3499     uint64_t last_by_seqno = 0;
3500     uint32_t bytes_read = 0;
3501     uint64_t all_bytes = 0;
3502     uint64_t total_acked_bytes = 0;
3503     uint64_t ack_limit = flow_control_buf_size/2;
3504     do {
3505         if (!disable_ack && (bytes_read > ack_limit)) {
3506             h1->dcp.buffer_acknowledgement(h, cookie, ++opaque, 0, bytes_read);
3507             total_acked_bytes += bytes_read;
3508             bytes_read = 0;
3509         }
3510         ENGINE_ERROR_CODE err = h1->dcp.step(h, cookie, producers);
3511         if (err == ENGINE_DISCONNECT) {
3512             done = true;
3513         } else {
3514             switch (dcp_last_op) {
3515                 case PROTOCOL_BINARY_CMD_DCP_MUTATION:
3516                     check(last_by_seqno < dcp_last_byseqno, "Expected bigger seqno");
3517                     check(dcp_last_nru == exp_nru_value, "Expected different NRU value");
3518                     last_by_seqno = dcp_last_byseqno;
3519                     num_mutations++;
3520                     bytes_read += dcp_last_packet_size;
3521                     all_bytes += dcp_last_packet_size;
3522                     if (pending_marker_ack && dcp_last_byseqno == marker_end) {
3523                         sendDcpAck(h, h1, cookie, PROTOCOL_BINARY_CMD_DCP_SNAPSHOT_MARKER,
3524                                PROTOCOL_BINARY_RESPONSE_SUCCESS, dcp_last_opaque);
3525                     }
3526                     break;
3527                 case PROTOCOL_BINARY_CMD_DCP_DELETION:
3528                     check(last_by_seqno < dcp_last_byseqno, "Expected bigger seqno");
3529                     last_by_seqno = dcp_last_byseqno;
3530                     num_deletions++;
3531                     bytes_read += dcp_last_packet_size;
3532                     all_bytes += dcp_last_packet_size;
3533                     if (pending_marker_ack && dcp_last_byseqno == marker_end) {
3534                         sendDcpAck(h, h1, cookie, PROTOCOL_BINARY_CMD_DCP_SNAPSHOT_MARKER,
3535                                PROTOCOL_BINARY_RESPONSE_SUCCESS, dcp_last_opaque);
3536                     }
3537                     break;
3538                 case PROTOCOL_BINARY_CMD_DCP_STREAM_END:
3539                     done = true;
3540                     bytes_read += dcp_last_packet_size;
3541                     all_bytes += dcp_last_packet_size;
3542                     break;
3543                 case PROTOCOL_BINARY_CMD_DCP_SNAPSHOT_MARKER:
3544                     if (exp_disk_snapshot && num_snapshot_marker == 0) {
3545                         check(dcp_last_flags == 1, "Expected disk snapshot");
3546                     }
3547
3548                     if (dcp_last_flags & 8) {
3549                         pending_marker_ack = true;
3550                         marker_end = dcp_last_snap_end_seqno;
3551                     }
3552
3553                     num_snapshot_marker++;
3554                     bytes_read += dcp_last_packet_size;
3555                     all_bytes += dcp_last_packet_size;
3556                     break;
3557                 case PROTOCOL_BINARY_CMD_DCP_SET_VBUCKET_STATE:
3558                     if (dcp_last_vbucket_state == vbucket_state_pending) {
3559                         num_set_vbucket_pending++;
3560                         for (int j = 0; j < extra_takeover_ops; ++j) {
3561                             item *i = NULL;
3562                             std::stringstream ss;
3563                             ss << "key" << j;
3564                             check(store(h, h1, NULL, OPERATION_SET,
3565                                         ss.str().c_str(), "data", &i)
3566                                   == ENGINE_SUCCESS, "Failed to store a value");
3567                             h1->release(h, NULL, i);
3568                         }
3569                     } else if (dcp_last_vbucket_state == vbucket_state_active) {
3570                         num_set_vbucket_active++;
3571                     }
3572                     bytes_read += dcp_last_packet_size;
3573                     all_bytes += dcp_last_packet_size;
3574                     sendDcpAck(h, h1, cookie, PROTOCOL_BINARY_CMD_DCP_SET_VBUCKET_STATE,
3575                                PROTOCOL_BINARY_RESPONSE_SUCCESS, dcp_last_opaque);
3576                     break;
3577                 case 0:
3578                     /* No messages were ready on the last step call so we
3579                      * should just ignore this case. Note that we check for 0
3580                      * because we clear the dcp_last_op value below.
3581                      */
3582                      if (disable_ack && flow_control_buf_size) {
3583                          /* If there is no acking and if flow control is enabled
3584                             we are done because producer should not send us any
3585                             more items. */
3586                          done = true;
3587                          exp_all_items_streamed = false;
3588                      }
3589                      break;
3590                 default:
3591                     break;
3592                     abort();
3593             }
3594             dcp_last_op = 0;
3595             dcp_last_nru = 0;
3596         }
3597     } while (!done);
3598
3599     check(num_mutations == exp_mutations, "Invalid number of mutations");
3600     check(num_deletions == exp_deletions, "Invalid number of deletes");
3601     check(num_snapshot_marker == exp_markers,
3602           "Didn't receive expected number of snapshot marker");
3603
3604     if (flags & DCP_ADD_STREAM_FLAG_TAKEOVER) {
3605         check(num_set_vbucket_pending == 1, "Didn't receive pending set state");
3606         check(num_set_vbucket_active == 1, "Didn't receive active set state");
3607     }
3608
3609     /* Check if the readyQ size goes to zero after all items are streamed */
3610     if (exp_all_items_streamed) {
3611         char stats_ready_queue_memory[50];
3612         snprintf(stats_ready_queue_memory, sizeof(stats_ready_queue_memory),
3613                  "eq_dcpq:%s:stream_0_ready_queue_memory", name);
3614         check((uint64_t)get_ull_stat(h, h1, stats_ready_queue_memory, "dcp")
3615               == 0, "readyQ size did not go to zero");
3616     }
3617
3618     /* Check if the producer has updated flow control stat correctly */
3619     if (flow_control_buf_size) {
3620         memset(stats_buffer, 0, 50);
3621         snprintf(stats_buffer, sizeof(stats_buffer), "eq_dcpq:%s:unacked_bytes",
3622                  name);
3623         checkeq(static_cast<int>(all_bytes - total_acked_bytes),
3624                 get_int_stat(h, h1, stats_buffer, "dcp"),
3625                 "Buffer Size did not get set correctly");
3626     }
3627     free(producers);
3628 }
3629
3630 static void dcp_stream_req(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1,
3631                            uint32_t opaque, uint32_t stream_flag,
3632                            uint16_t vbucket, uint64_t start,
3633                            uint64_t end, uint64_t uuid,
3634                            uint64_t snap_start_seqno, uint64_t snap_end_seqno,
3635                            uint64_t exp_rollback, ENGINE_ERROR_CODE err) {
3636     const void *cookie = testHarness.create_cookie();
3637     uint32_t flags = DCP_OPEN_PRODUCER;
3638     const char *name = "unittest";
3639
3640     // Open consumer connection
3641     check(h1->dcp.open(h, cookie, opaque, 0, flags, (void*)name, strlen(name))
3642           == ENGINE_SUCCESS, "Failed dcp Consumer open connection.");
3643
3644     uint64_t rollback = 0;
3645     ENGINE_ERROR_CODE rv = h1->dcp.stream_req(h, cookie, stream_flag, 1,
3646                                               vbucket, start, end, uuid,
3647                                               snap_start_seqno, snap_end_seqno,
3648                                               &rollback,
3649                                               mock_dcp_add_failover_log);
3650     check(rv == err, "Unexpected error code");
3651     if (err == ENGINE_ROLLBACK || err == ENGINE_KEY_ENOENT) {
3652         check(exp_rollback == rollback, "Rollback didn't match expected value");
3653     }
3654     testHarness.destroy_cookie(cookie);
3655 }
3656
3657 static enum test_result test_dcp_producer_stream_req_partial(ENGINE_HANDLE *h,
3658                                                              ENGINE_HANDLE_V1 *h1) {
3659     int num_items = 200;
3660     for (int j = 0; j < num_items; ++j) {
3661         item *i = NULL;
3662         std::stringstream ss;
3663         ss << "key" << j;
3664         check(store(h, h1, NULL, OPERATION_SET, ss.str().c_str(), "data", &i)
3665               == ENGINE_SUCCESS, "Failed to store a value");
3666         h1->release(h, NULL, i);
3667     }
3668
3669     wait_for_flusher_to_settle(h, h1);
3670     stop_persistence(h, h1);
3671
3672     for (int j = 0; j < (num_items / 2); ++j) {
3673         std::stringstream ss;
3674         ss << "key" << j;
3675         check(del(h, h1, ss.str().c_str(), 0, 0) == ENGINE_SUCCESS,
3676               "Expected delete to succeed");
3677     }
3678
3679     wait_for_stat_to_be(h, h1, "vb_0:num_checkpoints", 2, "checkpoint");
3680
3681     uint64_t vb_uuid = get_ull_stat(h, h1, "vb_0:0:id", "failovers");
3682
3683     const void *cookie = testHarness.create_cookie();
3684
3685     dcp_stream(h, h1, "unittest", cookie, 0, 0, 95, 209, vb_uuid, 95, 95, 105,
3686                100, 2, 0, 2);
3687
3688     testHarness.destroy_cookie(cookie);
3689
3690     return SUCCESS;
3691 }
3692
3693 static enum test_result test_dcp_producer_stream_req_full(ENGINE_HANDLE *h,
3694                                                           ENGINE_HANDLE_V1 *h1) {
3695     int num_items = 300;
3696     for (int j = 0; j < num_items; ++j) {
3697         if (j % 100 == 0) {
3698             wait_for_flusher_to_settle(h, h1);
3699         }
3700         item *i = NULL;
3701         std::stringstream ss;
3702         ss << "key" << j;
3703         check(store(h, h1, NULL, OPERATION_SET, ss.str().c_str(), "data", &i)
3704               == ENGINE_SUCCESS, "Failed to store a value");
3705         h1->release(h, NULL, i);
3706     }
3707
3708     wait_for_flusher_to_settle(h, h1);
3709     verify_curr_items(h, h1, num_items, "Wrong amount of items");
3710     wait_for_stat_to_be(h, h1, "vb_0:num_checkpoints", 1, "checkpoint");
3711
3712     uint64_t end = get_int_stat(h, h1, "vb_0:high_seqno", "vbucket-seqno");
3713     uint64_t vb_uuid = get_ull_stat(h, h1, "vb_0:0:id", "failovers");
3714
3715     const void *cookie = testHarness.create_cookie();
3716
3717     dcp_stream(h, h1, "unittest", cookie, 0, 0, 0, end, vb_uuid, 0, 0,
3718                num_items, 0, 1, 0, 2);
3719
3720     testHarness.destroy_cookie(cookie);
3721
3722     return SUCCESS;
3723 }
3724
3725 static enum test_result test_dcp_producer_stream_req_disk(ENGINE_HANDLE *h,
3726                                                           ENGINE_HANDLE_V1 *h1) {
3727     int num_items = 400;
3728     for (int j = 0; j < num_items; ++j) {
3729         if (j == 200) {
3730             wait_for_flusher_to_settle(h, h1);
3731             wait_for_stat_to_be(h, h1, "ep_items_rm_from_checkpoints", 200);
3732             stop_persistence(h, h1);
3733         }
3734         item *i = NULL;
3735         std::stringstream ss;
3736         ss << "key" << j;
3737         check(store(h, h1, NULL, OPERATION_SET, ss.str().c_str(), "data", &i)
3738               == ENGINE_SUCCESS, "Failed to store a value");
3739         h1->release(h, NULL, i);
3740     }
3741
3742     verify_curr_items(h, h1, num_items, "Wrong amount of items");
3743     wait_for_stat_to_be(h, h1, "vb_0:num_checkpoints", 2, "checkpoint");
3744
3745     uint64_t vb_uuid = get_ull_stat(h, h1, "vb_0:0:id", "failovers");
3746
3747     const void *cookie = testHarness.create_cookie();
3748
3749     dcp_stream(h, h1,"unittest", cookie, 0, 0, 0, 200, vb_uuid, 0, 0, 200, 0, 1,
3750                0, 2);
3751
3752     testHarness.destroy_cookie(cookie);
3753
3754     return SUCCESS;
3755 }
3756
3757 static enum test_result test_dcp_producer_stream_req_diskonly(ENGINE_HANDLE *h,
3758                                                               ENGINE_HANDLE_V1 *h1) {
3759     int num_items = 300;
3760     for (int j = 0; j < num_items; ++j) {
3761         if (j % 100 == 0) {
3762             wait_for_flusher_to_settle(h, h1);
3763         }
3764         item *i = NULL;
3765         std::stringstream ss;
3766         ss << "key" << j;
3767         check(store(h, h1, NULL, OPERATION_SET, ss.str().c_str(), "data", &i)
3768               == ENGINE_SUCCESS, "Failed to store a value");
3769         h1->release(h, NULL, i);
3770     }
3771
3772     wait_for_flusher_to_settle(h, h1);
3773     verify_curr_items(h, h1, num_items, "Wrong amount of items");
3774     wait_for_stat_to_be(h, h1, "vb_0:num_checkpoints", 1, "checkpoint");
3775
3776     uint64_t vb_uuid = get_ull_stat(h, h1, "vb_0:0:id", "failovers");
3777     uint32_t flags = DCP_ADD_STREAM_FLAG_DISKONLY;
3778
3779     const void *cookie = testHarness.create_cookie();
3780
3781     dcp_stream(h, h1, "unittest", cookie, 0, flags, 0, -1, vb_uuid, 0, 0, 300,
3782                0, 1, 0, 2);
3783
3784     testHarness.destroy_cookie(cookie);
3785
3786     return SUCCESS;
3787 }
3788
3789 static enum test_result test_dcp_producer_stream_req_mem(ENGINE_HANDLE *h,
3790                                                          ENGINE_HANDLE_V1 *h1) {
3791     int num_items = 300;
3792     for (int j = 0; j < num_items; ++j) {
3793         if (j % 100 == 0) {
3794             wait_for_flusher_to_settle(h, h1);
3795         }
3796         item *i = NULL;
3797         std::stringstream ss;
3798         ss << "key" << j;
3799         check(store(h, h1, NULL, OPERATION_SET, ss.str().c_str(), "data", &i)
3800               == ENGINE_SUCCESS, "Failed to store a value");
3801         h1->release(h, NULL, i);
3802     }
3803
3804     wait_for_flusher_to_settle(h, h1);
3805     verify_curr_items(h, h1, num_items, "Wrong amount of items");
3806
3807     uint64_t vb_uuid = get_ull_stat(h, h1, "vb_0:0:id", "failovers");
3808
3809     const void *cookie = testHarness.create_cookie();
3810
3811     dcp_stream(h, h1, "unittest", cookie, 0, 0, 200, 300, vb_uuid, 200, 200,
3812                100, 0, 1, 0, 2);
3813
3814     testHarness.destroy_cookie(cookie);
3815
3816     return SUCCESS;
3817 }
3818
3819 static enum test_result test_dcp_producer_stream_latest(ENGINE_HANDLE *h,
3820                                                         ENGINE_HANDLE_V1 *h1) {
3821     int num_items = 300;
3822     for (int j = 0; j < num_items; ++j) {
3823         if (j % 100 == 0) {
3824             wait_for_flusher_to_settle(h, h1);
3825         }
3826         item *i = NULL;
3827         std::stringstream ss;
3828         ss << "key" << j;
3829         check(store(h, h1, NULL, OPERATION_SET, ss.str().c_str(), "data", &i)
3830               == ENGINE_SUCCESS, "Failed to store a value");
3831         h1->release(h, NULL, i);
3832     }
3833
3834     wait_for_flusher_to_settle(h, h1);
3835     verify_curr_items(h, h1, num_items, "Wrong amount of items");
3836
3837     uint64_t vb_uuid = get_ull_stat(h, h1, "vb_0:0:id", "failovers");
3838
3839     const void *cookie = testHarness.create_cookie();
3840
3841     uint32_t flags = DCP_ADD_STREAM_FLAG_LATEST;
3842     dcp_stream(h, h1, "unittest", cookie, 0, flags, 200, 205, vb_uuid, 200, 200,
3843                100, 0, 1, 0, 2);
3844
3845     testHarness.destroy_cookie(cookie);
3846
3847     return SUCCESS;
3848 }
3849
3850 static test_result test_dcp_producer_stream_req_nmvb(ENGINE_HANDLE *h,
3851                                                      ENGINE_HANDLE_V1 *h1) {
3852     const void *cookie1 = testHarness.create_cookie();
3853     uint32_t opaque = 0;
3854     uint32_t seqno = 0;
3855     uint32_t flags = DCP_OPEN_PRODUCER;
3856     const char *name = "unittest";
3857     uint16_t nname = strlen(name);
3858
3859     check(h1->dcp.open(h, cookie1, opaque, seqno, flags, (void*)name, nname)
3860           == ENGINE_SUCCESS,
3861           "Failed dcp producer open connection.");
3862
3863     uint32_t req_vbucket = 1;
3864     uint64_t rollback = 0;
3865
3866     check(h1->dcp.stream_req(h, cookie1, 0, 0, req_vbucket, 0, 0, 0, 0,
3867                              0, &rollback, mock_dcp_add_failover_log)
3868                 == ENGINE_NOT_MY_VBUCKET,
3869           "Expected not my vbucket");
3870     testHarness.destroy_cookie(cookie1);
3871
3872     return SUCCESS;
3873 }
3874
3875 static test_result test_dcp_agg_stats(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1) {
3876     int num_items = 300;
3877     for (int j = 0; j < num_items; ++j) {
3878         if (j % 100 == 0) {
3879             wait_for_flusher_to_settle(h, h1);
3880         }
3881         item *i = NULL;
3882         std::stringstream ss;
3883         ss << "key" << j;
3884         check(store(h, h1, NULL, OPERATION_SET, ss.str().c_str(), "data", &i)
3885               == ENGINE_SUCCESS, "Failed to store a value");
3886         h1->release(h, NULL, i);
3887     }
3888
3889     wait_for_flusher_to_settle(h, h1);
3890     verify_curr_items(h, h1, num_items, "Wrong amount of items");
3891
3892     uint64_t vb_uuid = get_ull_stat(h, h1, "vb_0:0:id", "failovers");
3893
3894     const void *cookie[5];
3895
3896     for (int j = 0; j < 5; ++j) {
3897         char name[12];
3898         snprintf(name, sizeof(name), "unittest_%d", j);
3899         cookie[j] = testHarness.create_cookie();
3900         dcp_stream(h, h1, name, cookie[j], 0, 0, 200, 300, vb_uuid, 200, 200,
3901                    100, 0, 1, 0, 2);
3902     }
3903
3904     check(get_int_stat(h, h1, "unittest:producer_count", "dcpagg _") == 5,
3905           "producer count mismatch");
3906     check(get_int_stat(h, h1, "unittest:total_bytes", "dcpagg _") == 32860,
3907           "aggregate total bytes sent mismatch");
3908     check(get_int_stat(h, h1, "unittest:items_sent", "dcpagg _") == 500,
3909           "aggregate total items sent mismatch");
3910     check(get_int_stat(h, h1, "unittest:items_remaining", "dcpagg _") == 0,
3911           "aggregate total items remaining mismatch");
3912
3913     for (int j = 0; j < 5; ++j) {
3914         testHarness.destroy_cookie(cookie[j]);
3915     }
3916
3917     return SUCCESS;
3918 }
3919
3920 static test_result test_dcp_takeover(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1) {
3921     int num_items = 10;
3922     for (int j = 0; j < num_items; ++j) {
3923         item *i = NULL;
3924         std::stringstream ss;
3925         ss << "key" << j;
3926         check(store(h, h1, NULL, OPERATION_SET, ss.str().c_str(), "data", &i)
3927               == ENGINE_SUCCESS, "Failed to store a value");
3928         h1->release(h, NULL, i);
3929     }
3930
3931     const void *cookie = testHarness.create_cookie();
3932     const char *name = "unittest";
3933     uint16_t nname = strlen(name);
3934
3935     check(h1->dcp.open(h, cookie, 0, 0, DCP_OPEN_PRODUCER, (void*)name, nname)
3936           == ENGINE_SUCCESS,
3937           "Failed dcp producer open connection.");
3938
3939     uint32_t flags = DCP_ADD_STREAM_FLAG_TAKEOVER;
3940     uint64_t vb_uuid = get_ull_stat(h, h1, "vb_0:0:id", "failovers");
3941
3942     const void *cookie1 = testHarness.create_cookie();
3943     dcp_stream(h, h1, "unittest", cookie1, 0, flags, 0, 1000, vb_uuid, 0, 0, 20,
3944                0, 2, 10, 2);
3945
3946     check(verify_vbucket_state(h, h1, 0, vbucket_state_dead), "Wrong vb state");
3947
3948     testHarness.destroy_cookie(cookie);
3949     testHarness.destroy_cookie(cookie1);
3950
3951     return SUCCESS;
3952 }
3953
3954 static test_result test_dcp_takeover_no_items(ENGINE_HANDLE *h,
3955                                               ENGINE_HANDLE_V1 *h1) {
3956     int num_items = 10;
3957     for (int j = 0; j < num_items; ++j) {
3958         item *i = NULL;
3959         std::stringstream ss;
3960         ss << "key" << j;
3961         check(store(h, h1, NULL, OPERATION_SET, ss.str().c_str(), "data", &i)
3962               == ENGINE_SUCCESS, "Failed to store a value");
3963         h1->release(h, NULL, i);
3964     }
3965
3966     const void *cookie = testHarness.create_cookie();
3967     const char *name = "unittest";
3968     uint32_t opaque = 1;
3969
3970     check(h1->dcp.open(h, cookie, ++opaque, 0, DCP_OPEN_PRODUCER, (void*)name,
3971                        strlen(name)) == ENGINE_SUCCESS,
3972           "Failed dcp producer open connection.");
3973
3974     uint16_t vbucket = 0;
3975     uint32_t flags = DCP_ADD_STREAM_FLAG_TAKEOVER;
3976     uint64_t start_seqno = 10;
3977     uint64_t end_seqno = std::numeric_limits<uint64_t>::max();
3978     uint64_t vb_uuid = get_ull_stat(h, h1, "vb_0:0:id", "failovers");
3979     uint64_t snap_start_seqno = 10;
3980     uint64_t snap_end_seqno = 10;
3981
3982     uint64_t rollback = 0;
3983     check(h1->dcp.stream_req(h, cookie, flags, ++opaque, vbucket, start_seqno,
3984                              end_seqno, vb_uuid, snap_start_seqno,
3985                              snap_end_seqno, &rollback,
3986                              mock_dcp_add_failover_log)
3987                 == ENGINE_SUCCESS,
3988           "Failed to initiate stream request");
3989
3990     struct dcp_message_producers* producers = get_dcp_producers();
3991
3992     bool done = false;
3993     int num_snapshot_marker = 0;
3994     int num_set_vbucket_pending = 0;
3995     int num_set_vbucket_active = 0;
3996
3997     do {
3998         ENGINE_ERROR_CODE err = h1->dcp.step(h, cookie, producers);
3999         if (err == ENGINE_DISCONNECT) {
4000             done = true;
4001         } else {
4002             switch (dcp_last_op) {
4003                 case PROTOCOL_BINARY_CMD_DCP_STREAM_END:
4004                     done = true;
4005                     break;
4006                 case PROTOCOL_BINARY_CMD_DCP_SNAPSHOT_MARKER:
4007                     num_snapshot_marker++;
4008                     break;
4009                 case PROTOCOL_BINARY_CMD_DCP_SET_VBUCKET_STATE:
4010                     if (dcp_last_vbucket_state == vbucket_state_pending) {
4011                         num_set_vbucket_pending++;
4012                     } else if (dcp_last_vbucket_state == vbucket_state_active) {
4013                         num_set_vbucket_active++;
4014                     }
4015                     sendDcpAck(h, h1, cookie, PROTOCOL_BINARY_CMD_DCP_SET_VBUCKET_STATE,
4016                                PROTOCOL_BINARY_RESPONSE_SUCCESS, dcp_last_opaque);
4017                     break;
4018                 case 0:
4019                      break;
4020                 default:
4021                     break;
4022                     abort();
4023             }
4024             dcp_last_op = 0;
4025         }
4026     } while (!done);
4027
4028     check(num_snapshot_marker == 0, "Invalid number of snapshot marker");
4029     check(num_set_vbucket_pending == 1, "Didn't receive pending set state");
4030     check(num_set_vbucket_active == 1, "Didn't receive active set state");
4031
4032     free(producers);
4033     check(verify_vbucket_state(h, h1, 0, vbucket_state_dead), "Wrong vb state");
4034     testHarness.destroy_cookie(cookie);
4035
4036     return SUCCESS;
4037 }
4038
4039 static uint32_t add_stream_for_consumer(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1,
4040                                         const void* cookie, uint32_t opaque,
4041                                         uint16_t vbucket, uint32_t flags,
4042                                         protocol_binary_response_status response,
4043                      &