return SUCCESS;
}
+struct mb19982_ctx {
+ mb19982_ctx(ENGINE_HANDLE* h, ENGINE_HANDLE_V1* h1, int iterations) {
+ this->h = h;
+ this->h1 = h1;
+ this->iterations = iterations;
+ }
+ ENGINE_HANDLE* h;
+ ENGINE_HANDLE_V1* h1;
+ int iterations;
+};
+
+void mb19982_add_stat(const char *key, const uint16_t klen, const char *val,
+ const uint32_t vlen, const void *cookie) {
+ // do nothing
+}
+
+extern "C" {
+ static void mb19982_thread_func(void *args) {
+ struct mb19982_ctx *ctx = static_cast<mb19982_ctx *>(args);
+ for (int ii = 0; ii < ctx->iterations; ii++) {
+ checkeq(ctx->h1->get_stats(ctx->h, NULL, "dcp", 3, &mb19982_add_stat),
+ ENGINE_SUCCESS, "failed get_stats(dcp)");
+ }
+ }
+}
+
+/*
+ * This test creates a DCP consumer on a replica VB and then from a second thread
+ * fires get_stats("dcp") whilst the main thread changes VB state from
+ * replica->active->replica (and so on).
+ * MB-19982 idenified a lock inversion between these two functional paths and this
+ * test proves and protects the issue.
+ */
+static enum test_result test_mb19982(ENGINE_HANDLE *h,
+ ENGINE_HANDLE_V1 *h1) {
+
+ // Load up vb0 with num_items
+ int num_items = 1000;
+ int iterations = 1000; // how many stats calls
+
+ struct mb19982_ctx ctx(h, h1, iterations);
+ cb_thread_t cp_thread;
+ const void *cookie = testHarness.create_cookie();
+ uint32_t opaque = 0xFFFF0000;
+ uint32_t flags = 0;
+ std::string name = "unittest";
+ // Switch to replica
+ check(set_vbucket_state(h, h1, 0, vbucket_state_replica),
+ "Failed to set vbucket state.");
+
+ // Open consumer connection
+ checkeq(h1->dcp.open(h, cookie, opaque, 0, flags,
+ (void*)name.c_str(), name.length()),
+ ENGINE_SUCCESS,
+ "Failed dcp Consumer open connection.");
+
+ add_stream_for_consumer(h, h1, cookie, opaque++, 0, 0,
+ PROTOCOL_BINARY_RESPONSE_SUCCESS);
+
+ cb_assert(cb_create_thread(&cp_thread,
+ mb19982_thread_func,
+ &ctx, 0) == 0);
+
+ uint32_t stream_opaque = get_int_stat(h, h1,
+ "eq_dcpq:unittest:stream_0_opaque",
+ "dcp");
+
+ for (int i = 1; i <= num_items; i++) {
+ std::stringstream ss;
+ ss << "key-" << i;
+ checkeq(h1->dcp.snapshot_marker(h, cookie,
+ stream_opaque, 0/*vbid*/,
+ num_items, num_items + i, 2),
+ ENGINE_SUCCESS,
+ "Failed to send snapshot marker");
+ checkeq(h1->dcp.mutation(h, cookie, stream_opaque,
+ ss.str().c_str(), ss.str().length(),
+ "value", 5, i * 3, 0, 0, 0,
+ i + num_items, i + num_items,
+ 0, 0, "", 0, INITIAL_NRU_VALUE),
+ ENGINE_SUCCESS,
+ "Failed to send dcp mutation");
+
+ // And flip VB state (this can have a lock inversion with stats)
+ checkeq(h1->dcp.set_vbucket_state(h, cookie, stream_opaque, 0, vbucket_state_active),
+ ENGINE_SUCCESS, "failed to change to active");
+ checkeq(h1->dcp.set_vbucket_state(h, cookie, stream_opaque, 0, vbucket_state_replica),
+ ENGINE_SUCCESS, "failed to change to replica");
+ }
+
+ cb_assert(cb_join_thread(cp_thread) == 0);
+ testHarness.destroy_cookie(cookie);
+ return SUCCESS;
+}
+
static enum test_result prepare(engine_test_t *test) {
#ifdef __sun
// Some of the tests doesn't work on Solaris.. Don't know why yet..
test_mb19635_upgrade_from_25x, test_setup, teardown, NULL,
prepare, cleanup),
+ TestCase("test MB-19982", test_mb19982,
+ test_setup, teardown, NULL, prepare, cleanup),
+
TestCase(NULL, NULL, NULL, NULL, NULL, prepare, cleanup)
};