[Ephemeral]: MB-23734: Handle DCP backfill failures 15/77215/5
authorManu Dhundi <manu@couchbase.com>
Fri, 21 Apr 2017 22:56:59 +0000 (15:56 -0700)
committerDave Rigby <daver@couchbase.com>
Mon, 24 Apr 2017 09:42:10 +0000 (09:42 +0000)
When a DCP backfill task is run there could be failures. We must handle
the failures gracefully.

This commit handles any failures in DCP backfill in Ephemeral buckets.
Upon a failure we close the stream and the DCP client can retry at
a later time.

Change-Id: I2aeffb9baf7d5a8ac367b129470741af7806e710
Reviewed-on: http://review.couchbase.org/77215
Tested-by: Build Bot <build@couchbase.com>
Reviewed-by: Dave Rigby <daver@couchbase.com>
src/dcp/backfill_memory.cc
src/dcp/stream.cc
src/dcp/stream.h
tests/mock/mock_stream.h
tests/module_tests/dcp_test.cc

index 520396e..29ec721 100644 (file)
@@ -33,6 +33,8 @@ DCPBackfillMemory::DCPBackfillMemory(EphemeralVBucketPtr evb,
 backfill_status_t DCPBackfillMemory::run() {
     auto evb = weakVb.lock();
     if (!evb) {
+        /* We don't have to close the stream here. Task doing vbucket state
+           change should handle stream closure */
         LOG(EXTENSION_LOG_WARNING,
             "DCPBackfillMemory::run(): "
             "(vb:%d) running backfill ended prematurely as weakVb can't be "
@@ -46,6 +48,8 @@ backfill_status_t DCPBackfillMemory::run() {
     /* Get vb state lock */
     ReaderLockHolder rlh(evb->getStateLock());
     if (evb->getState() == vbucket_state_dead) {
+        /* We don't have to close the stream here. Task doing vbucket state
+           change should handle stream closure */
         LOG(EXTENSION_LOG_WARNING,
             "DCPBackfillMemory::run(): "
             "(vb:%d) running backfill ended prematurely with vb in dead state; "
@@ -63,15 +67,18 @@ backfill_status_t DCPBackfillMemory::run() {
 
     /* Handle any failures */
     if (status != ENGINE_SUCCESS) {
-        /* [EPHE TODO]: Should we close stream ?? */
         LOG(EXTENSION_LOG_WARNING,
             "DCPBackfillMemory::run(): "
             "(vb:%d) running backfill failed with error %d ; "
-            "start seqno:%" PRIi64 ", end seqno:%" PRIi64,
+            "start seqno:%" PRIi64 ", end seqno:%" PRIi64
+            ". "
+            "Hence closing the stream",
             getVBucketId(),
             status,
             startSeqno,
             endSeqno);
+        /* Close the stream, DCP clients can retry */
+        stream->setDead(END_STREAM_BACKFILL_FAIL);
         return backfill_finished;
     }
 
index 85f96c0..64daeb9 100644 (file)
@@ -1271,6 +1271,8 @@ const char* ActiveStream::getEndStreamStatusStr(end_stream_status_t status)
         return "The stream closed early because the conn was disconnected";
     case END_STREAM_SLOW:
         return "The stream was closed early because it was too slow";
+    case END_STREAM_BACKFILL_FAIL:
+        return "The stream closed early due to backfill failure";
     }
     std::string msg("Status unknown: " + std::to_string(status) +
                     "; this should not have happened!");
index 3be5778..45e51a4 100644 (file)
@@ -48,7 +48,9 @@ enum end_stream_status_t {
     END_STREAM_DISCONNECTED,
     //! The stream was closed early because it was too slow (currently unused,
     //! but not deleted because it is part of the externally-visible API)
-    END_STREAM_SLOW
+    END_STREAM_SLOW,
+    //! The stream closed early due to backfill failure
+    END_STREAM_BACKFILL_FAIL
 };
 
 enum process_items_error_t {
index 5e26a17..ec89831 100644 (file)
@@ -105,6 +105,12 @@ public:
             queued_item& item) {
         return makeResponseFromItem(item);
     }
+
+    void waitForStreamClose() {
+        while (getState() != StreamState::Dead) {
+            usleep(10);
+        }
+    }
 };
 
 /* Mock of the PassiveStream class. Wraps the real PassiveStream, but exposes
index 0249d80..65b2227 100644 (file)
@@ -449,6 +449,43 @@ TEST_P(StreamTest, BackfillOnly) {
                items are read correctly */
 }
 
+/* Test a backfill fail scenario */
+TEST_P(StreamTest, BackfillFail) {
+    if (bucketType == "persistent") {
+        /* This test simulates a backfill failure for an ephemeral bucket
+           only.
+           [TODO]: Write a test case for disk backfill failures as well */
+        setup_dcp_stream(); /* [TODO, Legacy]: Check why TearDown() crashes
+                                               without this */
+        return;
+    }
+
+    /* Add 3 items */
+    int numItems = 3;
+    for (int i = 0; i < numItems; ++i) {
+        std::string key("key" + std::to_string(i));
+        store_item(vbid, key, "value");
+    }
+
+    /* Set up a DCP stream for the backfill */
+    setup_dcp_stream();
+    MockActiveStream* mock_stream =
+            dynamic_cast<MockActiveStream*>(stream.get());
+
+    /* We want the backfill task to run in a background thread */
+    ExecutorPool::get()->setNumAuxIO(1);
+
+    /* Schedule a backfill task with an incorrect param:
+       here backfill_start > vb_highSeqno */
+    producer->scheduleBackfillManager(*vb0,
+                                      mock_stream,
+                                      /*backfillStart*/numItems + 5,
+                                      /*backfillEnd*/numItems + 10);
+
+    /* Expect stream to be closed */
+    mock_stream->waitForStreamClose();
+}
+
 class ConnectionTest : public DCPTest {
 protected:
     ENGINE_ERROR_CODE set_vb_state(uint16_t vbid, vbucket_state_t state) {