return SUCCESS;
}
+// Regression test for MB-17517 - ensure that if an item is locked when TAP
+// attempts to stream it, it doesn't get a CAS of -1.
+static enum test_result test_mb17517_tap_with_locked_key(ENGINE_HANDLE *h,
+ ENGINE_HANDLE_V1 *h1) {
+ const uint16_t vbid = 0;
+ // Store an item and immediately lock it.
+ item *it = NULL;
+ std::string key("key");
+ checkeq(store(h, h1, NULL, OPERATION_SET, key.c_str(), "value",
+ &it, 0, vbid, 3600, PROTOCOL_BINARY_RAW_BYTES),
+ ENGINE_SUCCESS,
+ "Failed to store an item.");
+ h1->release(h, NULL, it);
+
+ uint32_t lock_timeout = 10;
+ getl(h, h1, key.c_str(), vbid, lock_timeout);
+ checkeq(PROTOCOL_BINARY_RESPONSE_SUCCESS, last_status,
+ "Expected to be able to getl on first try");
+
+ // Make sure the mutation is persisted before we try to stream it.
+ wait_for_flusher_to_settle(h, h1);
+
+ // Create the TAP connection and try to get the items.
+ const void *cookie = testHarness.create_cookie();
+ std::string name("test_mb17517_tap_with_locked_key");
+ TAP_ITERATOR iter = h1->get_tap_iterator(h, cookie, name.c_str(),
+ name.length(),
+ TAP_CONNECT_FLAG_DUMP, NULL, 0);
+ check(iter != NULL, "Failed to create a tap iterator");
+
+ void *engine_specific;
+ uint16_t nengine_specific;
+ uint8_t ttl;
+ uint16_t flags;
+ uint32_t seqno;
+ uint16_t vbucket;
+ tap_event_t event;
+
+ // Sentinel written into `vbucket` before each iterator call; chosen to be
+ // an implausible vbucket id so a stale value would be detectable.
+ uint16_t unlikely_vbucket_identifier = 17293;
+
+ // Drain the TAP stream until it disconnects, verifying the CAS of every
+ // mutation received along the way.
+ do {
+ vbucket = unlikely_vbucket_identifier;
+ event = iter(h, cookie, &it, &engine_specific,
+ &nengine_specific, &ttl, &flags,
+ &seqno, &vbucket);
+
+ switch (event) {
+ case TAP_PAUSE:
+ // Producer has nothing ready yet; block until it notifies us.
+ testHarness.waitfor_cookie(cookie);
+ break;
+ case TAP_OPAQUE:
+ case TAP_NOOP:
+ break;
+ case TAP_MUTATION: {
+ // NOTE(review): the cookie appears to be held locked while
+ // iterating, and is released around the re-entrant engine call
+ // below - confirm against testHarness lock_cookie semantics.
+ testHarness.unlock_cookie(cookie);
+
+ item_info info;
+ info.nvalue = 1;
+ if (!h1->get_item_info(h, NULL, it, &info)) {
+ fprintf(stderr, "test_mb17517_tap_with_locked_key: "
+ "get_item_info failed\n");
+ return FAIL;
+ }
+
+ // Check the CAS: a CAS of -1 (all ones) here is exactly the
+ // MB-17517 regression this test guards against.
+ if (info.cas == ~0ull) {
+ fprintf(stderr, "test_mb17517_tap_with_locked_key: "
+ "Got CAS of -1 in TAP_MUTATION\n");
+ return FAIL;
+ }
+
+ testHarness.lock_cookie(cookie);
+ break;
+ }
+ case TAP_DISCONNECT:
+ break;
+ default:
+ std::cerr << "Unexpected event: " << event << std::endl;
+ return FAIL;
+ }
+
+ } while (event != TAP_DISCONNECT);
+
+ testHarness.unlock_cookie(cookie);
+ testHarness.destroy_cookie(cookie);
+
+ return SUCCESS;
+}
+
+/**
+ * Rewrite the _local/vbstate document of `vbucket`'s couchfile into the
+ * 2.5.x on-disk format (state/checkpoint_id/max_deleted_seqno only), so a
+ * subsequent warmup exercises the 2.5.x -> current upgrade path.
+ * Creates the couchfile if it does not already exist.
+ *
+ * @param dbname  directory containing the couchfiles
+ * @param vbucket vbucket id whose file ("<vb>.couch.1") is rewritten
+ */
+static void force_vbstate_to_25x(const std::string& dbname, int vbucket) {
+    std::string filename = dbname +
+                           DIRECTORY_SEPARATOR_CHARACTER +
+                           std::to_string(vbucket) +
+                           ".couch.1";
+    Db* handle;
+    couchstore_error_t err = couchstore_open_db(filename.c_str(),
+                                                COUCHSTORE_OPEN_FLAG_CREATE,
+                                                &handle);
+
+    checkeq(COUCHSTORE_SUCCESS, err, "Failed to open new database");
+
+    // Create 2.5 _local/vbstate (note: no failover/uuid information, which
+    // is what makes the MB-19635 warmup path interesting).
+    std::string vbstate2_5_x ="{\"state\": \"active\","
+                              " \"checkpoint_id\": \"1\","
+                              " \"max_deleted_seqno\": \"0\"}";
+    LocalDoc vbstate;
+    vbstate.id.buf = (char *)"_local/vbstate";
+    vbstate.id.size = sizeof("_local/vbstate") - 1; // exclude NUL terminator
+    vbstate.json.buf = (char *)vbstate2_5_x.c_str();
+    vbstate.json.size = vbstate2_5_x.size();
+    vbstate.deleted = 0;
+
+    err = couchstore_save_local_document(handle, &vbstate);
+    checkeq(COUCHSTORE_SUCCESS, err, "Failed to write local document");
+    // Previously the commit result was ignored; a failed commit would leave
+    // the fixture unwritten and the test failing for a misleading reason.
+    checkeq(COUCHSTORE_SUCCESS, couchstore_commit(handle),
+            "Failed to commit local document");
+    couchstore_close_db(handle);
+}
+
+// Regression test for MB-19635
+// Check that warming up from a 2.x couchfile doesn't end up with a UUID of 0
+// we warmup 2 vbuckets and ensure they get unique IDs.
+static enum test_result test_mb19635_upgrade_from_25x(ENGINE_HANDLE *h,
+ ENGINE_HANDLE_V1 *h1) {
+ std::string dbname = dbname_env;
+
+ // Downgrade the on-disk vbstate of vbuckets 0 and 1 to the 2.5.x format.
+ force_vbstate_to_25x(dbname, 0);
+ force_vbstate_to_25x(dbname, 1);
+
+ // Now shutdown engine force and restart to warmup from the 2.5.x data.
+ testHarness.reload_engine(&h, &h1,
+ testHarness.engine_path,
+ testHarness.get_current_testcase()->cfg,
+ true, false);
+ wait_for_warmup_complete(h, h1);
+ // After warmup each vbucket must have been assigned its own UUID;
+ // identical values would indicate the MB-19635 defect (UUID of 0).
+ uint64_t vb_uuid0 = get_ull_stat(h, h1, "vb_0:uuid", "vbucket-details");
+ uint64_t vb_uuid1 = get_ull_stat(h, h1, "vb_1:uuid", "vbucket-details");
+ checkne(vb_uuid0, vb_uuid1, "UUID is not unique");
+ return SUCCESS;
+}
+
+/**
+ * Verify set_param behaviour for the DCP consumer buffered-message knobs:
+ * valid values must be accepted and reflected in stats, zero must be
+ * rejected.
+ */
+static enum test_result test_set_dcp_param(ENGINE_HANDLE *h,
+                                           ENGINE_HANDLE_V1 *h1)
+{
+    // Try to set `key` to `newValue`; set_param must return
+    // `expectedSetParam`, and an accepted value must show up in stats.
+    auto trySet = [h, h1](const std::string& key, size_t newValue,
+                          bool expectedSetParam) {
+        std::string statKey = "ep_" + key;
+        size_t currentValue = get_int_stat(h,
+                                           h1,
+                                           statKey.c_str());
+        std::string newValueStr = std::to_string(newValue);
+        bool accepted = set_param(h, h1,
+                                  protocol_binary_engine_param_dcp,
+                                  key.c_str(),
+                                  newValueStr.c_str());
+        check(expectedSetParam == accepted,
+              "Set param not expected");
+        // Guard against a no-op test: the requested value must differ from
+        // the current one, otherwise the stat comparison proves nothing.
+        check(newValue != currentValue,
+              "Forcing failure as nothing will change");
+
+        if (expectedSetParam) {
+            checkeq(newValue,
+                    size_t(get_int_stat(h,
+                                        h1,
+                                        statKey.c_str())),
+                    "Incorrect dcp param value after calling set_param");
+        }
+    };
+
+    trySet("dcp_consumer_process_buffered_messages_yield_limit", 1000, true);
+    trySet("dcp_consumer_process_buffered_messages_batch_size", 1000, true);
+    trySet("dcp_consumer_process_buffered_messages_yield_limit", 0, false);
+    trySet("dcp_consumer_process_buffered_messages_batch_size", 0, false);
+    return SUCCESS;
+}
+
+
+/*
+ * Test MB-18452
+ * Drive DCP consumer by halting all NONIO tasks
+ * Writing numItems mutations (they get buffered)
+ * Then trigger the NONIO tasks, which will trigger the DCP consumer
+ * to consume the buffered items.
+ * If the DCP consumer is friendly and not hogging the NONIO threads
+ * we should see it being scheduled many times.
+ * This test function returns the number of times the processor task was
+ * dispatched.
+ *
+ * Note: requires numItems >= 2: the NONIO threads are re-enabled when the
+ * (numItems - 1)th mutation is sent, so with fewer items they would never
+ * be re-enabled and the wait below would block forever.
+ */
+static int test_mb18452(ENGINE_HANDLE *h,
+                        ENGINE_HANDLE_V1 *h1,
+                        size_t numItems,
+                        size_t yieldValue,
+                        size_t batchSize) {
+
+    // 1. Setup the consumer params.
+    std::string value = std::to_string(yieldValue);
+    set_param(h, h1, protocol_binary_engine_param_dcp,
+              "dcp_consumer_process_buffered_messages_yield_limit",
+              value.c_str());
+    value = std::to_string(batchSize);
+    set_param(h, h1, protocol_binary_engine_param_dcp,
+              "dcp_consumer_process_buffered_messages_batch_size",
+              value.c_str());
+
+    const uint16_t vbid = 0;
+    const uint32_t opaque = 0xFFFF0000;
+    const uint32_t flags = 0;
+    const void* cookie = testHarness.create_cookie();
+
+    // 2. We need to use a replica
+    check(set_vbucket_state(h, h1, vbid, vbucket_state_replica),
+          "Failed to set vbucket state.");
+
+    // 3. Force the engine to not run any NONIO tasks whilst we 'load up'
+    set_param(h, h1, protocol_binary_engine_param_flush,
+              "max_num_nonio",
+              "0");
+
+    // 4. Create a consumer and one stream for the vbucket
+    std::string consumer("unittest");
+    checkeq(h1->dcp.open(h,
+                         cookie,
+                         opaque,
+                         0/*seqno*/,
+                         flags,
+                         (void*)consumer.c_str(),
+                         consumer.length()),
+            ENGINE_SUCCESS,
+            "Failed dcp Consumer open connection.");
+    add_stream_for_consumer(h, h1, cookie, opaque + 1, vbid, flags,
+                            PROTOCOL_BINARY_RESPONSE_SUCCESS);
+
+    uint32_t stream_opaque = get_int_stat(h, h1,
+                                          "eq_dcpq:unittest:stream_0_opaque",
+                                          "dcp");
+    checkeq(ENGINE_SUCCESS,
+            h1->dcp.snapshot_marker(h,
+                                    cookie,
+                                    stream_opaque,
+                                    vbid,
+                                    1,//snap start
+                                    numItems,//snap end
+                                    2), //flags
+            "Failed to send snapshot marker");
+
+    // 5. Buffer up numItems mutations on the consumer.
+    for (uint64_t seqno = 1; seqno <= numItems; seqno++) {
+        std::string key = "key" + std::to_string(seqno);
+        checkeq(ENGINE_SUCCESS,
+                h1->dcp.mutation(h,
+                                 cookie,
+                                 stream_opaque,
+                                 key.c_str(),
+                                 key.length(),
+                                 "value", // item value
+                                 // value length; sizeof("value") would
+                                 // include the NUL terminator
+                                 sizeof("value") - 1,
+                                 seqno, // cas
+                                 vbid, // vbucket
+                                 0, // flags
+                                 PROTOCOL_BINARY_RAW_BYTES,
+                                 seqno, // bySeqno
+                                 1, // revSeqno
+                                 0, // expiration
+                                 0, // locktime
+                                 "", //meta
+                                 0, // metalen
+                                 INITIAL_NRU_VALUE),
+                "Failed to send dcp mutation");
+
+        // At n - 1, enable NONIO tasks, the nth mutation will wake up the task.
+        if (seqno == (numItems - 1)) {
+            set_param(h, h1, protocol_binary_engine_param_flush,
+                      "max_num_nonio",
+                      "1");
+        }
+    }
+
+    wait_for_stat_to_be(h, h1, "vb_replica_curr_items", numItems);
+
+    // 6. Halt the NONIO tasks again whilst we 'count up'
+    set_param(h, h1, protocol_binary_engine_param_flush,
+              "max_num_nonio",
+              "0");
+
+    // Now we should count how many times the NONIO task ran.
+    // This is slightly racy, but if the task is yielding we expect many
+    // runs from it, not a small number.
+    checkeq(ENGINE_SUCCESS,
+            h1->get_stats(h, NULL, "dispatcher",
+                          strlen("dispatcher"), add_stats),
+            "Failed to get worker stats");
+
+    // Count up how many times the Processing task was logged.
+    int count = 0;
+    const std::string key1 = "nonio_worker_";
+    const std::string key2 = "Processing buffered items for eq_dcpq:unittest";
+    for (const auto& kv : vals) { // by reference: avoid copying each pair
+        if (kv.first.find(key1) != std::string::npos &&
+            kv.second.find(key2) != std::string::npos) {
+            count++;
+        }
+    }
+
+    // 7. Re-enable NONIO so we can shutdown
+    set_param(h, h1, protocol_binary_engine_param_flush,
+              "max_num_nonio",
+              "1");
+    return count;
+}
+
+/**
+ * Test the behaviour of DCP consumer under load.
+ * The consumer's NONIO processing task should voluntarily yield after a
+ * small number of iterations; with a low yield limit it must therefore be
+ * dispatched many times.
+ */
+static enum test_result test_mb18452_smallYield(ENGINE_HANDLE* h,
+                                                ENGINE_HANDLE_V1* h1) {
+    const int items = 1000;
+    const int yieldLimit = 10;
+    const int batch = 10;
+
+    const int processorRuns = test_mb18452(h, h1, items, yieldLimit, batch);
+
+    // Before the ep-engine fix the processor run count was usually 1 or 2;
+    // with the fix it's up around 80 (appears to saturate the task log).
+    // So require at least numItems/(yield*batch) dispatches.
+    check(processorRuns >= (items / (yieldLimit * batch)),
+          "DCP Processor ran less times than expected.");
+    return SUCCESS;
+}
+
+/**
+ * MB-18452 companion case: with a huge yield limit the processing task
+ * rarely yields, so it should be dispatched only a handful of times.
+ */
+static enum test_result test_mb18452_largeYield(ENGINE_HANDLE* h,
+                                                ENGINE_HANDLE_V1* h1) {
+    const int items = 10000;
+    const int yieldLimit = 10000;
+    const int batch = 10;
+
+    const int processorRuns = test_mb18452(h, h1, items, yieldLimit, batch);
+
+    // Expect very few yields, hence very few runs - definitely not enough
+    // to fill the task log (TASK_LOG_SIZE).
+    check(processorRuns < 80,
+          "DCP Processor ran more times than expected.");
+    return SUCCESS;
+}
+
+/**
+ * This test demonstrates bucket shutdown when there is a rogue
+ * backfill (whose producer and stream are already closed).
+ * Returning from the test terminates the engine; shutdown must complete
+ * cleanly even though the orphaned backfill has just become runnable.
+ */
+static enum test_result test_mb19153(ENGINE_HANDLE *h,
+                                     ENGINE_HANDLE_V1 *h1) {
+
+    // putenv() keeps the pointer, so the strdup'd string is intentionally
+    // never freed.
+    putenv(strdup("ALLOW_NO_STATS_UPDATE=yeah"));
+
+    // Set max num AUX IO to 0, so no backfill would start
+    // immediately
+    set_param(h, h1, protocol_binary_engine_param_flush,
+              "max_num_auxio", "0");
+
+    const int num_items = 10000;
+
+    // Load enough items that the stream below needs a disk backfill.
+    for (int j = 0; j < num_items; ++j) {
+        item *i = NULL;
+        std::stringstream ss;
+        ss << "key-" << j;
+        checkeq(ENGINE_SUCCESS,
+                store(h, h1, NULL, OPERATION_SET,
+                      ss.str().c_str(), "data", &i, 0, 0, 0, 0),
+                "Failed to store a value");
+
+        h1->release(h, NULL, i);
+    }
+
+    const void *cookie = testHarness.create_cookie();
+    uint32_t flags = DCP_OPEN_PRODUCER;
+    const char *name = "unittest";
+
+    uint32_t opaque = 1;
+    uint64_t start = 0;
+    uint64_t end = num_items;
+
+    // Setup a producer connection
+    checkeq(ENGINE_SUCCESS,
+            h1->dcp.open(h, cookie, ++opaque, 0, flags,
+                         (void*)name, strlen(name)),
+            // was "Failed dcp Consumer open connection." - this is a
+            // producer open, so the failure message was misleading.
+            "Failed dcp producer open connection.");
+
+    // Initiate a stream request; the backfill it schedules cannot run yet
+    // (max_num_auxio is 0).
+    uint64_t vb_uuid = get_ull_stat(h, h1, "vb_0:0:id", "failovers");
+    uint64_t rollback = 0;
+    checkeq(ENGINE_SUCCESS,
+            h1->dcp.stream_req(h, cookie, 0, opaque, 0, start, end,
+                               vb_uuid, 0, 0,
+                               &rollback, mock_dcp_add_failover_log),
+            "Expected success");
+
+    // Disconnect the producer
+    testHarness.destroy_cookie(cookie);
+
+    // Wait for ConnManager to clear out dead connections from dcpConnMap
+    wait_for_stat_to_be(h, h1, "ep_dcp_dead_conn_count", 0, "dcp");
+
+    // Set auxIO threads to 1, so the backfill for the closed producer
+    // is picked up, and begins to run.
+    set_param(h, h1, protocol_binary_engine_param_flush,
+              "max_num_auxio", "1");
+
+    // Terminate engine
+    return SUCCESS;
+}
+
static enum test_result prepare(engine_test_t *test) {
#ifdef __sun
// Some of the tests doesn't work on Solaris.. Don't know why yet..
TestCase("test MB-16357", test_mb16357,
test_setup, teardown, "compaction_exp_mem_threshold=85",
prepare, cleanup),
+ TestCase("test MB-19153", test_mb19153,
+ test_setup, teardown, NULL, prepare, cleanup),
+
TestCase("test dcp early termination", test_dcp_early_termination,
test_setup, teardown, NULL, prepare, cleanup),
+
+ // NOTE(review): test_mb17517_cas_minus_1_dcp and test_mb17517_cas_minus_1_tap
+ // are referenced here but not defined in this hunk - confirm they are added
+ // elsewhere in this change.
+ TestCase("test MB-17517 CAS -1 DCP", test_mb17517_cas_minus_1_dcp,
+ test_setup, teardown, NULL, prepare, cleanup),
+
+ TestCase("test MB-17517 CAS -1 TAP", test_mb17517_cas_minus_1_tap,
+ test_setup, teardown, NULL, prepare, cleanup),
+
+ TestCase("test_mb17517_tap_with_locked_key",
+ test_mb17517_tap_with_locked_key, test_setup, teardown, NULL,
+ prepare, cleanup),
+
+ TestCase("test_mb19635_upgrade_from_25x",
+ test_mb19635_upgrade_from_25x, test_setup, teardown, NULL,
+ prepare, cleanup),
+
+ TestCase("test_set_dcp_param",
+ test_set_dcp_param, test_setup, teardown, NULL,
+ prepare, cleanup),
+
+ TestCase("test_mb18452_largeYield",
+ test_mb18452_largeYield, test_setup, teardown, "max_num_nonio=1",
+ prepare, cleanup),
+
+ TestCase("test_mb18452_smallYield",
+ test_mb18452_smallYield, test_setup, teardown, "max_num_nonio=1",
+ prepare, cleanup),
+
TestCase(NULL, NULL, NULL, NULL, NULL, prepare, cleanup)
};