MB-16656: Stream a full (disk+mem) snapshot from DCP producer on replica vb 16/57816/7
authorManu Dhundi <manu@couchbase.com>
Mon, 21 Dec 2015 21:42:07 +0000 (13:42 -0800)
committerManu Dhundi <manu@couchbase.com>
Wed, 23 Dec 2015 07:44:09 +0000 (07:44 +0000)
A replica vbucket receives items from an active vbucket, and until a full
snapshot is received the data on the replica vbucket is not consistent due
to de-duplication and other reasons. Hence while streaming items to a DCP
client from a replica vbucket we need to combine backfill and in memory
snapshots and send items in one snapshot. A caveat here is the replica vb
might not have received all the items in the latest (memory) snapshot, so the
DCP client streaming from replica will have to wait till the replica gets
all the items in the latest snapshot from the active.

Change-Id: I4db622f967316d120506dc9b125211578194bb60
Reviewed-on: http://review.couchbase.org/57816
Reviewed-by: Chiyoung Seo <chiyoung@couchbase.com>
Tested-by: buildbot <build@couchbase.com>
Reviewed-by: Manu Dhundi <manu@couchbase.com>
src/dcp-stream.cc
src/dcp-stream.h
src/ep_engine.cc
tests/ep_testsuite.cc

index 4e1ab9a..99f2e66 100644 (file)
@@ -275,7 +275,7 @@ ActiveStream::ActiveStream(EventuallyPersistentEngine* e, dcp_producer_t p,
        takeoverState(vbucket_state_pending), backfillRemaining(0),
        itemsFromBackfill(0), itemsFromMemory(0), firstMarkerSent(false),
        waitForSnapshot(0), engine(e), producer(p),
-       isBackfillTaskRunning(false) {
+       isBackfillTaskRunning(false), lastSentSnapEndSeqno(0) {
 
     const char* type = "";
     if (flags_ & DCP_ADD_STREAM_FLAG_TAKEOVER) {
@@ -283,6 +283,18 @@ ActiveStream::ActiveStream(EventuallyPersistentEngine* e, dcp_producer_t p,
         end_seqno_ = dcpMaxSeqno;
     }
 
+    RCPtr<VBucket> vbucket = engine->getVBucket(vb);
+    if (vbucket) {
+        ReaderLockHolder rlh(vbucket->getStateLock());
+        if (vbucket->getState() == vbucket_state_replica) {
+            uint64_t snapshot_start, snapshot_end;
+            vbucket->getCurrentSnapshot(snapshot_start, snapshot_end);
+            if (snapshot_end > en_seqno) {
+                end_seqno_ = snapshot_end;
+            }
+        }
+    }
+
     if (start_seqno_ >= end_seqno_) {
         endStream(END_STREAM_OK);
         itemsReady.store(true);
@@ -337,6 +349,8 @@ DcpResponse* ActiveStream::next() {
 
 void ActiveStream::markDiskSnapshot(uint64_t startSeqno, uint64_t endSeqno) {
     LockHolder lh(streamMutex);
+    uint64_t chkCursorSeqno = endSeqno;
+
     if (state_ != STREAM_BACKFILLING) {
         return;
     }
@@ -344,21 +358,39 @@ void ActiveStream::markDiskSnapshot(uint64_t startSeqno, uint64_t endSeqno) {
     startSeqno = std::min(snap_start_seqno_, startSeqno);
     firstMarkerSent = true;
 
+    RCPtr<VBucket> vb = engine->getVBucket(vb_);
+    if (vb) {
+        ReaderLockHolder rlh(vb->getStateLock());
+        if (vb->getState() == vbucket_state_replica) {
+            if (end_seqno_ > endSeqno) {
+                /* We possibly have items in the open checkpoint
+                   (incomplete snapshot) */
+                LOG(EXTENSION_LOG_WARNING, "%s (vb %d) Merging backfill and "
+                    "memory snapshot for a replica vbucket, start seqno "
+                    "%" PRIu64 " and end seqno %" PRIu64, producer->logHeader(),
+                    vb_, startSeqno, endSeqno);
+                endSeqno = end_seqno_;
+            }
+        }
+    }
+
     LOG(EXTENSION_LOG_WARNING, "%s (vb %d) Sending disk snapshot with start "
         "seqno %llu and end seqno %llu", producer->logHeader(), vb_, startSeqno,
         endSeqno);
     pushToReadyQ(new SnapshotMarker(opaque_, vb_, startSeqno, endSeqno,
-                                   MARKER_FLAG_DISK));
-    RCPtr<VBucket> vb = engine->getVBucket(vb_);
+                                    MARKER_FLAG_DISK));
+    lastSentSnapEndSeqno = endSeqno;
+
     if (!vb) {
         endStream(END_STREAM_STATE);
     } else {
         if (endSeqno > end_seqno_) {
-            endSeqno = end_seqno_;
+            chkCursorSeqno = end_seqno_;
         }
         // Only re-register the cursor if we still need to get memory snapshots
         CursorRegResult result =
-            vb->checkpointManager.registerTAPCursorBySeqno(name_, endSeqno);
+            vb->checkpointManager.registerTAPCursorBySeqno(name_,
+                                                           chkCursorSeqno);
         curChkSeqno = result.first;
     }
 
@@ -643,25 +675,29 @@ void ActiveStream::snapshot(std::deque<MutationResponse*>& items, bool mark) {
         return;
     }
 
-    uint32_t flags = MARKER_FLAG_MEMORY;
-    uint64_t snapStart = items.front()->getBySeqno();
-    uint64_t snapEnd = items.back()->getBySeqno();
+    if (isCurrentSnapshotCompleted()) {
+        uint32_t flags = MARKER_FLAG_MEMORY;
+        uint64_t snapStart = items.front()->getBySeqno();
+        uint64_t snapEnd = items.back()->getBySeqno();
 
-    if (mark) {
-        flags |= MARKER_FLAG_CHK;
-    }
+        if (mark) {
+            flags |= MARKER_FLAG_CHK;
+        }
 
-    if (state_ == STREAM_TAKEOVER_SEND) {
-        waitForSnapshot++;
-        flags |= MARKER_FLAG_ACK;
-    }
+        if (state_ == STREAM_TAKEOVER_SEND) {
+            waitForSnapshot++;
+            flags |= MARKER_FLAG_ACK;
+        }
 
-    if (!firstMarkerSent) {
-        snapStart = std::min(snap_start_seqno_, snapStart);
-        firstMarkerSent = true;
+        if (!firstMarkerSent) {
+            snapStart = std::min(snap_start_seqno_, snapStart);
+            firstMarkerSent = true;
+        }
+        pushToReadyQ(new SnapshotMarker(opaque_, vb_, snapStart, snapEnd,
+                                        flags));
+        lastSentSnapEndSeqno = snapEnd;
     }
 
-    pushToReadyQ(new SnapshotMarker(opaque_, vb_, snapStart, snapEnd, flags));
     std::deque<MutationResponse*>::iterator itemItr;
     for (itemItr = items.begin(); itemItr != items.end(); itemItr++) {
         pushToReadyQ(*itemItr);
@@ -714,6 +750,7 @@ void ActiveStream::scheduleBackfill() {
         CursorRegResult result =
             vbucket->checkpointManager.registerTAPCursorBySeqno(name_,
                                                                 lastReadSeqno);
+
         curChkSeqno = result.first;
         bool isFirstItem = result.second;
 
@@ -874,6 +911,20 @@ const char* ActiveStream::logHeader()
     return producer->logHeader();
 }
 
+bool ActiveStream::isCurrentSnapshotCompleted() const
+{
+    RCPtr<VBucket> vbucket = engine->getVBucket(vb_);
+    if (vbucket) {
+        ReaderLockHolder rlh(vbucket->getStateLock());
+        if (vbucket_state_replica == vbucket->getState()) {
+            if (lastSentSnapEndSeqno >= lastReadSeqno) {
+                return false;
+            }
+        }
+    }
+    return true;
+}
+
 NotifierStream::NotifierStream(EventuallyPersistentEngine* e, dcp_producer_t p,
                                const std::string &name, uint32_t flags,
                                uint32_t opaque, uint16_t vb, uint64_t st_seqno,
index c4b8d1b..e94e472 100644 (file)
@@ -231,6 +231,8 @@ private:
 
     const char* getEndStreamStatusStr(end_stream_status_t status);
 
+    bool isCurrentSnapshotCompleted() const;
+
     //! The last sequence number queued from disk or memory
     uint64_t lastReadSeqno;
     //! The last sequence number sent to the network layer
@@ -254,6 +256,9 @@ private:
     EventuallyPersistentEngine* engine;
     dcp_producer_t producer;
     AtomicValue<bool> isBackfillTaskRunning;
+
+    //! Last snapshot end seqno sent to the DCP client
+    uint64_t lastSentSnapEndSeqno;
 };
 
 class NotifierStream : public Stream {
index d90d8cf..5a1903d 100644 (file)
@@ -4219,8 +4219,12 @@ ENGINE_ERROR_CODE EventuallyPersistentEngine::doSeqnoStats(const void *cookie,
         }
 
         uint64_t relHighSeqno = vb->getHighSeqno();
+
+        ReaderLockHolder rlh(vb->getStateLock());
         if (vb->getState() != vbucket_state_active) {
-            relHighSeqno = vb->checkpointManager.getLastClosedChkBySeqno();
+            uint64_t snapshot_start, snapshot_end;
+            vb->getCurrentSnapshot(snapshot_start, snapshot_end);
+            relHighSeqno = snapshot_end;
         }
 
         char buffer[32];
@@ -4242,8 +4246,11 @@ ENGINE_ERROR_CODE EventuallyPersistentEngine::doSeqnoStats(const void *cookie,
         RCPtr<VBucket> vb = getVBucket(*itr);
         if (vb) {
             uint64_t relHighSeqno = vb->getHighSeqno();
+            ReaderLockHolder rlh(vb->getStateLock());
             if (vb->getState() != vbucket_state_active) {
-                relHighSeqno = vb->checkpointManager.getLastClosedChkBySeqno();
+                uint64_t snapshot_start, snapshot_end;
+                vb->getCurrentSnapshot(snapshot_start, snapshot_end);
+                relHighSeqno = snapshot_end;
             }
 
             char buffer[32];
index 2086b9a..df01893 100644 (file)
@@ -3450,8 +3450,8 @@ static void dcp_stream(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1, const char *name,
 
     char stats_end_seqno[50];
     snprintf(stats_end_seqno, sizeof(stats_end_seqno),"eq_dcpq:%s:stream_0_end_seqno", name);
-    check((uint64_t)get_ull_stat(h, h1, stats_end_seqno, "dcp")
-          == end, "End Seqno didn't match");
+    checkeq(end, (uint64_t)get_ull_stat(h, h1, stats_end_seqno, "dcp"),
+            "End Seqno didn't match");
 
     char stats_vb_uuid[50];
     snprintf(stats_vb_uuid, sizeof(stats_vb_uuid),"eq_dcpq:%s:stream_0_vb_uuid", name);
@@ -5167,6 +5167,141 @@ static enum test_result test_dcp_consumer_noop(ENGINE_HANDLE *h,
     return SUCCESS;
 }
 
+static void dcp_stream_to_replica(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1,
+                                  const void *cookie, uint32_t opaque,
+                                  uint16_t vbucket, uint32_t flags,
+                                  uint64_t start, uint64_t end,
+                                  uint64_t snap_start_seqno,
+                                  uint64_t snap_end_seqno,
+                                  uint8_t cas = 0, uint8_t datatype = 1,
+                                  uint32_t exprtime = 0, uint32_t lockTime = 0,
+                                  uint64_t revSeqno = 0)
+{
+    /* Send snapshot marker */
+    checkeq(ENGINE_SUCCESS, h1->dcp.snapshot_marker(h, cookie, opaque, vbucket,
+                                                    snap_start_seqno,
+                                                    snap_end_seqno, flags),
+            "Failed to send marker!");
+    const std::string data("data");
+    /* Send DCP mutations */
+    for (int i = start; i <= end; i++) {
+        std::stringstream key;
+        key << "key" << i;
+        checkeq(ENGINE_SUCCESS, h1->dcp.mutation(h, cookie, opaque,
+                                                 key.str().c_str(),
+                                                 key.str().length(),
+                                                 data.c_str(), data.length(),
+                                                 cas, vbucket, flags, datatype,
+                                                 i, revSeqno, exprtime,
+                                                 lockTime, NULL, 0, 0),
+                "Failed dcp mutate.");
+    }
+}
+
+static enum test_result test_dcp_replica_stream_in_memory(ENGINE_HANDLE *h,
+                                                          ENGINE_HANDLE_V1 *h1)
+{
+    check(set_vbucket_state(h, h1, 0, vbucket_state_replica),
+          "Failed to set vbucket state.");
+
+    const void *cookie = testHarness.create_cookie();
+    uint32_t opaque = 0xFFFF0000;
+    uint32_t seqno = 0;
+    uint32_t flags = 0;
+    const int num_items = 100;
+    const char *name = "unittest";
+    uint16_t nname = strlen(name);
+
+    /* Open an DCP consumer connection */
+    checkeq(ENGINE_SUCCESS,
+            h1->dcp.open(h, cookie, opaque, seqno, flags, (void*)name, nname),
+            "Failed dcp producer open connection.");
+
+    std::string type = get_str_stat(h, h1, "eq_dcpq:unittest:type", "dcp");
+    checkeq(0, type.compare("consumer"), "Consumer not found");
+
+    opaque = add_stream_for_consumer(h, h1, cookie, opaque, 0, 0,
+                                     PROTOCOL_BINARY_RESPONSE_SUCCESS);
+
+    /* Send DCP mutations with in memory flag (0x01) */
+    dcp_stream_to_replica(h, h1, cookie, opaque, 0, 0x01, 1, num_items, 0,
+                          num_items);
+
+    /* Stream in memory mutations from replica */
+    wait_for_flusher_to_settle(h, h1);
+    wait_for_stat_to_be(h, h1, "vb_0:high_seqno", num_items,
+                        "vbucket-seqno");
+    uint64_t vb_uuid = get_ull_stat(h, h1, "vb_0:0:id", "failovers");
+
+    const void *cookie1 = testHarness.create_cookie();
+    dcp_stream(h, h1, "unittest1", cookie1, 0, 0, 0, num_items, vb_uuid, 0, 0,
+               num_items, 0, 1, 0, 2);
+
+    testHarness.destroy_cookie(cookie1);
+    testHarness.destroy_cookie(cookie);
+    return SUCCESS;
+}
+
+static enum test_result test_dcp_replica_stream_all(ENGINE_HANDLE *h,
+                                                    ENGINE_HANDLE_V1 *h1) {
+    check(set_vbucket_state(h, h1, 0, vbucket_state_replica),
+          "Failed to set vbucket state.");
+
+    const void *cookie = testHarness.create_cookie();
+    uint32_t opaque = 0xFFFF0000;
+    uint32_t seqno = 0;
+    uint32_t flags = 0;
+    const int num_items = 100;
+    const char *name = "unittest";
+    uint16_t nname = strlen(name);
+
+    /* Open an DCP consumer connection */
+    checkeq(ENGINE_SUCCESS,
+            h1->dcp.open(h, cookie, opaque, seqno, flags, (void*)name, nname),
+            "Failed dcp producer open connection.");
+
+    std::string type = get_str_stat(h, h1, "eq_dcpq:unittest:type", "dcp");
+    checkeq(0, type.compare("consumer"), "Consumer not found");
+
+    opaque = add_stream_for_consumer(h, h1, cookie, opaque, 0, 0,
+                                     PROTOCOL_BINARY_RESPONSE_SUCCESS);
+
+    /* Send DCP mutations with in memory flag (0x01) */
+    dcp_stream_to_replica(h, h1, cookie, opaque, 0, 0x01, 1, num_items, 0,
+                          num_items);
+
+    /* Send 100 more DCP mutations with checkpoint creation flag (0x04) */
+    uint64_t start = num_items;
+    dcp_stream_to_replica(h, h1, cookie, opaque, 0, 0x04, start + 1,
+                          start + 100, start, start + 100);
+
+    wait_for_flusher_to_settle(h, h1);
+    stop_persistence(h, h1);
+    checkeq(2 * num_items, get_int_stat(h, h1, "vb_replica_curr_items"),
+            "wrong number of items in replica vbucket");
+
+    /* Add 100 more items to the replica node on a new checkpoint */
+    /* Send with flag (0x04) indicating checkpoint creation */
+    start = 2 * num_items;
+    dcp_stream_to_replica(h, h1, cookie, opaque, 0, 0x04, start + 1,
+                          start + 100, start, start + 100);
+
+    /* Disk backfill + in memory stream from replica */
+    /* Wait for a checkpoint to be removed */
+    wait_for_stat_to_be(h, h1, "vb_0:num_checkpoints", 2, "checkpoint");
+
+    uint64_t end = get_int_stat(h, h1, "vb_0:high_seqno", "vbucket-seqno");
+    uint64_t vb_uuid = get_ull_stat(h, h1, "vb_0:0:id", "failovers");
+    const void *cookie1 = testHarness.create_cookie();
+    dcp_stream(h, h1, "unittest1", cookie1, 0, 0, 0, end, vb_uuid, 0, 0, 300, 0,
+               1, 0, 2);
+
+    testHarness.destroy_cookie(cookie1);
+    testHarness.destroy_cookie(cookie);
+
+    return SUCCESS;
+}
+
 static enum test_result test_tap_rcvr_mutate(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1) {
     char eng_specific[9];
     memset(eng_specific, 0, sizeof(eng_specific));
@@ -12232,6 +12367,12 @@ engine_test_t* get_tests(void) {
                  teardown, NULL, prepare, cleanup),
         TestCase("test dcp consumer noop", test_dcp_consumer_noop, test_setup,
                  teardown, NULL, prepare, cleanup),
+        TestCase("test dcp replica stream in-memory",
+                 test_dcp_replica_stream_in_memory, test_setup, teardown,
+                 "chk_remover_stime=1;max_checkpoints=2", prepare, cleanup),
+        TestCase("test dcp replica stream all", test_dcp_replica_stream_all,
+                 test_setup, teardown, "chk_remover_stime=1;max_checkpoints=2",
+                 prepare, cleanup),
         TestCase("test producer stream request (partial)",
                  test_dcp_producer_stream_req_partial, test_setup, teardown,
                  "chk_remover_stime=1;chk_max_items=100", prepare, cleanup),