1 /* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
3 * Copyright 2012 Couchbase, Inc
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
9 * http://www.apache.org/licenses/LICENSE-2.0
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
19 #include "ep_test_apis.h"
21 #include <memcached/util.h>
22 #include <platform/platform.h>
30 #include "mock/mock_dcp.h"
// check(expr, msg): evaluates expr and, when it is false, calls abort_msg()
// with the stringified expression, msg and the current line number.
32 #define check(expr, msg) \
33 static_cast<void>((expr) ? 0 : abort_msg(#expr, msg, __LINE__))
// File-scope state shared between the engine callbacks and the test helpers
// in this file.
// Stat name -> value strings, looked up by the get_*_stat() helpers.
35 std::map<std::string, std::string> vals;
// NOTE(review): presumably enables echoing of stats in add_stats() - confirm.
36 bool dump_stats = false;
// Status of the most recent response, stored by add_response().
37 AtomicValue<protocol_binary_response_status> last_status(
38 static_cast<protocol_binary_response_status>(0));
// Body of the most recent response (also written by vb_map_response()).
40 std::string last_body;
// Values decoded from the extras by add_response_get_meta().
41 bool last_deleted_flag(false);
42 AtomicValue<uint8_t> last_conflict_resolution_mode{0xff};
// NOTE(review): presumably the CAS of the most recent response - confirm.
43 AtomicValue<uint64_t> last_cas(0);
// Datatype of the most recent response, stored by add_response().
44 AtomicValue<uint8_t> last_datatype(0x00);
// Item metadata decoded by add_response_get_meta()/add_response_ret_meta().
45 ItemMetaData last_meta;
// vbucket UUID and seqno decoded by add_response_set_del_meta().
46 uint64_t last_uuid = 0;
47 uint64_t last_seqno = 0;
// Forward declarations of helpers that are defined further down in this file.
49 extern "C" bool add_response_get_meta(const void *key, uint16_t keylen,
50 const void *ext, uint8_t extlen,
51 const void *body, uint32_t bodylen,
52 uint8_t datatype, uint16_t status,
53 uint64_t cas, const void *cookie);
54 void encodeExt(char *buffer, uint32_t val);
55 void encodeWithMetaExt(char *buffer, ItemMetaData *meta);
// Back-off helper: doubles *sleepTime for the next round, capped at
// maxSleepTime (500000).
// NOTE(review): the unit is presumably microseconds (useconds_t) - confirm.
57 void decayingSleep(useconds_t *sleepTime) {
58 static const useconds_t maxSleepTime = 500000;
60 *sleepTime = std::min(*sleepTime << 1, maxSleepTime);
// Callback that copies mapsize bytes starting at map into last_body and
// returns ENGINE_SUCCESS.
63 ENGINE_ERROR_CODE vb_map_response(const void *cookie,
67 last_body.assign(static_cast<const char*>(map), mapsize);
68 return ENGINE_SUCCESS;
// Generic response callback: records the status, body, key and datatype of
// the response in last_status, last_body, last_key and last_datatype so the
// test helpers can inspect them after a command returns.
71 bool add_response(const void *key, uint16_t keylen, const void *ext,
72 uint8_t extlen, const void *body, uint32_t bodylen,
73 uint8_t datatype, uint16_t status, uint64_t cas,
78 last_status = static_cast<protocol_binary_response_status>(status);
79 last_body.assign(static_cast<const char*>(body), bodylen);
80 last_key.assign(static_cast<const char*>(key), keylen);
82 last_datatype = datatype;
// Response callback for GET_META: decodes the extras into the last_* globals
// and then forwards everything to add_response().
// Extras layout as read here: flags word at offset 0 (tested against
// GET_META_ITEM_DELETED_FLAG after ntohl), item flags at 4 (copied as-is),
// exptime at 8 (ntohl), revSeqno at 12 (ntohll), conflict resolution mode
// byte at 20.
// NOTE(review): the guard only requires extlen > 0 although offsets 0..19 are
// read - confirm responses always carry at least 20 bytes of extras.
// NOTE(review): the last memcpy writes into the storage of an
// AtomicValue<uint8_t> object rather than assigning to it - confirm intended.
86 bool add_response_get_meta(const void *key, uint16_t keylen, const void *ext,
87 uint8_t extlen, const void *body, uint32_t bodylen,
88 uint8_t datatype, uint16_t status, uint64_t cas,
91 const uint8_t* ext_bytes = reinterpret_cast<const uint8_t*> (ext);
92 if (ext && extlen > 0) {
94 memcpy(&flags, ext_bytes, 4);
95 last_deleted_flag = ntohl(flags) & GET_META_ITEM_DELETED_FLAG;
96 memcpy(&last_meta.flags, ext_bytes + 4, 4);
97 memcpy(&last_meta.exptime, ext_bytes + 8, 4);
98 last_meta.exptime = ntohl(last_meta.exptime);
99 memcpy(&last_meta.revSeqno, ext_bytes + 12, 8);
100 last_meta.revSeqno = ntohll(last_meta.revSeqno);
103 memcpy(&last_conflict_resolution_mode, ext_bytes + 20, 1);
106 return add_response(key, keylen, ext, extlen, body, bodylen, datatype,
107 status, cas, cookie);
110 bool add_response_set_del_meta(const void *key, uint16_t keylen, const void *ext,
111 uint8_t extlen, const void *body, uint32_t bodylen,
112 uint8_t datatype, uint16_t status, uint64_t cas,
113 const void *cookie) {
115 const uint8_t* ext_bytes = reinterpret_cast<const uint8_t*> (ext);
116 if (ext && extlen > 0) {
119 memcpy(&vb_uuid, ext_bytes, 8);
120 memcpy(&seqno, ext_bytes + 8, 8);
121 last_uuid = ntohll(vb_uuid);
122 last_seqno = ntohll(seqno);
125 return add_response(key, keylen, ext, extlen, body, bodylen, datatype,
126 status, cas, cookie);
// Response callback for RETURN_META: when the extras are exactly 16 bytes
// they are decoded into last_meta (flags at offset 0 copied as-is, exptime at
// 4 via ntohl, revSeqno at 8 via ntohll); the response is then forwarded to
// add_response().
129 bool add_response_ret_meta(const void *key, uint16_t keylen, const void *ext,
130 uint8_t extlen, const void *body, uint32_t bodylen,
131 uint8_t datatype, uint16_t status, uint64_t cas,
132 const void *cookie) {
134 const uint8_t* ext_bytes = reinterpret_cast<const uint8_t*> (ext);
135 if (ext && extlen == 16) {
136 memcpy(&last_meta.flags, ext_bytes, 4);
137 memcpy(&last_meta.exptime, ext_bytes + 4, 4);
138 last_meta.exptime = ntohl(last_meta.exptime);
139 memcpy(&last_meta.revSeqno, ext_bytes + 8, 8);
140 last_meta.revSeqno = ntohll(last_meta.revSeqno);
143 return add_response(key, keylen, ext, extlen, body, bodylen, datatype,
144 status, cas, cookie);
// Stats callback: builds std::string copies of the (key, klen) and
// (val, vlen) buffers and prints them as "stat[<key>] = <value>" on stdout.
// NOTE(review): the print is presumably gated on dump_stats and the pair is
// presumably stored in vals - confirm.
147 void add_stats(const char *key, const uint16_t klen, const char *val,
148 const uint32_t vlen, const void *cookie) {
150 std::string k(key, klen);
151 std::string v(val, vlen);
154 std::cout << "stat[" << k << "] = " << v << std::endl;
// Writes the sizeof(val) bytes of val to buffer; buffer must have room for
// them.
// NOTE(review): confirm the value is converted to network byte order before
// the copy.
160 void encodeExt(char *buffer, uint32_t val) {
162 memcpy(buffer, (char*)&val, sizeof(val));
// Serialises *meta into the first 24 bytes of buffer:
//   offset 0  - flags (copied without byte swapping)
//   offset 4  - exptime (htonl)
//   offset 8  - revSeqno (htonll)
//   offset 16 - cas (htonll)
165 void encodeWithMetaExt(char *buffer, ItemMetaData *meta) {
166 uint32_t flags = meta->flags;
167 uint32_t exp = htonl(meta->exptime);
168 uint64_t seqno = htonll(meta->revSeqno);
169 uint64_t cas = htonll(meta->cas);
171 memcpy(buffer, (char*)&flags, sizeof(flags));
172 memcpy(buffer + 4, (char*)&exp, sizeof(exp));
173 memcpy(buffer + 8, (char*)&seqno, sizeof(seqno));
174 memcpy(buffer + 16, (char*)&cas, sizeof(cas));
// Builds a binary-protocol request in a zero-filled calloc() buffer sized
// header + ext + key + value + extended meta.
// Header fields are written in network byte order (keylen, vbucket via
// htons; bodylen via htonl; cas via htonll); bodylen covers key, value, ext
// and extended meta. The payload follows the header in the order
// ext, key, value, extended meta.
// NOTE(review): the calloc() result is used without a visible NULL check.
// NOTE(review): the buffer is presumably returned to the caller, who must
// release it with free() - confirm.
177 protocol_binary_request_header* createPacket(uint8_t opcode,
190 uint32_t headerlen = sizeof(protocol_binary_request_header);
191 pkt_raw = static_cast<char*>(calloc(1, headerlen + extlen + keylen + vallen + nmeta));
193 protocol_binary_request_header *req =
194 (protocol_binary_request_header*)pkt_raw;
195 req->request.opcode = opcode;
196 req->request.keylen = htons(keylen);
197 req->request.extlen = extlen;
198 req->request.vbucket = htons(vbid);
199 req->request.bodylen = htonl(keylen + vallen + extlen + nmeta);
200 req->request.cas = htonll(cas);
201 req->request.datatype = datatype;
204 memcpy(pkt_raw + headerlen, ext, extlen);
208 memcpy(pkt_raw + headerlen + extlen, key, keylen);
212 memcpy(pkt_raw + headerlen + extlen + keylen, val, vallen);
215 // Extended meta: To be used for set_with_meta/del_with_meta/add_with_meta
216 if (meta && nmeta > 0) {
217 memcpy(pkt_raw + headerlen + extlen + keylen + vallen,
// Sends SET_DRIFT_COUNTER_STATE and requires a SUCCESS response status.
// The extras are 9 bytes: initialDriftCount converted with htonll, followed
// by the single timeSync byte.
// NOTE(review): ext is allocated with new[] - confirm it is released with
// delete[] and that the request packet is freed.
224 void set_drift_counter_state(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1,
225 int64_t initialDriftCount, uint8_t timeSync) {
227 protocol_binary_request_header *request;
229 int64_t driftCount = htonll(initialDriftCount);
230 uint8_t extlen = sizeof(driftCount) + sizeof(timeSync);
231 char *ext = new char[extlen];
232 memcpy(ext, (char*)&driftCount, sizeof(driftCount));
233 memcpy(ext + sizeof(driftCount), (char*)&timeSync, sizeof(timeSync));
235 request = createPacket(PROTOCOL_BINARY_CMD_SET_DRIFT_COUNTER_STATE,
237 h1->unknown_command(h, NULL, request, add_response);
238 check(last_status == PROTOCOL_BINARY_RESPONSE_SUCCESS,
239 "Expected success for CMD_SET_DRIFT_COUNTER_STATE");
// Sends ADD_WITH_META for key/val to vbucket vb, using *itemMeta as the item
// metadata (encoded by encodeWithMetaExt into the first 24 ext bytes).
// Without extended meta the ext is 24 bytes, or 28 when
// skipConflictResolution appends SKIP_CONFLICT_RESOLUTION_FLAG at offset 24.
// With includeExtMeta an ExtendedMetaData built from adjustedTime is appended
// to the packet and its length is written (htons) at ext offset 24.
// The unknown_command call itself must return ENGINE_SUCCESS; the response is
// recorded through add_response.
// NOTE(review): ext and emd are allocated with new - confirm both (and the
// packet) are released.
243 void add_with_meta(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1, const char *key,
244 const size_t keylen, const char *val, const size_t vallen,
245 const uint32_t vb, ItemMetaData *itemMeta,
246 bool skipConflictResolution, uint8_t datatype,
247 bool includeExtMeta, int64_t adjustedTime) {
250 ExtendedMetaData *emd = NULL;
251 if (!includeExtMeta) {
252 blen = skipConflictResolution ? 28 : 24;
253 ext = new char[blen];
254 encodeWithMetaExt(ext, itemMeta);
256 if (skipConflictResolution) {
257 uint32_t flag = SKIP_CONFLICT_RESOLUTION_FLAG;
259 memcpy(ext + 24, (char*)&flag, sizeof(flag));
263 ext = new char[blen];
264 encodeWithMetaExt(ext, itemMeta);
265 emd = new ExtendedMetaData(adjustedTime);
266 // nmeta added to ext below
269 protocol_binary_request_header *pkt;
271 std::pair<const char*, uint16_t> meta = emd->getExtMeta();
272 uint16_t nmeta = htons(meta.second);
273 memcpy(ext + 24, (char*)&nmeta, sizeof(nmeta));
274 pkt = createPacket(PROTOCOL_BINARY_CMD_ADD_WITH_META, vb, 0, ext, blen, key,
275 keylen, val, vallen, datatype, meta.first, meta.second);
278 pkt = createPacket(PROTOCOL_BINARY_CMD_ADD_WITH_META, vb, 0, ext, blen, key,
279 keylen, val, vallen, datatype);
281 check(h1->unknown_command(h, NULL, pkt, add_response) == ENGINE_SUCCESS,
282 "Expected to be able to store with meta");
// Sends CHANGE_VB_FILTER with name as the key. The value is the number of
// map entries (uint16, htons) followed, for every entry of filtermap, by the
// vbucket id (uint16, htons) and the checkpoint id (uint64, htonll).
// The unknown_command call must return ENGINE_SUCCESS.
286 void changeVBFilter(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1, std::string name,
287 std::map<uint16_t, uint64_t> &filtermap) {
288 std::stringstream value;
289 uint16_t vbs = htons(filtermap.size());
290 std::map<uint16_t, uint64_t>::iterator it;
292 value.write((char*) &vbs, sizeof(uint16_t));
293 for (it = filtermap.begin(); it != filtermap.end(); ++it) {
294 uint16_t vb = htons(it->first);
295 uint64_t chkid = htonll(it->second);
296 value.write((char*) &vb, sizeof(uint16_t));
297 value.write((char*) &chkid, sizeof(uint64_t));
300 protocol_binary_request_header *request;
301 request = createPacket(PROTOCOL_BINARY_CMD_CHANGE_VB_FILTER, 0, 0, NULL, 0, name.c_str(),
302 name.length(), value.str().data(), value.str().length());
303 check(h1->unknown_command(h, NULL, request, add_response) == ENGINE_SUCCESS,
304 "Failed to change the TAP VB filter.");
// Sends CREATE_CHECKPOINT (no ext, key or value) and requires the
// unknown_command call to return ENGINE_SUCCESS.
308 void createCheckpoint(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1) {
309 protocol_binary_request_header *request = createPacket(PROTOCOL_BINARY_CMD_CREATE_CHECKPOINT);
310 check(h1->unknown_command(h, NULL, request, add_response) == ENGINE_SUCCESS,
311 "Failed to create a new checkpoint.");
// Removes key (a NUL-terminated string; its length is taken with strlen)
// from vbucket through h1->remove and returns the engine's result. The
// mutation description filled in by the engine is discarded.
315 ENGINE_ERROR_CODE del(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1, const char *key,
316 uint64_t cas, uint16_t vbucket, const void* cookie) {
317 mutation_descr_t mut_info;
318 return h1->remove(h, cookie, key, strlen(key), &cas, vbucket, &mut_info);
// Sends DEL_WITH_META for key on vbucket vb with CAS cas_for_delete, using
// *itemMeta as the item metadata (first 24 ext bytes, see encodeWithMetaExt).
// Without extended meta the ext is 24 bytes, or 28 when
// skipConflictResolution appends SKIP_CONFLICT_RESOLUTION_FLAG at offset 24.
// With includeExtMeta an ExtendedMetaData built from adjustedTime and
// conflictResMode is appended and its length written (htons) at offset 24.
// The response is decoded by add_response_set_del_meta; the call itself must
// return ENGINE_SUCCESS.
// NOTE(review): ext and emd are allocated with new - confirm both (and the
// packet) are released.
321 void del_with_meta(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1, const char *key,
322 const size_t keylen, const uint32_t vb,
323 ItemMetaData *itemMeta, uint64_t cas_for_delete,
324 bool skipConflictResolution, bool includeExtMeta,
325 int64_t adjustedTime, uint8_t conflictResMode,
326 const void *cookie) {
329 ExtendedMetaData *emd = NULL;
330 if (!includeExtMeta) {
331 blen = skipConflictResolution ? 28 : 24;
332 ext = new char[blen];
333 encodeWithMetaExt(ext, itemMeta);
335 if (skipConflictResolution) {
336 uint32_t flag = SKIP_CONFLICT_RESOLUTION_FLAG;
338 memcpy(ext + 24, (char*)&flag, sizeof(flag));
342 ext = new char[blen];
343 encodeWithMetaExt(ext, itemMeta);
344 emd = new ExtendedMetaData(adjustedTime, conflictResMode);
345 // nmeta added to ext below
348 protocol_binary_request_header *pkt;
350 std::pair<const char*, uint16_t> meta = emd->getExtMeta();
351 uint16_t nmeta = htons(meta.second);
352 memcpy(ext + 24, (char*)&nmeta, sizeof(nmeta));
353 pkt = createPacket(PROTOCOL_BINARY_CMD_DEL_WITH_META, vb, cas_for_delete,
354 ext, blen, key, keylen, NULL, 0, 0x00, meta.first,
358 pkt = createPacket(PROTOCOL_BINARY_CMD_DEL_WITH_META, vb, cas_for_delete,
359 ext, blen, key, keylen, NULL, 0, 0x00);
361 check(h1->unknown_command(h, cookie, pkt, add_response_set_del_meta) == ENGINE_SUCCESS,
362 "Expected to be able to delete with meta");
// Sends EVICT_KEY for key on vbucketId and validates the outcome.
// The ep_num_non_resident and ep_num_value_ejects stats are sampled before
// the request so they can be compared afterwards. The response status is
// checked against KEY_EEXISTS or SUCCESS, and when msg is non-NULL the
// response body is compared with it (a mismatch is reported on stderr).
// NOTE(review): which status is expected presumably depends on expectError,
// and the stat comparison on whether the key was already ejected - confirm.
366 void evict_key(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1, const char *key,
367 uint16_t vbucketId, const char *msg, bool expectError) {
368 int nonResidentItems = get_int_stat(h, h1, "ep_num_non_resident");
369 int numEjectedItems = get_int_stat(h, h1, "ep_num_value_ejects");
370 protocol_binary_request_header *pkt = createPacket(PROTOCOL_BINARY_CMD_EVICT_KEY, 0, 0,
371 NULL, 0, key, strlen(key));
372 pkt->request.vbucket = htons(vbucketId);
374 check(h1->unknown_command(h, NULL, pkt, add_response) == ENGINE_SUCCESS,
375 "Failed to evict key.");
378 check(last_status == PROTOCOL_BINARY_RESPONSE_KEY_EEXISTS,
379 "Expected exists when evicting key.");
381 if (last_body != "Already ejected.") {
385 check(last_status == PROTOCOL_BINARY_RESPONSE_SUCCESS,
386 "Expected success evicting key.");
389 check(get_int_stat(h, h1, "ep_num_non_resident") == nonResidentItems,
390 "Incorrect number of non-resident items");
391 check(get_int_stat(h, h1, "ep_num_value_ejects") == numEjectedItems,
392 "Incorrect number of ejected items");
394 if (msg != NULL && last_body != msg) {
395 fprintf(stderr, "Expected evict to return ``%s'', but it returned ``%s''\n",
396 msg, last_body.c_str());
// Returns the "estimate" stat from the stat group
// "tap-vbtakeover <vbid>[ <tap_name>]".
// NOTE(review): tap_name is presumably appended only when non-NULL - confirm.
// The value comes from get_int_stat(), i.e. an int converted to size_t.
401 size_t estimateVBucketMove(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1,
402 uint16_t vbid, const char* tap_name) {
403 std::stringstream ss;
404 ss << "tap-vbtakeover " << vbid;
406 ss << " " << tap_name;
408 return get_int_stat(h, h1, "estimate", ss.str().c_str());
// Sends CHECKPOINT_PERSISTENCE for vbucket vb; the request value is the
// 8-byte checkpoint_id in network byte order (htonll). The engine's return
// code from unknown_command is captured in rv.
411 ENGINE_ERROR_CODE checkpointPersistence(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1,
412 uint64_t checkpoint_id, uint16_t vb) {
413 checkpoint_id = htonll(checkpoint_id);
414 protocol_binary_request_header *request;
415 request = createPacket(PROTOCOL_BINARY_CMD_CHECKPOINT_PERSISTENCE, vb, 0, NULL, 0, NULL, 0,
416 (const char *)&checkpoint_id, sizeof(uint64_t));
417 ENGINE_ERROR_CODE rv = h1->unknown_command(h, NULL, request, add_response);
// Sends SEQNO_PERSISTENCE for vbucket; the 8-byte ext carries seqno in
// network byte order (htonll). The engine's return code from
// unknown_command is captured in rv.
422 ENGINE_ERROR_CODE seqnoPersistence(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1,
423 uint16_t vbucket, uint64_t seqno) {
424 seqno = htonll(seqno);
426 memcpy(buffer, &seqno, sizeof(uint64_t));
427 protocol_binary_request_header* request =
428 createPacket(PROTOCOL_BINARY_CMD_SEQNO_PERSISTENCE, vbucket, 0, buffer, 8);
430 ENGINE_ERROR_CODE rv = h1->unknown_command(h, NULL, request, add_response);
// Sends get-and-touch for key on vbucket vb: GATQ when quiet is set, GAT
// otherwise, with a 4-byte ext. A NULL key is sent as a zero-length key.
// The unknown_command call must return ENGINE_SUCCESS.
// NOTE(review): confirm ext is filled with exp before the packet is built.
435 void gat(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1, const char* key,
436 uint16_t vb, uint32_t exp, bool quiet) {
438 uint8_t opcode = quiet ? PROTOCOL_BINARY_CMD_GATQ : PROTOCOL_BINARY_CMD_GAT;
439 uint32_t keylen = key ? strlen(key) : 0;
440 protocol_binary_request_header *request;
442 request = createPacket(opcode, vb, 0, ext, 4, key, keylen);
444 check(h1->unknown_command(h, NULL, request, add_response) == ENGINE_SUCCESS,
445 "Failed to call gat");
// Fetches key (NUL-terminated) from vbucket vb and fills *info through
// h1->get_item_info. The fetched item is released on both the failure and
// the success path; a failed info lookup is reported on stderr.
// NOTE(review): presumably returns false on either failure and true
// otherwise - confirm.
449 bool get_item_info(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1, item_info *info,
450 const char* key, uint16_t vb) {
452 if (h1->get(h, NULL, &i, key, strlen(key), vb) != ENGINE_SUCCESS) {
456 if (!h1->get_item_info(h, NULL, i, info)) {
457 h1->release(h, NULL, i);
458 fprintf(stderr, "get_item_info failed\n");
462 h1->release(h, NULL, i);
// Looks up the item info of i and copies its key bytes (info.key, info.nkey)
// into key. A failed lookup is reported on stderr.
// NOTE(review): presumably returns false on failure and true otherwise -
// confirm.
466 bool get_key(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1, item *i,
471 if (!h1->get_item_info(h, NULL, i, &info)) {
472 fprintf(stderr, "get_item_info failed\n");
476 key.assign((const char*)info.key, info.nkey);
// Sends GET_LOCKED for key on vbucket vb; lock_timeout is encoded into the
// 4-byte ext with encodeExt. A NULL key is sent as a zero-length key.
// The unknown_command call must return ENGINE_SUCCESS.
480 void getl(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1, const char* key,
481 uint16_t vb, uint32_t lock_timeout) {
483 uint32_t keylen = key ? strlen(key) : 0;
484 protocol_binary_request_header *request;
485 encodeExt(ext, lock_timeout);
486 request = createPacket(PROTOCOL_BINARY_CMD_GET_LOCKED, vb, 0, ext, 4, key, keylen);
488 check(h1->unknown_command(h, NULL, request, add_response) == ENGINE_SUCCESS,
489 "Failed to call getl");
// Sends GET_META for key (NUL-terminated) on vbucket 0, either with an ext
// taken from the local `ext` value or with no ext at all, and decodes the
// response through add_response_get_meta. The call must return
// ENGINE_SUCCESS; the result then depends on last_status.
// NOTE(review): presumably returns true only for a SUCCESS status - confirm.
493 bool get_meta(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1, const char* key,
496 protocol_binary_request_header *req;
499 req = createPacket(PROTOCOL_BINARY_CMD_GET_META, 0, 0,
500 (char*)&ext, sizeof(ext), key, strlen(key));
502 req = createPacket(PROTOCOL_BINARY_CMD_GET_META, 0, 0,
503 NULL, 0, key, strlen(key));
506 ENGINE_ERROR_CODE ret = h1->unknown_command(h, NULL, req,
507 add_response_get_meta);
508 check(ret == ENGINE_SUCCESS, "Expected get_meta call to be successful");
509 if (last_status == PROTOCOL_BINARY_RESPONSE_SUCCESS) {
// Sends OBSERVE for a set of keys. obskeys maps key -> vbucket id; for every
// entry the value receives the vbucket id (uint16, htons), the key length
// (uint16, htons) and the key bytes. The unknown_command call must return
// ENGINE_SUCCESS.
515 void observe(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1,
516 std::map<std::string, uint16_t> obskeys) {
517 std::stringstream value;
518 std::map<std::string, uint16_t>::iterator it;
519 for (it = obskeys.begin(); it != obskeys.end(); ++it) {
520 uint16_t vb = htons(it->second);
521 uint16_t keylen = htons(it->first.length());
522 value.write((char*) &vb, sizeof(uint16_t));
523 value.write((char*) &keylen, sizeof(uint16_t));
524 value.write(it->first.c_str(), it->first.length());
527 protocol_binary_request_header *request;
528 request = createPacket(PROTOCOL_BINARY_CMD_OBSERVE, 0, 0, NULL, 0, NULL, 0,
529 value.str().data(), value.str().length());
530 check(h1->unknown_command(h, NULL, request, add_response) == ENGINE_SUCCESS,
531 "Observe call failed");
// Sends OBSERVE_SEQNO for vbucket vb_id; the request value is the 8-byte
// vbucket uuid in network byte order (htonll). The unknown_command call must
// return ENGINE_SUCCESS.
535 void observe_seqno(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1,
536 uint16_t vb_id, uint64_t uuid) {
537 protocol_binary_request_header *request;
538 uint64_t vb_uuid = htonll(uuid);
539 std::stringstream data;
540 data.write((char *) &vb_uuid, sizeof(uint64_t));
542 request = createPacket(PROTOCOL_BINARY_CMD_OBSERVE_SEQNO, vb_id, 0, NULL, 0,
543 NULL, 0, data.str().data(), data.str().length());
544 check(h1->unknown_command(h, NULL, request, add_response) == ENGINE_SUCCESS,
545 "Observe_seqno call failed");
// Sends GET_REPLICA for key (NUL-terminated) on vbucket vbid and requires
// the unknown_command call to return ENGINE_SUCCESS.
549 void get_replica(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1, const char* key,
551 protocol_binary_request_header *pkt;
552 pkt = createPacket(PROTOCOL_BINARY_CMD_GET_REPLICA, vbid, 0, NULL, 0, key, strlen(key));
553 check(h1->unknown_command(h, NULL, pkt, add_response) == ENGINE_SUCCESS,
554 "Get Replica Failed");
// Builds a GET_REPLICA request for the fixed key "k0" on vbucket `id`.
// Unless makeinvalidkey is set, the key is first stored with the value
// "replicadata" (and the stored item released) so that it exists. The
// vbucket is then switched to `state`.
// NOTE(review): the packet is presumably returned to the caller, who then
// owns it - confirm.
558 protocol_binary_request_header* prepare_get_replica(ENGINE_HANDLE *h,
559 ENGINE_HANDLE_V1 *h1,
560 vbucket_state_t state,
561 bool makeinvalidkey) {
563 const char *key = "k0";
564 protocol_binary_request_header *pkt;
565 pkt = createPacket(PROTOCOL_BINARY_CMD_GET_REPLICA, id, 0, NULL, 0, key, strlen(key));
567 if (!makeinvalidkey) {
569 check(store(h, h1, NULL, OPERATION_SET, key, "replicadata", &i, 0, id)
570 == ENGINE_SUCCESS, "Get Replica Failed");
571 h1->release(h, NULL, i);
573 check(set_vbucket_state(h, h1, id, state),
574 "Failed to set vbucket active state, Get Replica Failed");
// Sends SET_PARAM with paramtype encoded in the ext, param as the key and
// val as the value (both NUL-terminated). Returns whether the response
// status was SUCCESS.
// NOTE(review): encodeExt writes 4 bytes while the ext length passed is
// sizeof(protocol_binary_engine_param_t) - confirm the two sizes agree.
580 bool set_param(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1, protocol_binary_engine_param_t paramtype,
581 const char *param, const char *val) {
583 protocol_binary_request_header *pkt;
584 encodeExt(ext, static_cast<uint32_t>(paramtype));
585 pkt = createPacket(PROTOCOL_BINARY_CMD_SET_PARAM, 0, 0, ext, sizeof(protocol_binary_engine_param_t), param,
586 strlen(param), val, strlen(val));
588 if (h1->unknown_command(h, NULL, pkt, add_response) != ENGINE_SUCCESS) {
592 return last_status == PROTOCOL_BINARY_RESPONSE_SUCCESS;
// Sends SET_VBUCKET for vbucket vb with state encoded in the 4-byte ext.
// Returns whether the response status was SUCCESS.
595 bool set_vbucket_state(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1,
596 uint16_t vb, vbucket_state_t state) {
599 protocol_binary_request_header *pkt;
600 encodeExt(ext, static_cast<uint32_t>(state));
601 pkt = createPacket(PROTOCOL_BINARY_CMD_SET_VBUCKET, vb, 0, ext, 4);
603 if (h1->unknown_command(h, NULL, pkt, add_response) != ENGINE_SUCCESS) {
608 return last_status == PROTOCOL_BINARY_RESPONSE_SUCCESS;
// Sends GET_ALL_VB_SEQNOS, either with the vbucket state encoded in the ext
// or with no ext at all. The call must return ENGINE_SUCCESS; the function
// returns whether the response status was SUCCESS.
// NOTE(review): the condition selecting the ext-less form is presumably a
// sentinel value of `state` - confirm.
611 bool get_all_vb_seqnos(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1,
612 vbucket_state_t state, const void *cookie) {
613 protocol_binary_request_header *pkt;
615 char ext[sizeof(vbucket_state_t)];
616 encodeExt(ext, static_cast<uint32_t>(state));
617 pkt = createPacket(PROTOCOL_BINARY_CMD_GET_ALL_VB_SEQNOS, 0, 0, ext,
618 sizeof(vbucket_state_t));
620 pkt = createPacket(PROTOCOL_BINARY_CMD_GET_ALL_VB_SEQNOS);
623 check(h1->unknown_command(h, cookie, pkt, add_response) ==
624 ENGINE_SUCCESS, "Error in getting all vb info");
627 return last_status == PROTOCOL_BINARY_RESPONSE_SUCCESS;
// Validates the body of the last response (last_body) as a list of
// (vbucket id, high seqno) records for vbuckets vb_start..vb_end inclusive.
// Each record is 10 bytes: a uint16 vbucket id followed by a uint64 seqno,
// both in network byte order. The seqno is compared with a per-vbucket stat
// read through get_ull_stat.
// NOTE(review): the reinterpret_cast reads dereference uint16_t/uint64_t
// pointers at 10-byte strides inside a std::string buffer, which is not
// guaranteed to be suitably aligned - a memcpy into a local would be safer.
630 void verify_all_vb_seqnos(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1,
631 uint16_t vb_start, uint16_t vb_end) {
632 const int per_vb_resp_size = sizeof(uint16_t) + sizeof(uint64_t);
633 const int high_seqno_offset = sizeof(uint16_t);
635 std::string seqno_body = last_body;
637 /* Check if the total response length is as expected. We expect 10 bytes
638 (2 for vb_id + 8 for seqno) */
639 checkeq((size_t)((vb_end - vb_start + 1) * per_vb_resp_size),
640 last_body.size(), "Failed to get all vb info.");
641 /* Check if the contents are correct */
642 for (int i = 0; i < (vb_end - vb_start + 1); i++) {
643 /* Check for correct vb_id */
644 checkeq(static_cast<const uint16_t>(vb_start + i),
645 ntohs(*(reinterpret_cast<const uint16_t*>(seqno_body.data() +
646 per_vb_resp_size * i))),
648 /* Check for correct high_seqno */
649 std::string vb_stat_seqno("vb_" + std::to_string(vb_start + i) +
651 uint64_t high_seqno_vb = get_ull_stat(h, h1, vb_stat_seqno.c_str(),
653 checkeq(high_seqno_vb,
654 ntohll(*(reinterpret_cast<const uint64_t*>(seqno_body.data() +
655 per_vb_resp_size * i +
656 high_seqno_offset))),
657 "high_seqno mismatch");
// Sends SET_WITH_META for key/val on vbucket vb with CAS cas_for_set, using
// *itemMeta as the item metadata (first 24 ext bytes, see encodeWithMetaExt).
// Without extended meta the ext is 24 bytes, or 28 when
// skipConflictResolution appends SKIP_CONFLICT_RESOLUTION_FLAG at offset 24.
// With includeExtMeta an ExtendedMetaData built from adjustedTime and
// conflictResMode is appended and its length written (htons) at offset 24.
// The response is decoded by add_response_set_del_meta; the call itself must
// return ENGINE_SUCCESS.
// NOTE(review): ext and emd are allocated with new - confirm both (and the
// packet) are released.
661 void set_with_meta(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1, const char *key,
662 const size_t keylen, const char *val, const size_t vallen,
663 const uint32_t vb, ItemMetaData *itemMeta,
664 uint64_t cas_for_set, bool skipConflictResolution,
665 uint8_t datatype, bool includeExtMeta,
666 int64_t adjustedTime, uint8_t conflictResMode,
667 const void *cookie) {
670 ExtendedMetaData *emd = NULL;
671 if (!includeExtMeta) {
672 blen = skipConflictResolution ? 28 : 24;
673 ext = new char[blen];
674 encodeWithMetaExt(ext, itemMeta);
676 if (skipConflictResolution) {
677 uint32_t flag = SKIP_CONFLICT_RESOLUTION_FLAG;
679 memcpy(ext + 24, (char*)&flag, sizeof(flag));
683 ext = new char[blen];
684 encodeWithMetaExt(ext, itemMeta);
685 emd = new ExtendedMetaData(adjustedTime, conflictResMode);
686 // nmeta added to ext below
689 protocol_binary_request_header *pkt;
691 std::pair<const char*, uint16_t> meta = emd->getExtMeta();
692 uint16_t nmeta = htons(meta.second);
693 memcpy(ext + 24, (char*)&nmeta, sizeof(nmeta));
694 pkt = createPacket(PROTOCOL_BINARY_CMD_SET_WITH_META, vb, cas_for_set, ext,
695 blen, key, keylen, val, vallen, datatype, meta.first,
699 pkt = createPacket(PROTOCOL_BINARY_CMD_SET_WITH_META, vb, cas_for_set, ext,
700 blen, key, keylen, val, vallen, datatype);
703 check(h1->unknown_command(h, cookie, pkt, add_response_set_del_meta) == ENGINE_SUCCESS,
704 "Expected to be able to store with meta");
// Sends RETURN_META for key/val on vbucket vb with the given cas. The
// 12-byte ext holds, in order, the operation type, the flags and the expiry,
// each written with encodeExt. The response is decoded by
// add_response_ret_meta; the call itself must return ENGINE_SUCCESS.
708 void return_meta(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1, const char *key,
709 const size_t keylen, const char *val, const size_t vallen,
710 const uint32_t vb, const uint64_t cas, const uint32_t flags,
711 const uint32_t exp, const uint32_t type, uint8_t datatype,
712 const void *cookie) {
714 encodeExt(ext, type);
715 encodeExt(ext + 4, flags);
716 encodeExt(ext + 8, exp);
717 protocol_binary_request_header *pkt;
718 pkt = createPacket(PROTOCOL_BINARY_CMD_RETURN_META, vb, cas, ext, 12, key, keylen, val,
720 check(h1->unknown_command(h, cookie, pkt, add_response_ret_meta)
721 == ENGINE_SUCCESS, "Expected to be able to store ret meta");
// Convenience wrapper: return_meta() with the operation type SET_RET_META.
725 void set_ret_meta(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1, const char *key,
726 const size_t keylen, const char *val, const size_t vallen,
727 const uint32_t vb, const uint64_t cas, const uint32_t flags,
728 const uint32_t exp, uint8_t datatype, const void *cookie) {
729 return_meta(h, h1, key, keylen, val, vallen, vb, cas, flags, exp,
730 SET_RET_META, datatype, cookie);
// Convenience wrapper: return_meta() with the operation type ADD_RET_META.
733 void add_ret_meta(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1, const char *key,
734 const size_t keylen, const char *val, const size_t vallen,
735 const uint32_t vb, const uint64_t cas, const uint32_t flags,
736 const uint32_t exp, uint8_t datatype, const void *cookie) {
737 return_meta(h, h1, key, keylen, val, vallen, vb, cas, flags, exp,
738 ADD_RET_META, datatype, cookie);
// Convenience wrapper: return_meta() with the operation type DEL_RET_META,
// no value, zero flags/expiry and datatype 0x00.
741 void del_ret_meta(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1, const char *key,
742 const size_t keylen, const uint32_t vb, const uint64_t cas,
743 const void *cookie) {
744 return_meta(h, h1, key, keylen, NULL, 0, vb, cas, 0, 0,
745 DEL_RET_META, 0x00, cookie);
// Sends DISABLE_TRAFFIC and requires both the unknown_command call and the
// response status to report success.
748 void disable_traffic(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1) {
749 protocol_binary_request_header *pkt = createPacket(PROTOCOL_BINARY_CMD_DISABLE_TRAFFIC);
750 check(h1->unknown_command(h, NULL, pkt, add_response) == ENGINE_SUCCESS,
751 "Failed to send data traffic command to the server");
752 check(last_status == PROTOCOL_BINARY_RESPONSE_SUCCESS,
753 "Failed to disable data traffic");
// Sends ENABLE_TRAFFIC and requires both the unknown_command call and the
// response status to report success.
757 void enable_traffic(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1) {
758 protocol_binary_request_header *pkt = createPacket(PROTOCOL_BINARY_CMD_ENABLE_TRAFFIC);
759 check(h1->unknown_command(h, NULL, pkt, add_response) == ENGINE_SUCCESS,
760 "Failed to send data traffic command to the server");
761 check(last_status == PROTOCOL_BINARY_RESPONSE_SUCCESS,
762 "Failed to enable data traffic");
766 void start_persistence(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1) {
767 protocol_binary_request_header *pkt = createPacket(PROTOCOL_BINARY_CMD_START_PERSISTENCE);
768 check(h1->unknown_command(h, NULL, pkt, add_response) == ENGINE_SUCCESS,
769 "Failed to stop persistence.");
770 check(last_status == PROTOCOL_BINARY_RESPONSE_SUCCESS,
771 "Error starting persistence.");
// Polls the "ep_flusher_state" stat (backing off with decayingSleep,
// starting at 128) and then sends STOP_PERSISTENCE, requiring both the call
// and the response status to report success.
// NOTE(review): the poll presumably loops until the flusher reports
// "running" - confirm.
775 void stop_persistence(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1) {
776 useconds_t sleepTime = 128;
778 if (get_str_stat(h, h1, "ep_flusher_state", 0) == "running") {
781 decayingSleep(&sleepTime);
784 protocol_binary_request_header *pkt = createPacket(PROTOCOL_BINARY_CMD_STOP_PERSISTENCE);
785 check(h1->unknown_command(h, NULL, pkt, add_response) == ENGINE_SUCCESS,
786 "Failed to stop persistence.");
787 check(last_status == PROTOCOL_BINARY_RESPONSE_SUCCESS,
788 "Error stopping persistence.");
// Convenience wrapper around storeCasVb11() for NUL-terminated values: the
// value length is taken with strlen and the item flags are fixed at 9258.
792 ENGINE_ERROR_CODE store(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1,
793 const void *cookie, ENGINE_STORE_OPERATION op,
794 const char *key, const char *value, item **outitem,
795 uint64_t casIn, uint16_t vb, uint32_t exp,
797 return storeCasVb11(h, h1, cookie, op, key, value, strlen(value),
798 9258, outitem, casIn, vb, exp, datatype);
// Allocates an item through the engine, copies vlen bytes of value into its
// first value buffer (asserting the buffer length equals vlen), sets its CAS
// to casIn and stores it with operation op on vbucket vb. The store result
// is kept in rv.
// NOTE(review): the item is presumably either handed back through outitem
// or released - confirm which path applies when outitem is set.
801 ENGINE_ERROR_CODE storeCasVb11(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1,
802 const void *cookie, ENGINE_STORE_OPERATION op,
803 const char *key, const char *value, size_t vlen,
804 uint32_t flags, item **outitem, uint64_t casIn,
805 uint16_t vb, uint32_t exp, uint8_t datatype) {
809 ENGINE_ERROR_CODE rv = h1->allocate(h, cookie, &it,
813 if (rv != ENGINE_SUCCESS) {
819 if (!h1->get_item_info(h, cookie, it, &info)) {
823 cb_assert(info.value[0].iov_len == vlen);
824 memcpy(info.value[0].iov_base, value, vlen);
825 h1->item_set_cas(h, cookie, it, casIn);
827 rv = h1->store(h, cookie, it, &cas, op, vb);
832 h1->release(h, NULL, it);
// Sends TOUCH for key on vbucket vb with a 4-byte ext. A NULL key is sent
// as a zero-length key. The unknown_command call must return ENGINE_SUCCESS.
// NOTE(review): confirm ext is filled with exp before the packet is built.
838 void touch(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1, const char* key,
839 uint16_t vb, uint32_t exp) {
841 uint32_t keylen = key ? strlen(key) : 0;
842 protocol_binary_request_header *request;
844 request = createPacket(PROTOCOL_BINARY_CMD_TOUCH, vb, 0, ext, 4, key, keylen);
846 check(h1->unknown_command(h, NULL, request, add_response) == ENGINE_SUCCESS,
847 "Failed to call touch");
// Sends UNLOCK_KEY for key on vbucket vb with the given cas. A NULL key is
// sent as a zero-length key. The unknown_command call must return
// ENGINE_SUCCESS.
851 void unl(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1, const char* key,
852 uint16_t vb, uint64_t cas) {
853 uint32_t keylen = key ? strlen(key) : 0;
854 protocol_binary_request_header *request;
855 request = createPacket(PROTOCOL_BINARY_CMD_UNLOCK_KEY, vb, cas, NULL, 0, key, keylen);
857 check(h1->unknown_command(h, NULL, request, add_response) == ENGINE_SUCCESS,
858 "Failed to request unl");
// Sends COMPACT_DB for vbucket_id. The ext is the body of a zero-initialised
// protocol_binary_request_compact_db with purge_before_ts and
// purge_before_seq in network byte order (htonll) and drop_deletes copied
// as-is. The unknown_command call must return ENGINE_SUCCESS.
// NOTE(review): argslen is hard-coded to 24 - confirm it matches
// sizeof(req.message.body).
862 void compact_db(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1,
863 const uint16_t vbucket_id,
864 const uint64_t purge_before_ts,
865 const uint64_t purge_before_seq,
866 const uint8_t drop_deletes) {
867 protocol_binary_request_compact_db req;
868 memset(&req, 0, sizeof(req));
869 req.message.body.purge_before_ts = htonll(purge_before_ts);
870 req.message.body.purge_before_seq = htonll(purge_before_seq);
871 req.message.body.drop_deletes = drop_deletes;
873 const char *args = (const char *)&(req.message.body);
874 uint32_t argslen = 24;
876 protocol_binary_request_header *pkt =
877 createPacket(PROTOCOL_BINARY_CMD_COMPACT_DB, vbucket_id, 0, args, argslen, NULL, 0,
879 check(h1->unknown_command(h, NULL, pkt, add_response) == ENGINE_SUCCESS,
880 "Failed to request compact vbucket");
// Sends DEL_VBUCKET for vbucket vb. args is an optional NUL-terminated
// string (NULL is treated as length 0). The unknown_command call must
// return ENGINE_SUCCESS.
// NOTE(review): args is presumably sent as the request value - confirm.
883 void vbucketDelete(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1, uint16_t vb,
885 uint32_t argslen = args ? strlen(args) : 0;
886 protocol_binary_request_header *pkt =
887 createPacket(PROTOCOL_BINARY_CMD_DEL_VBUCKET, vb, 0, NULL, 0, NULL, 0,
889 check(h1->unknown_command(h, NULL, pkt, add_response) == ENGINE_SUCCESS,
890 "Failed to request delete bucket");
// Tries to fetch key (NUL-terminated) from vbucket; a successfully fetched
// item is released straight away. The engine's result is kept in rv.
893 ENGINE_ERROR_CODE verify_key(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1,
894 const char* key, uint16_t vbucket) {
896 ENGINE_ERROR_CODE rv = h1->get(h, NULL, &i, key, strlen(key), vbucket);
897 if (rv == ENGINE_SUCCESS) {
898 h1->release(h, NULL, i);
// Checks that the stat "vb_<vb>" is absent from the engine stats, i.e. the
// vbucket no longer exists. Stats are fetched with get_stats/add_stats and
// looked up in vals; if the entry is still present its value is reported on
// stderr.
// NOTE(review): presumably returns true once the entry is missing and false
// after the retries are exhausted - confirm.
903 bool verify_vbucket_missing(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1,
906 snprintf(vbid, sizeof(vbid), "vb_%d", vb);
907 std::string vbstr(vbid);
909 // Try up to three times to verify the bucket is missing. Bucket
910 // state changes are async.
912 check(h1->get_stats(h, NULL, NULL, 0, add_stats) == ENGINE_SUCCESS,
913 "Failed to get stats.");
915 if (vals.find(vbstr) == vals.end()) {
919 std::cerr << "Expected bucket missing, got " << vals[vbstr] << std::endl;
// Sends GET_VBUCKET for vbucket vb and returns whether the state carried in
// the response body (read as a vbucket_state_t and converted with ntohl)
// equals expected. Failures of the call or a non-SUCCESS status are reported
// on stderr.
// NOTE(review): stderr output is presumably suppressed when mute is set, and
// both failure paths presumably return false - confirm.
// NOTE(review): sizeof(state) bytes are copied from last_body without a
// visible length check - confirm the body is always at least that long.
924 bool verify_vbucket_state(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1, uint16_t vb,
925 vbucket_state_t expected, bool mute) {
926 protocol_binary_request_header *pkt;
927 pkt = createPacket(PROTOCOL_BINARY_CMD_GET_VBUCKET, vb, 0);
929 ENGINE_ERROR_CODE errcode = h1->unknown_command(h, NULL, pkt, add_response);
930 if (errcode != ENGINE_SUCCESS) {
932 fprintf(stderr, "Error code when getting vbucket %d\n", errcode);
937 if (last_status != PROTOCOL_BINARY_RESPONSE_SUCCESS) {
939 fprintf(stderr, "Last protocol status was %d (%s)\n",
941 last_body.size() > 0 ? last_body.c_str() : "unknown");
946 vbucket_state_t state;
947 memcpy(&state, last_body.data(), sizeof(state));
948 state = static_cast<vbucket_state_t>(ntohl(state));
949 return state == expected;
// Builds a binary-protocol response header (magic PROTOCOL_BINARY_RES, the
// given opcode, status converted with htons, and opaque copied as-is) and
// passes it to the engine's DCP response_handler, which must return
// ENGINE_SUCCESS.
// NOTE(review): pkt is declared without an initializer and only four fields
// are assigned here - confirm the remaining header fields do not matter to
// the handler or zero-initialise the struct.
952 void sendDcpAck(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1,
953 const void* cookie, protocol_binary_command opcode,
954 protocol_binary_response_status status, uint32_t opaque) {
955 protocol_binary_response_header pkt;
956 pkt.response.magic = PROTOCOL_BINARY_RES;
957 pkt.response.opcode = opcode;
958 pkt.response.status = htons(status);
959 pkt.response.opaque = opaque;
961 check(h1->dcp.response_handler(h, cookie, &pkt) == ENGINE_SUCCESS,
// Fetches the stat group statkey (NULL selects the default group, sent with
// length 0) and returns the stat statname parsed with atoi.
// The lookup uses vals[statname], so a stat that was not reported yields an
// empty string (and inserts an empty entry into vals), which atoi turns
// into 0.
965 int get_int_stat(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1, const char *statname,
966 const char *statkey) {
968 check(h1->get_stats(h, NULL, statkey, statkey == NULL ? 0 : strlen(statkey),
969 add_stats) == ENGINE_SUCCESS, "Failed to get stats.");
970 std::string s = vals[statname];
971 return atoi(s.c_str());
// Fetches the stat group statkey (NULL selects the default group, sent with
// length 0) and returns the stat statname parsed with atof; the double
// result is narrowed to float by the return type.
// A stat that was not reported is looked up as an empty string.
974 float get_float_stat(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1, const char *statname,
975 const char *statkey) {
977 check(h1->get_stats(h, NULL, statkey, statkey == NULL ? 0 : strlen(statkey),
978 add_stats) == ENGINE_SUCCESS, "Failed to get stats.");
979 std::string s = vals[statname];
980 return atof(s.c_str());
// Fetches the stat group statkey (NULL selects the default group, sent with
// length 0) and returns the stat statname parsed as a base-10 unsigned value
// with strtoull. A stat that was not reported is looked up as an empty
// string.
983 uint64_t get_ull_stat(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1, const char *statname,
984 const char *statkey) {
986 check(h1->get_stats(h, NULL, statkey, statkey == NULL ? 0 : strlen(statkey),
987 add_stats) == ENGINE_SUCCESS, "Failed to get stats.");
988 std::string s = vals[statname];
989 return strtoull(s.c_str(), NULL, 10);
// Fetches the stat group statkey (NULL selects the default group, sent with
// length 0) and reads the stat statname from vals as a string; a stat that
// was not reported yields an empty string.
992 std::string get_str_stat(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1,
993 const char *statname, const char *statkey) {
995 check(h1->get_stats(h, NULL, statkey, statkey == NULL ? 0 : strlen(statkey),
996 add_stats) == ENGINE_SUCCESS, "Failed to get stats.");
997 std::string s = vals[statname];
// Compares the "curr_items" stat with exp and reports a mismatch on stderr,
// using msg to describe the step that was expected to produce exp items.
// NOTE(review): presumably aborts after reporting the mismatch - confirm.
1001 void verify_curr_items(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1, int exp,
1003 int curr_items = get_int_stat(h, h1, "curr_items");
1004 if (curr_items != exp) {
1005 std::cerr << "Expected "<< exp << " curr_items after " << msg
1006 << ", got " << curr_items << std::endl;
/** Helper class used when waiting on statistics to reach a certain value -
 * aggregates how long we have been waiting and aborts if the maximum wait time
 * is exceeded.
 *
 * @tparam T type of the value the statistic is compared against; it only has
 *           to be streamable to std::ostream for the diagnostic message.
 */
template <typename T>
class WaitTimeAccumulator
{
public:
    /**
     * @param compare_name      text describing the comparison being waited
     *                          for (e.g. "to be"), used in the diagnostic
     * @param stat_             name of the statistic being polled
     * @param stat_key          stat group key; may be NULL
     * @param final_            value the statistic is compared against
     * @param wait_time_in_secs maximum accumulated wait, in seconds
     */
    WaitTimeAccumulator(const char* compare_name,
                        const char* stat_, const char* stat_key,
                        const T final_, const time_t wait_time_in_secs)
        : compareName(compare_name),
          stat(stat_),
          statKey(stat_key),
          final(final_),
          // Stored in microseconds so it can be compared with the
          // accumulated useconds_t sleep times.
          maxWaitTime(wait_time_in_secs * 1000 * 1000),
          totalSleepTime(0) {}

    /**
     * Adds sleep_time (microseconds) to the accumulated wait. Once the total
     * reaches the configured maximum a diagnostic is written to std::cerr
     * and the process is aborted.
     */
    void incrementAndAbortIfLimitReached(const useconds_t sleep_time)
    {
        totalSleepTime += sleep_time;
        if (totalSleepTime >= maxWaitTime) {
            // maxWaitTime is held in microseconds, so label it "us"; it was
            // previously printed with an "s" suffix, overstating the limit
            // by a factor of one million.
            std::cerr << "Exceeded maximum wait time of " << maxWaitTime
                      << "us waiting for stat '" << stat;
            if (statKey != NULL) {
                std::cerr << "(" << statKey << ")";
            }
            std::cerr << "' " << compareName << " " << final << " - aborting."
                      << std::endl;
            abort();
        }
    }

private:
    const char* compareName;
    const char* stat;
    const char* statKey;
    const T final;
    const useconds_t maxWaitTime;   // microseconds
    useconds_t totalSleepTime;      // microseconds
};
1054 void wait_for_stat_change(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1,
1055 const char *stat, int initial,
1056 const char *statkey, const time_t wait_time) {
1057 useconds_t sleepTime = 128;
1058 WaitTimeAccumulator<int> accumulator("to change from", stat, statkey,
1059 initial, wait_time);
1060 while (get_int_stat(h, h1, stat, statkey) == initial) {
1061 accumulator.incrementAndAbortIfLimitReached(sleepTime);
1062 decayingSleep(&sleepTime);
1066 void wait_for_stat_to_be(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1,
1067 const char *stat, int final, const char* stat_key,
1068 const time_t wait_time) {
1069 useconds_t sleepTime = 128;
1070 WaitTimeAccumulator<int> accumulator("to be", stat, stat_key, final,
1072 while (get_int_stat(h, h1, stat, stat_key) != final) {
1073 accumulator.incrementAndAbortIfLimitReached(sleepTime);
1074 decayingSleep(&sleepTime);
1078 void wait_for_stat_to_be_gte(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1,
1079 const char *stat, int final,
1080 const char* stat_key, const time_t wait_time) {
1081 useconds_t sleepTime = 128;
1082 WaitTimeAccumulator<int> accumulator("to be greater or equal than", stat,
1083 stat_key, final, wait_time);
1084 while (get_int_stat(h, h1, stat, stat_key) < final) {
1085 accumulator.incrementAndAbortIfLimitReached(sleepTime);
1086 decayingSleep(&sleepTime);
1090 void wait_for_stat_to_be_lte(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1,
1091 const char *stat, int final,
1092 const char* stat_key,
1093 const time_t max_wait_time_in_secs) {
1094 useconds_t sleepTime = 128;
1095 WaitTimeAccumulator<int> accumulator("to be less than or equal to", stat,
1097 max_wait_time_in_secs);
1098 while (get_int_stat(h, h1, stat, stat_key) > final) {
1099 accumulator.incrementAndAbortIfLimitReached(sleepTime);
1100 decayingSleep(&sleepTime);
1104 void wait_for_str_stat_to_be(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1,
1105 const char *stat, const char* final,
1106 const char* stat_key, const time_t wait_time) {
1107 useconds_t sleepTime = 128;
1108 WaitTimeAccumulator<const char*> accumulator("to be", stat, stat_key,
1110 while (get_str_stat(h, h1, stat, stat_key).compare(final) != 0) {
1111 accumulator.incrementAndAbortIfLimitReached(sleepTime);
1112 decayingSleep(&sleepTime);
1116 void wait_for_memory_usage_below(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1,
1117 int mem_threshold, const time_t wait_time) {
1118 useconds_t sleepTime = 128;
1119 WaitTimeAccumulator<int> accumulator("to be below", "mem_used", NULL,
1120 mem_threshold, wait_time);
1121 while (get_int_stat(h, h1, "mem_used") > mem_threshold) {
1122 accumulator.incrementAndAbortIfLimitReached(sleepTime);
1123 decayingSleep(&sleepTime);
/**
 * Poll the "warmup" stat group until the "ep_warmup_thread" entry reads
 * "complete".
 *
 * @param h  engine handle
 * @param h1 engine v1 interface whose get_stats() is polled
 *
 * The loop only continues while get_stats() keeps returning ENGINE_SUCCESS.
 *
 * NOTE(review): this extract appears to omit short source lines - the body of
 * the "complete" branch, the tail of the loop, the return statement and the
 * closing braces are not visible here. Confirm the return value and loop exit
 * against the complete file.
 */
1127 bool wait_for_warmup_complete(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1) {
// Initial delay between polls, in microseconds; advanced by decayingSleep().
1128 useconds_t sleepTime = 128;
// 6 is the length of the "warmup" key.
1129 while (h1->get_stats(h, NULL, "warmup", 6, add_stats) == ENGINE_SUCCESS) {
// operator[] yields an empty string if the stat has not been reported.
1130 std::string s = vals["ep_warmup_thread"];
1131 if (strcmp(s.c_str(), "complete") == 0) {
1134 decayingSleep(&sleepTime);
1140 void wait_for_flusher_to_settle(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1) {
1141 useconds_t sleepTime = 128;
1142 while (get_int_stat(h, h1, "ep_queue_size") > 0) {
1143 decayingSleep(&sleepTime);
/**
 * Store `val` under `key` with OPERATION_SET and wait for the
 * "ep_commit_num" stat to move on.
 *
 * @param h         engine handle
 * @param h1        engine v1 interface
 * @param key       key to store
 * @param val       value to store
 * @param vbucketId vbucket for the store (its use is on a line not visible in
 *                  this extract - presumably the final store() argument)
 *
 * The check() macro aborts (via abort_msg) if store() does not return
 * ENGINE_SUCCESS.
 *
 * NOTE(review): this extract appears to omit short source lines - the
 * declaration of `i`, the last store() argument and the closing brace are not
 * visible here. Confirm against the complete file.
 */
1147 void wait_for_persisted_value(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1,
1148 const char *key, const char *val,
1149 uint16_t vbucketId) {
// Snapshot the commit counter BEFORE storing, so the wait below can detect
// that it changed after this store.
1152 int commitNum = get_int_stat(h, h1, "ep_commit_num");
1153 check(ENGINE_SUCCESS == store(h, h1, NULL, OPERATION_SET, key, val, &i, 0,
1155 "Failed to store an item.");
1157 // Wait for persistence...
1158 wait_for_flusher_to_settle(h, h1);
1159 wait_for_stat_change(h, h1, "ep_commit_num", commitNum);
// Hand the item obtained from store() back to the engine.
1160 h1->release(h, NULL, i);
/**
 * Drive one DCP step for `cookie` and check that the engine accepted it.
 *
 * @param h      engine handle
 * @param h1     engine v1 interface whose dcp.step() is invoked
 * @param cookie connection cookie passed to dcp.step()
 *
 * The check() macro aborts (via abort_msg) unless the result is
 * ENGINE_SUCCESS or ENGINE_WANT_MORE.
 *
 * NOTE(review): this extract appears to omit short source lines - the body of
 * the ENGINE_SUCCESS branch and the closing braces are not visible here.
 * Confirm against the complete file.
 */
1163 void dcp_step(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1, const void* cookie) {
// Producer callbacks come from get_dcp_producers() (presumably declared in
// mock/mock_dcp.h - verify).
1164 struct dcp_message_producers* producers = get_dcp_producers();
1165 ENGINE_ERROR_CODE err = h1->dcp.step(h, cookie, producers);
1166 check(err == ENGINE_SUCCESS || err == ENGINE_WANT_MORE,
1167 "Expected success or engine_want_more");
1168 if (err == ENGINE_SUCCESS) {
1174 void set_degraded_mode(ENGINE_HANDLE *h,
1175 ENGINE_HANDLE_V1 *h1,
1179 protocol_binary_request_header *pkt;
1181 pkt = createPacket(PROTOCOL_BINARY_CMD_DISABLE_TRAFFIC, 0, 0);
1183 pkt = createPacket(PROTOCOL_BINARY_CMD_ENABLE_TRAFFIC, 0, 0);
1186 ENGINE_ERROR_CODE errcode = h1->unknown_command(h, NULL, pkt, add_response);
1187 if (errcode != ENGINE_SUCCESS) {
1188 std::cerr << "Failed to set degraded mode to " << enable
1189 << ". api call return engine code: " << errcode << std::endl;
1193 if (last_status != PROTOCOL_BINARY_RESPONSE_SUCCESS) {
1194 std::cerr << "Failed to set degraded mode to " << enable
1195 << ". protocol code: " << last_status << std::endl;
1196 if (last_body.size() > 0) {
1197 std::cerr << "\tBody: [" << last_body << "]" << std::endl;