MB-22178: Don't use opencheckpointid to determine if in backfill phase 13/73813/3
authorDaniel Owen <owend@couchbase.com>
Thu, 8 Dec 2016 15:59:44 +0000 (15:59 +0000)
committerJim Walker <jim@couchbase.com>
Thu, 23 Feb 2017 19:15:57 +0000 (19:15 +0000)
The opencheckpointid of a bucket can be zero after a rollback.
If an opencheckpointid was zero it was assumed that the vbucket was in
backfilling state.  This caused the producer stream request to be stuck
waiting for backfilling to complete.

Ths patch uses the vb->isBackfillPhase() to determine if the vbucket is
in a backfill state as opposed to using an opencheckpointid of zero.

Change-Id: Ia977d6bf90e54fd1ceb8db4a9088b19d94d4bc8c
Reviewed-on: http://review.couchbase.org/70810
Reviewed-by: Manu Dhundi <manu@couchbase.com>
Tested-by: buildbot <build@couchbase.com>
Reviewed-by: Jim Walker <jim@couchbase.com>
Reviewed-on: http://review.couchbase.org/73813
Well-Formed: Build Bot <build@couchbase.com>
Tested-by: Build Bot <build@couchbase.com>
src/dcp/producer.cc
src/dcp/stream.cc
src/ep.cc
tests/module_tests/dcp_test.cc
tests/module_tests/evp_store_rollback_test.cc

index 2b3cf4f..5295986 100644 (file)
@@ -199,7 +199,7 @@ ENGINE_ERROR_CODE DcpProducer::streamRequest(uint32_t flags,
         return ENGINE_NOT_MY_VBUCKET;
     }
 
-    if (vb->checkpointManager.getOpenCheckpointId() == 0) {
+    if (vb->isBackfillPhase()) {
         LOG(EXTENSION_LOG_WARNING, "%s (vb %d) Stream request failed because "
             "this vbucket is in backfill state", logHeader(), vbucket);
         return ENGINE_TMPFAIL;
index d76277e..93e95b8 100644 (file)
@@ -1805,11 +1805,21 @@ void PassiveStream::processMarker(SnapshotMarker* marker) {
     if (vb) {
         if (marker->getFlags() & MARKER_FLAG_DISK && vb->getHighSeqno() == 0) {
             vb->setBackfillPhase(true);
+            // calling checkpointManager.setBackfillPhase sets the
+            // openCheckpointId to zero
             vb->checkpointManager.setBackfillPhase(cur_snapshot_start.load(),
                                                    cur_snapshot_end.load());
         } else {
-            if (marker->getFlags() & MARKER_FLAG_CHK ||
-                vb->checkpointManager.getOpenCheckpointId() == 0) {
+            if (marker->getFlags() & MARKER_FLAG_CHK || vb->isBackfillPhase()) {
+                    // Instead of checking isBackfillPhase() an earlier patch
+                    // http://review.couchbase.org/#/c/39960/ changed the check
+                    // to (vb->checkpointManager.getOpenCheckpointId() == 0).
+                    // This change has been reverted in favor of using the
+                    // isBackfillPhase() check as it is believed to be safer and
+                    // makes the logic easier to understand.  However if an
+                    // issues such as that seen in MB-11786 occurs then it maybe
+                    // worth investigating the impact of changing the check to
+                    // use (vb->checkpointManager.getOpenCheckpointId() == 0)
                 vb->checkpointManager.createSnapshot(cur_snapshot_start.load(),
                                                      cur_snapshot_end.load());
             } else {
index a44a47a..5541f35 100644 (file)
--- a/src/ep.cc
+++ b/src/ep.cc
@@ -4022,6 +4022,7 @@ EventuallyPersistentStore::rollback(uint16_t vbid,
                 vb->checkpointManager.clear(vb, result.highSeqno);
                 vb->setPersistedSnapshot(result.snapStartSeqno, result.snapEndSeqno);
                 vb->incrRollbackItemCount(prevHighSeqno - result.highSeqno);
+                vb->setBackfillPhase(false);
                 return ENGINE_SUCCESS;
             }
         }
index 7d7c0aa..43235b6 100644 (file)
@@ -261,7 +261,12 @@ TEST_F(StreamTest, test_mb18625) {
         << "Expected no more messages in the readyQ";
 }
 
-class ConnectionTest : public DCPTest {};
+class ConnectionTest : public DCPTest {
+protected:
+    ENGINE_ERROR_CODE set_vb_state(uint16_t vbid, vbucket_state_t state) {
+        return engine->getEpStore()->setVBucketState(vbid, state, true);
+    }
+};
 
 ENGINE_ERROR_CODE mock_noop_return_engine_e2big(const void* cookie,uint32_t opaque) {
     return ENGINE_E2BIG;
@@ -698,6 +703,59 @@ TEST_F(ConnectionTest, test_mb20716_connmap_notify_on_delete_consumer) {
     destroy_mock_cookie(cookie);
 }
 
+/*
+ * The following tests that once a vbucket has been put into a backfillphase
+ * the openCheckpointID is 0.  In addition it checks that a subsequent
+ * snapshotMarker results in a new checkpoint being created.
+ */
+TEST_F(ConnectionTest, test_mb21784) {
+    // Make vbucket replica so can add passive stream
+    ASSERT_EQ(ENGINE_SUCCESS, set_vb_state(vbid, vbucket_state_replica));
+
+    const void *cookie = create_mock_cookie();
+    /*
+     * Create a Mock Dcp consumer. Since child class subobj of MockDcpConsumer
+     *  obj are accounted for by SingleThreadedRCPtr, use the same here
+     */
+    connection_t conn = new MockDcpConsumer(*engine, cookie, "test_consumer");
+    MockDcpConsumer* consumer = static_cast<MockDcpConsumer*>(conn.get());
+
+    // Add passive stream
+    ASSERT_EQ(ENGINE_SUCCESS, consumer->addStream(/*opaque*/0, vbid,
+                                                  /*flags*/0));
+    // Get the checkpointManager
+    auto& manager = engine->getEpStore()->getVBucket(vbid)->checkpointManager;
+
+    // Because the vbucket was previously active it will have an
+    // openCheckpointId of 2
+    EXPECT_EQ(2, manager.getOpenCheckpointId());
+
+    // Send a snapshotMarker to move the vbucket into a backfilling state
+    consumer->snapshotMarker(/*opaque*/1,
+                             /*vbucket*/0,
+                             /*start_seqno*/0,
+                             /*end_seqno*/0,
+                             /*flags set to MARKER_FLAG_DISK*/0x2);
+
+    // A side effect of moving the vbucket into a backfill state is that
+    // the openCheckpointId is set to 0
+    EXPECT_EQ(0, manager.getOpenCheckpointId());
+
+    consumer->snapshotMarker(/*opaque*/1,
+                             /*vbucket*/0,
+                             /*start_seqno*/0,
+                             /*end_seqno*/0,
+                             /*flags*/0);
+
+    // Check that a new checkpoint was created, which means the
+    // opencheckpointid increases to 1
+    EXPECT_EQ(1, manager.getOpenCheckpointId());
+
+    // Close stream
+    ASSERT_EQ(ENGINE_SUCCESS, consumer->closeStream(/*opaque*/0, vbid));
+    destroy_mock_cookie(cookie);
+}
+
 class NotifyTest : public DCPTest {
 protected:
     void SetUp() {
index 9faedcd..ca8db9b 100644 (file)
@@ -19,7 +19,9 @@
  * Tests for Rollback functionality in EPStore.
  */
 
+#include "connmap.h"
 #include "evp_store_test.h"
+#include "programs/engine_testapp/mock_server.h"
 
 class RollbackTest : public EventuallyPersistentStoreTest,
                      public ::testing::WithParamInterface<std::string>
@@ -47,6 +49,15 @@ class RollbackTest : public EventuallyPersistentStoreTest,
     }
 
 protected:
+    /*
+     * Fake callback emulating dcp_add_failover_log
+     */
+    static ENGINE_ERROR_CODE fakeDcpAddFailoverLog(vbucket_failover_t* entry,
+                                                   size_t nentries,
+                                                   const void *cookie) {
+        return ENGINE_SUCCESS;
+    }
+
     /**
      * Test rollback after deleting an item.
      * @param flush_before_rollback: Should the vbuckt be flushed to disk just
@@ -221,6 +232,48 @@ TEST_P(RollbackTest, RollbackToMiddleOfACheckpointNoFlush) {
     rollback_to_middle_test(false);
 }
 
+/*
+ * The opencheckpointid of a bucket can be zero after a rollback.
+ * From MB21784 if an opencheckpointid was zero it was assumed that the
+ * vbucket was in backfilling state.  This caused the producer stream
+ * request to be stuck waiting for backfilling to complete.
+ */
+TEST_P(RollbackTest, MB21784) {
+    // Make the vbucket a replica
+    store->setVBucketState(vbid, vbucket_state_replica, false);
+    // Perform a rollback
+    EXPECT_EQ(ENGINE_SUCCESS, store->rollback(vbid, initial_seqno))
+        << "rollback did not return ENGINE_SUCCESS";
+
+    // Assert the checkpointmanager clear function (called during rollback)
+    // has set the opencheckpointid to zero
+    auto vb = store->getVbMap().getBucket(vbid);
+    auto& ckpt_mgr = vb->checkpointManager;
+    EXPECT_EQ(0, ckpt_mgr.getOpenCheckpointId()) << "opencheckpointId not zero";
+
+    // Create a new Dcp producer, reserving its cookie.
+    get_mock_server_api()->cookie->reserve(cookie);
+    dcp_producer_t producer = engine->getDcpConnMap().newProducer(
+            cookie, "test_producer", /*notifyOnly*/false);
+
+    uint64_t rollbackSeqno;
+    auto err = producer->streamRequest(/*flags*/0,
+                                       /*opaque*/0,
+                                       /*vbucket*/vbid,
+                                       /*start_seqno*/0,
+                                       /*end_seqno*/0,
+                                       /*vb_uuid*/0,
+                                       /*snap_start*/0,
+                                       /*snap_end*/0,
+                                       &rollbackSeqno,
+                                       RollbackTest::fakeDcpAddFailoverLog);
+    EXPECT_EQ(ENGINE_SUCCESS, err)
+        << "stream request did not return ENGINE_SUCCESS";
+    // Close stream
+    ASSERT_EQ(ENGINE_SUCCESS, producer->closeStream(/*opaque*/0, vbid));
+    engine->handleDisconnect(cookie);
+}
+
 // Test cases which run in both Full and Value eviction
 INSTANTIATE_TEST_CASE_P(FullAndValueEviction,
                         RollbackTest,