}
}
-void write_items(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1, int num_items,
- int start_seqno, const char *key_prefix, const char *value,
- uint32_t expiry, uint16_t vb)
-{
+void write_items(ENGINE_HANDLE* h,
+ ENGINE_HANDLE_V1* h1,
+ int num_items,
+ int start_seqno,
+ const char* key_prefix,
+ const char* value,
+ uint32_t expiry,
+ uint16_t vb,
+ DocumentState docState) {
int j = 0;
while (1) {
if (j == num_items) {
}
item *i = nullptr;
std::string key(key_prefix + std::to_string(j + start_seqno));
- ENGINE_ERROR_CODE ret = store(h, h1, nullptr, OPERATION_SET,
- key.c_str(), value, &i, /*cas*/0, vb,
- expiry);
+ ENGINE_ERROR_CODE ret = store(h,
+ h1,
+ nullptr,
+ OPERATION_SET,
+ key.c_str(),
+ value,
+ &i,
+ /*cas*/ 0,
+ vb,
+ expiry,
+ 0,
+ docState);
h1->release(h, nullptr, i);
validate_store_resp(ret, j);
}
* @param value Value for each item
* @param expiry Expiration time for each item.
* @param vb vbucket to use, default to 0
+ * @param docState document state to write
*/
-void write_items(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1,
- int num_items, int start_seqno = 0,
- const char *key_prefix = "key", const char *value = "data",
- uint32_t expiry = 0, uint16_t vb = 0);
+void write_items(ENGINE_HANDLE* h,
+ ENGINE_HANDLE_V1* h1,
+ int num_items,
+ int start_seqno = 0,
+ const char* key_prefix = "key",
+ const char* value = "data",
+ uint32_t expiry = 0,
+ uint16_t vb = 0,
+ DocumentState docState = DocumentState::Alive);
/* Helper function to write unique items starting from keyXX until memory usage
hits "mem_thresh_perc" (XX is start_seqno) */
live_frontend_client(false),
skip_verification(false),
exp_err(ENGINE_SUCCESS),
- exp_rollback(0)
- {
+ exp_rollback(0),
+ expected_values(0) {
seqno = {0, static_cast<uint64_t>(~0)};
snapshot = {0, static_cast<uint64_t>(~0)};
}
ENGINE_ERROR_CODE exp_err;
/* Expected rollback seqno */
uint64_t exp_rollback;
+ /* Expected number of values (from mutations or deleted_values) */
+ size_t expected_values;
};
class TestDcpConsumer {
last_by_seqno(0),
extra_takeover_ops(0),
exp_disk_snapshot(false),
- exp_conflict_res(0) { }
+ exp_conflict_res(0),
+ num_values(0) {
+ }
size_t num_mutations;
size_t num_deletions;
size_t extra_takeover_ops;
bool exp_disk_snapshot;
uint8_t exp_conflict_res;
+ size_t num_values;
};
/* Connection name */
dcp_last_opaque);
}
+ if (!dcp_last_value.empty()) {
+ stats.num_values++;
+ }
+
break;
case PROTOCOL_BINARY_CMD_DCP_DELETION:
cb_assert(vbid != static_cast<uint16_t>(-1));
dcp_last_opaque);
}
+ if (!dcp_last_value.empty()) {
+ stats.num_values++;
+ }
+
break;
case PROTOCOL_BINARY_CMD_DCP_STREAM_END:
cb_assert(vbid != static_cast<uint16_t>(-1));
get_ull_stat(h, h1, stats_ready_queue_memory.str().c_str(), "dcp"),
"readyQ size did not go to zero");
}
+ if (ctx.expected_values) {
+ checkeq(ctx.expected_values,
+ stats.num_values,
+ "Expected values didn't match");
+ }
}
}
return SUCCESS;
}
+/*
+ * Test that deleted items (with values) backfill correctly
+ */
+static enum test_result test_dcp_producer_deleted_item_backfill(
+ ENGINE_HANDLE* h, ENGINE_HANDLE_V1* h1) {
+ const int deletions = 10;
+ write_items(h,
+ h1,
+ deletions,
+ 0,
+ "del",
+ "value",
+ 0 /*exp*/,
+ 0 /*vb*/,
+ DocumentState::Deleted);
+ wait_for_flusher_to_settle(h, h1);
+
+ const void* cookie = testHarness.create_cookie();
+
+ DcpStreamCtx ctx;
+ ctx.vb_uuid = get_ull_stat(h, h1, "vb_0:0:id", "failovers");
+ ctx.seqno = {0, deletions};
+ ctx.exp_deletions = deletions;
+ ctx.expected_values = deletions;
+ ctx.flags |= DCP_ADD_STREAM_FLAG_DISKONLY;
+ ctx.exp_markers = 1;
+
+ TestDcpConsumer tdc("unittest", cookie);
+ tdc.addStreamCtx(ctx);
+ tdc.run(h, h1);
+
+ testHarness.destroy_cookie(cookie);
+
+ return SUCCESS;
+}
+
static enum test_result test_dcp_producer_stream_req_backfill(ENGINE_HANDLE *h,
ENGINE_HANDLE_V1 *h1) {
const int num_items = 400, batch_items = 200;
TestCase("test_set_dcp_param",
test_set_dcp_param, test_setup, teardown, NULL,
prepare, cleanup),
+ TestCase("test MB-23863 backfill deleted value",
+ test_dcp_producer_deleted_item_backfill, test_setup, teardown,
+ NULL, prepare_ep_bucket, cleanup),
+
TestCase(NULL, NULL, NULL, NULL, NULL, prepare, cleanup)
};