MB-22451: Ensure isBackfillTaskRunning is correctly set 10/73310/10
authorDaniel Owen <owend@couchbase.com>
Fri, 3 Feb 2017 14:33:43 +0000 (14:33 +0000)
committerJim Walker <jim@couchbase.com>
Tue, 14 Feb 2017 13:45:08 +0000 (13:45 +0000)
In ActiveStream::completeBackfill if in STREAM_BACKFILLING state and
pendingBackfill is true then we will schedule another backfill.  This
will cause isBackfillTaskRunning to be set to true.  The flag should
remain true on exit of the completeBackfill function.

Change-Id: If8219a7f87b65af46d37a800eebf2257917cc555
Reviewed-on: http://review.couchbase.org/73310
Well-Formed: Build Bot <build@couchbase.com>
Tested-by: Build Bot <build@couchbase.com>
Reviewed-by: Jim Walker <jim@couchbase.com>
src/dcp/stream.cc
src/dcp/stream.h
tests/mock/mock_stream.h [new file with mode: 0644]
tests/module_tests/dcp_test.cc
tests/module_tests/evp_store_single_threaded_test.cc
tests/module_tests/evp_store_test.cc
tests/module_tests/evp_store_test.h

index 2dbc973..d76277e 100644 (file)
@@ -166,12 +166,12 @@ ActiveStream::ActiveStream(EventuallyPersistentEngine* e, dcp_producer_t p,
                            uint64_t snap_start_seqno, uint64_t snap_end_seqno)
     :  Stream(n, flags, opaque, vb, st_seqno, en_seqno, vb_uuid,
               snap_start_seqno, snap_end_seqno),
+       isBackfillTaskRunning(false), pendingBackfill(false),
        lastReadSeqnoUnSnapshotted(st_seqno), lastReadSeqno(st_seqno),
        lastSentSeqno(st_seqno), curChkSeqno(st_seqno),
        takeoverState(vbucket_state_pending), backfillRemaining(0),
        itemsFromMemoryPhase(0), firstMarkerSent(false), waitForSnapshot(0),
-       engine(e), producer(p), isBackfillTaskRunning(false),
-       pendingBackfill(false),
+       engine(e), producer(p),
        payloadType((flags & DCP_ADD_STREAM_FLAG_NO_VALUE) ? KEY_ONLY :
                                                             KEY_VALUE),
        lastSentSnapEndSeqno(0), chkptItemsExtractionInProgress(false) {
@@ -388,6 +388,20 @@ void ActiveStream::completeBackfill() {
                 scheduleBackfill_UNLOCKED(true);
                 pendingBackfill = false;
             }
+
+            bool expected = false;
+            if (itemsReady.compare_exchange_strong(expected, true)) {
+                producer->notifyStreamReady(vb_);
+            }
+
+            /**
+              * MB-22451: It is important that we return here because
+              * scheduleBackfill_UNLOCKED(true) can set
+              * isBackfillTaskRunning to true.  Therefore if we don't return we
+              * will set isBackfillTaskRunning prematurely back to false, (see
+              * below).
+              */
+            return;
         }
     }
 
index 38b7eeb..470eef0 100644 (file)
@@ -263,10 +263,24 @@ protected:
 
     DcpResponse* nextQueuedItem();
 
-private:
-
+    /* The transitionState function is protected (as opposed to private) for
+     * testing purposes.
+     */
     void transitionState(stream_state_t newState);
 
+    /* Indicates that a backfill has been scheduled and has not yet completed.
+     * Is protected (as opposed to private) for testing purposes.
+     */
+    std::atomic<bool> isBackfillTaskRunning;
+
+    /* Indicates if another backfill must be scheduled following the completion
+     * of current running backfill.  Guarded by streamMutex.
+     * Is protected (as opposed to private) for testing purposes.
+     */
+    bool pendingBackfill;
+
+private:
+
     DcpResponse* backfillPhase();
 
     DcpResponse* inMemoryPhase();
@@ -340,13 +354,6 @@ private:
 
     EventuallyPersistentEngine* engine;
     dcp_producer_t producer;
-    AtomicValue<bool> isBackfillTaskRunning;
-
-    /* Indicates if another backfill must be scheduled
-     * following the completion of current running backfill.
-     * Guarded by streamMutex
-     */
-    bool pendingBackfill;
 
     struct {
         AtomicValue<uint32_t> bytes;
diff --git a/tests/mock/mock_stream.h b/tests/mock/mock_stream.h
new file mode 100644 (file)
index 0000000..8b5158e
--- /dev/null
@@ -0,0 +1,89 @@
+/* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
+/*
+ *     Copyright 2017 Couchbase, Inc
+ *
+ *   Licensed under the Apache License, Version 2.0 (the "License");
+ *   you may not use this file except in compliance with the License.
+ *   You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *   Unless required by applicable law or agreed to in writing, software
+ *   distributed under the License is distributed on an "AS IS" BASIS,
+ *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *   See the License for the specific language governing permissions and
+ *   limitations under the License.
+ */
+
+#pragma once
+
+#include "dcp/stream.h"
+
+/*
+ * Mock of the ActiveStream class. Wraps the real ActiveStream, but exposes
+ * normally protected methods publically for test purposes.
+ */
+class MockActiveStream : public ActiveStream {
+public:
+    MockActiveStream(EventuallyPersistentEngine* e,
+                     dcp_producer_t p,
+                     const std::string& name,
+                     uint32_t flags,
+                     uint32_t opaque,
+                     uint16_t vb,
+                     uint64_t st_seqno,
+                     uint64_t en_seqno,
+                     uint64_t vb_uuid,
+                     uint64_t snap_start_seqno,
+                     uint64_t snap_end_seqno)
+        : ActiveStream(e,
+                       p,
+                       name,
+                       flags,
+                       opaque,
+                       vb,
+                       st_seqno,
+                       en_seqno,
+                       vb_uuid,
+                       snap_start_seqno,
+                       snap_end_seqno) {
+    }
+
+    // Expose underlying protected ActiveStream methods as public
+    void public_getOutstandingItems(RCPtr<VBucket>& vb,
+                                    std::vector<queued_item>& items) {
+        getOutstandingItems(vb, items);
+    }
+
+    void public_processItems(std::vector<queued_item>& items) {
+        processItems(items);
+    }
+
+    bool public_nextCheckpointItem() {
+        return nextCheckpointItem();
+    }
+
+    const std::queue<DcpResponse*>& public_readyQ() {
+        return readyQ;
+    }
+
+    DcpResponse* public_nextQueuedItem() {
+        return nextQueuedItem();
+    }
+
+    void public_setBackfillTaskRunning(bool b) {
+        isBackfillTaskRunning = b;
+    }
+
+    bool public_isBackfillTaskRunning() const {
+        return isBackfillTaskRunning;
+    }
+
+    void public_transitionState(stream_state_t newState) {
+        transitionState(newState);
+    }
+
+    bool public_getPendingBackfill() const {
+        return pendingBackfill;
+    }
+};
index 519b6aa..7d7c0aa 100644 (file)
 #include "../mock/mock_dcp.h"
 #include "../mock/mock_dcp_producer.h"
 #include "../mock/mock_dcp_consumer.h"
+#include "../mock/mock_stream.h"
 
 #include <gtest/gtest.h>
 
-// Mock of the ActiveStream class. Wraps the real ActiveStream, but exposes
-// normally protected methods publically for test purposes.
-class MockActiveStream : public ActiveStream {
-public:
-    MockActiveStream(EventuallyPersistentEngine* e, dcp_producer_t p,
-                     const std::string &name, uint32_t flags, uint32_t opaque,
-                     uint16_t vb, uint64_t st_seqno, uint64_t en_seqno,
-                     uint64_t vb_uuid, uint64_t snap_start_seqno,
-                     uint64_t snap_end_seqno)
-    : ActiveStream(e, p, name, flags, opaque, vb, st_seqno, en_seqno, vb_uuid,
-                   snap_start_seqno, snap_end_seqno) {}
-
-    // Expose underlying protected ActiveStream methods as public
-    void public_getOutstandingItems(RCPtr<VBucket> &vb,
-                                    std::vector<queued_item> &items) {
-        getOutstandingItems(vb, items);
-    }
-
-    void public_processItems(std::vector<queued_item>& items) {
-        processItems(items);
-    }
-
-    bool public_nextCheckpointItem() {
-        return nextCheckpointItem();
-    }
-
-    const std::queue<DcpResponse*>& public_readyQ() {
-        return readyQ;
-    }
-
-    DcpResponse* public_nextQueuedItem() {
-        return nextQueuedItem();
-    }
-};
-
 /*
  * Mock of the DcpConnMap class.  Wraps the real DcpConnMap, but exposes
  * normally protected methods publically for test purposes.
index e2eecb9..fe8034d 100644 (file)
@@ -21,6 +21,7 @@
 #include "taskqueue.h"
 #include "../mock/mock_dcp_producer.h"
 #include "../mock/mock_dcp_consumer.h"
+#include "../mock/mock_stream.h"
 #include "programs/engine_testapp/mock_server.h"
 
 #include <thread>
@@ -118,6 +119,64 @@ protected:
     SingleThreadedExecutorPool* task_executor;
 };
 
+
+/**
+ * Regression test for MB-22451: When handleSlowStream is called and in
+ * STREAM_BACKFILLING state and currently have a backfill scheduled (or running)
+ * ensure that when the backfill completes the new backfill is scheduled and
+ * the backfilling flag remains true.
+ */
+TEST_F(SingleThreadedEPStoreTest, test_mb22451) {
+    // Make vbucket active.
+    setVBucketStateAndRunPersistTask(vbid, vbucket_state_active);
+    // Store a single Item
+    store_item(vbid, "key", "value");
+    // Ensure that it has persisted to disk
+    flush_vbucket_to_disk(vbid);
+
+    // Create a Mock Dcp producer
+    dcp_producer_t producer = new MockDcpProducer(*engine,
+                                                  cookie,
+                                                  "test_producer",
+                                                  /*notifyOnly*/false);
+    // Create a Mock Active Stream
+    stream_t stream = new MockActiveStream(
+            static_cast<EventuallyPersistentEngine*>(engine.get()),
+            producer,
+            producer->getName(),
+            /*flags*/0,
+            /*opaque*/0, vbid,
+            /*st_seqno*/0,
+            /*en_seqno*/~0,
+            /*vb_uuid*/0xabcd,
+            /*snap_start_seqno*/0,
+            /*snap_end_seqno*/~0);
+
+    MockActiveStream* mock_stream =
+            static_cast<MockActiveStream*>(stream.get());
+
+    /**
+      * The core of the test follows:
+      * Call completeBackfill whilst we are in the state of STREAM_BACKFILLING
+      * and the pendingBackfill flag is set to true.
+      * We expect that on leaving completeBackfill the isBackfillRunning flag is
+      * set to true.
+      */
+    mock_stream->public_setBackfillTaskRunning(true);
+    mock_stream->public_transitionState(STREAM_BACKFILLING);
+    mock_stream->handleSlowStream();
+    // The call to handleSlowStream should result in setting pendingBackfill
+    // flag to true
+    EXPECT_TRUE(mock_stream->public_getPendingBackfill())
+        << "pendingBackfill is not true";
+    mock_stream->completeBackfill();
+    EXPECT_TRUE(mock_stream->public_isBackfillTaskRunning())
+        << "isBackfillRunning is not true";
+
+    // Required to ensure that the backfillMgr is deleted
+    producer->closeAllStreams();
+}
+
 /* Regression / reproducer test for MB-19695 - an exception is thrown
  * (and connection disconnected) if a couchstore file hasn't been re-created
  * yet when doTapVbTakeoverStats() is called as part of
index d5c1f66..f11018b 100644 (file)
@@ -37,6 +37,8 @@
 #include "replicationthrottle.h"
 
 #include "programs/engine_testapp/mock_server.h"
+
+#include <chrono>
 #include <platform/dirutils.h>
 #include <thread>
 
@@ -160,6 +162,40 @@ Item EventuallyPersistentStoreTest::store_item(uint16_t vbid,
     return item;
 }
 
+void EventuallyPersistentStoreTest::flush_vbucket_to_disk(uint16_t vbid) {
+    int result;
+    const auto time_limit = std::chrono::seconds(10);
+    const auto deadline = std::chrono::steady_clock::now() + time_limit;
+
+    // Need to retry as warmup may not have completed.
+    bool flush_successful = false;
+    do {
+        result = store->flushVBucket(vbid);
+        if (result != RETRY_FLUSH_VBUCKET) {
+            flush_successful = true;
+            break;
+        }
+        std::this_thread::sleep_for(std::chrono::microseconds(100));
+    } while (std::chrono::steady_clock::now() < deadline);
+
+    ASSERT_TRUE(flush_successful)
+        << "Hit timeout (" << time_limit.count() << " seconds) waiting for "
+           "warmup to complete while flushing VBucket.";
+
+    ASSERT_EQ(1, result) << "Failed to flush the one item we have stored.";
+
+    /**
+     * Although a flushVBucket writes the item to the underlying store,
+     * the item is not marked clean until an explicit commit is called
+     * If the underlying store is couchstore, a commit is called with
+     * a flushVBucket but in the case of forestdb, a commit is not
+     * always called, hence call an explicit commit.
+     */
+    uint16_t numShards = store->getVbMap().getNumShards();
+
+    store->commit(vbid % numShards);
+}
+
 
 // Verify that when handling a bucket delete with open DCP
 // connections, we don't deadlock when notifying the front-end
index 2945e90..7cb0879 100644 (file)
@@ -101,6 +101,11 @@ protected:
     Item store_item(uint16_t vbid, const std::string& key,
                     const std::string& value);
 
+    /* Flush the given vbucket to disk, so any outstanding dirty items are
+     * written (and are clean).
+     */
+    void flush_vbucket_to_disk(uint16_t vbid);
+
     static const char test_dbname[];
 
     std::string config_string;