MB-23863: Make DCP backfill read deleted documents 09/76709/4
authorJim Walker <jim@couchbase.com>
Tue, 18 Apr 2017 09:12:31 +0000 (10:12 +0100)
committerDave Rigby <daver@couchbase.com>
Tue, 18 Apr 2017 12:14:05 +0000 (12:14 +0000)
recordDbDump is invoked for KVStore::scan (backfill) and was coded to
skip opening of the document if the docinfo says deleted.

This commit then removes the error case where
couchstore_open_doc_with_docinfo returns -5
(COUCHSTORE_ERROR_DOC_NOT_FOUND) as that just means the document has
no-value and we still need to continue to creating the Item.

Change-Id: I6e2e563ef68f9bc4404c5e59480f8c6fb2dd36e4
Reviewed-on: http://review.couchbase.org/76709
Tested-by: Build Bot <build@couchbase.com>
Reviewed-by: Dave Rigby <daver@couchbase.com>
src/couch-kvstore/couch-kvstore.cc
tests/ep_test_apis.cc
tests/ep_test_apis.h
tests/ep_testsuite_dcp.cc
tests/mock/mock_dcp.cc

index 873ed66..00c15b3 100644 (file)
@@ -1708,7 +1708,7 @@ int CouchKVStore::recordDbDump(Db *db, DocInfo *docinfo, void *ctx) {
 
     auto metadata = MetaDataFactory::createMetaData(docinfo->rev_meta);
 
-    if (sctx->valFilter != ValueFilter::KEYS_ONLY && !docinfo->deleted) {
+    if (sctx->valFilter != ValueFilter::KEYS_ONLY) {
         couchstore_open_options openOptions = 0;
 
         /**
@@ -1748,7 +1748,7 @@ int CouchKVStore::recordDbDump(Db *db, DocInfo *docinfo, void *ctx) {
                 // No data, it cannot have a datatype!
                 metadata->setDataType(PROTOCOL_BINARY_RAW_BYTES);
             }
-        } else {
+        } else if (errCode != COUCHSTORE_ERROR_DOC_NOT_FOUND) {
             sctx->logger->log(EXTENSION_LOG_WARNING,
                               "CouchKVStore::recordDbDump: "
                               "couchstore_open_doc_with_docinfo error:%s [%s], "
index 6f1fee4..8ca4092 100644 (file)
@@ -1551,10 +1551,15 @@ void validate_store_resp(ENGINE_ERROR_CODE ret, int& num_items)
     }
 }
 
-void write_items(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1, int num_items,
-                 int start_seqno, const char *key_prefix, const char *value,
-                 uint32_t expiry, uint16_t vb)
-{
+void write_items(ENGINE_HANDLE* h,
+                 ENGINE_HANDLE_V1* h1,
+                 int num_items,
+                 int start_seqno,
+                 const char* key_prefix,
+                 const char* value,
+                 uint32_t expiry,
+                 uint16_t vb,
+                 DocumentState docState) {
     int j = 0;
     while (1) {
         if (j == num_items) {
@@ -1562,9 +1567,18 @@ void write_items(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1, int num_items,
         }
         item *i = nullptr;
         std::string key(key_prefix + std::to_string(j + start_seqno));
-        ENGINE_ERROR_CODE ret = store(h, h1, nullptr, OPERATION_SET,
-                                      key.c_str(), value, &i, /*cas*/0, vb,
-                                      expiry);
+        ENGINE_ERROR_CODE ret = store(h,
+                                      h1,
+                                      nullptr,
+                                      OPERATION_SET,
+                                      key.c_str(),
+                                      value,
+                                      &i,
+                                      /*cas*/ 0,
+                                      vb,
+                                      expiry,
+                                      0,
+                                      docState);
         h1->release(h, nullptr, i);
         validate_store_resp(ret, j);
     }
index 1ec7b8d..6b679e5 100644 (file)
@@ -494,11 +494,17 @@ uint64_t get_CAS(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1,
  * @param value Value for each item
  * @param expiry Expiration time for each item.
  * @param vb vbucket to use, default to 0
+ * @param docState document state to write
  */
-void write_items(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1,
-                 int num_items, int start_seqno = 0,
-                 const char *key_prefix = "key", const char *value = "data",
-                 uint32_t expiry = 0, uint16_t vb = 0);
+void write_items(ENGINE_HANDLE* h,
+                 ENGINE_HANDLE_V1* h1,
+                 int num_items,
+                 int start_seqno = 0,
+                 const char* key_prefix = "key",
+                 const char* value = "data",
+                 uint32_t expiry = 0,
+                 uint16_t vb = 0,
+                 DocumentState docState = DocumentState::Alive);
 
 /* Helper function to write unique items starting from keyXX until memory usage
    hits "mem_thresh_perc" (XX is start_seqno) */
index a6d61d4..f489c6b 100644 (file)
@@ -78,8 +78,8 @@ public:
           live_frontend_client(false),
           skip_verification(false),
           exp_err(ENGINE_SUCCESS),
-          exp_rollback(0)
-    {
+          exp_rollback(0),
+          expected_values(0) {
         seqno = {0, static_cast<uint64_t>(~0)};
         snapshot = {0, static_cast<uint64_t>(~0)};
     }
@@ -125,6 +125,8 @@ public:
     ENGINE_ERROR_CODE exp_err;
     /* Expected rollback seqno */
     uint64_t exp_rollback;
+    /* Expected number of values (from mutations or deleted_values) */
+    size_t expected_values;
 };
 
 class TestDcpConsumer {
@@ -189,7 +191,9 @@ private:
               last_by_seqno(0),
               extra_takeover_ops(0),
               exp_disk_snapshot(false),
-              exp_conflict_res(0) { }
+              exp_conflict_res(0),
+              num_values(0) {
+        }
 
         size_t num_mutations;
         size_t num_deletions;
@@ -202,6 +206,7 @@ private:
         size_t extra_takeover_ops;
         bool exp_disk_snapshot;
         uint8_t exp_conflict_res;
+        size_t num_values;
     };
 
     /* Connection name */
@@ -291,6 +296,10 @@ void TestDcpConsumer::run(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1) {
                                    dcp_last_opaque);
                     }
 
+                    if (!dcp_last_value.empty()) {
+                        stats.num_values++;
+                    }
+
                     break;
                 case PROTOCOL_BINARY_CMD_DCP_DELETION:
                     cb_assert(vbid != static_cast<uint16_t>(-1));
@@ -308,6 +317,10 @@ void TestDcpConsumer::run(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1) {
                                    dcp_last_opaque);
                     }
 
+                    if (!dcp_last_value.empty()) {
+                        stats.num_values++;
+                    }
+
                     break;
                 case PROTOCOL_BINARY_CMD_DCP_STREAM_END:
                     cb_assert(vbid != static_cast<uint16_t>(-1));
@@ -458,6 +471,11 @@ void TestDcpConsumer::run(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1) {
                         get_ull_stat(h, h1, stats_ready_queue_memory.str().c_str(), "dcp"),
                         "readyQ size did not go to zero");
             }
+            if (ctx.expected_values) {
+                checkeq(ctx.expected_values,
+                        stats.num_values,
+                        "Expected values didn't match");
+            }
         }
     }
 
@@ -1824,6 +1842,42 @@ static enum test_result test_dcp_producer_stream_req_full(ENGINE_HANDLE *h,
     return SUCCESS;
 }
 
+/*
+ * Test that deleted items (with values) backfill correctly
+ */
+static enum test_result test_dcp_producer_deleted_item_backfill(
+        ENGINE_HANDLE* h, ENGINE_HANDLE_V1* h1) {
+    const int deletions = 10;
+    write_items(h,
+                h1,
+                deletions,
+                0,
+                "del",
+                "value",
+                0 /*exp*/,
+                0 /*vb*/,
+                DocumentState::Deleted);
+    wait_for_flusher_to_settle(h, h1);
+
+    const void* cookie = testHarness.create_cookie();
+
+    DcpStreamCtx ctx;
+    ctx.vb_uuid = get_ull_stat(h, h1, "vb_0:0:id", "failovers");
+    ctx.seqno = {0, deletions};
+    ctx.exp_deletions = deletions;
+    ctx.expected_values = deletions;
+    ctx.flags |= DCP_ADD_STREAM_FLAG_DISKONLY;
+    ctx.exp_markers = 1;
+
+    TestDcpConsumer tdc("unittest", cookie);
+    tdc.addStreamCtx(ctx);
+    tdc.run(h, h1);
+
+    testHarness.destroy_cookie(cookie);
+
+    return SUCCESS;
+}
+
 static enum test_result test_dcp_producer_stream_req_backfill(ENGINE_HANDLE *h,
                                                           ENGINE_HANDLE_V1 *h1) {
     const int num_items = 400, batch_items = 200;
@@ -6106,5 +6160,9 @@ BaseTestCase testsuite_testcases[] = {
         TestCase("test_set_dcp_param",
                  test_set_dcp_param, test_setup, teardown, NULL,
                  prepare, cleanup),
+        TestCase("test MB-23863 backfill deleted value",
+                 test_dcp_producer_deleted_item_backfill, test_setup, teardown,
+                 NULL, prepare_ep_bucket, cleanup),
+
         TestCase(NULL, NULL, NULL, NULL, NULL, prepare, cleanup)
 };
index 28939f4..55a6590 100644 (file)
@@ -214,7 +214,10 @@ static ENGINE_ERROR_CODE mock_deletion(const void* cookie,
     dcp_last_byseqno = by_seqno;
     dcp_last_revseqno = rev_seqno;
     dcp_last_meta.assign(static_cast<const char*>(meta), nmeta);
-    dcp_last_packet_size = 42 + dcp_last_key.length() + nmeta;
+    dcp_last_packet_size =
+            42 + dcp_last_key.length() + item->getNBytes() + nmeta;
+    dcp_last_value.assign(static_cast<const char*>(item->getData()),
+                          item->getNBytes());
 
     if (engine_handle_v1 && engine_handle) {
         engine_handle_v1->release(engine_handle, nullptr, item);