uint64_t snap_start_seqno, uint64_t snap_end_seqno)
: Stream(n, flags, opaque, vb, st_seqno, en_seqno, vb_uuid,
snap_start_seqno, snap_end_seqno),
+ isBackfillTaskRunning(false), pendingBackfill(false),
lastReadSeqnoUnSnapshotted(st_seqno), lastReadSeqno(st_seqno),
lastSentSeqno(st_seqno), curChkSeqno(st_seqno),
takeoverState(vbucket_state_pending), backfillRemaining(0),
itemsFromMemoryPhase(0), firstMarkerSent(false), waitForSnapshot(0),
- engine(e), producer(p), isBackfillTaskRunning(false),
- pendingBackfill(false),
+ engine(e), producer(p),
payloadType((flags & DCP_ADD_STREAM_FLAG_NO_VALUE) ? KEY_ONLY :
KEY_VALUE),
lastSentSnapEndSeqno(0), chkptItemsExtractionInProgress(false) {
scheduleBackfill_UNLOCKED(true);
pendingBackfill = false;
}
+
+ bool expected = false;
+ if (itemsReady.compare_exchange_strong(expected, true)) {
+ producer->notifyStreamReady(vb_);
+ }
+
+ /**
+ * MB-22451: It is important that we return here because
+ * scheduleBackfill_UNLOCKED(true) can set
+ * isBackfillTaskRunning to true. Therefore if we don't return we
+ * will set isBackfillTaskRunning prematurely back to false, (see
+ * below).
+ */
+ return;
}
}
DcpResponse* nextQueuedItem();
-private:
-
+ /* The transitionState function is protected (as opposed to private) for
+ * testing purposes.
+ */
void transitionState(stream_state_t newState);
+ /* Indicates that a backfill has been scheduled and has not yet completed.
+ * Is protected (as opposed to private) for testing purposes.
+ */
+ std::atomic<bool> isBackfillTaskRunning;
+
+ /* Indicates if another backfill must be scheduled following the completion
+ * of current running backfill. Guarded by streamMutex.
+ * Is protected (as opposed to private) for testing purposes.
+ */
+ bool pendingBackfill;
+
+private:
+
DcpResponse* backfillPhase();
DcpResponse* inMemoryPhase();
EventuallyPersistentEngine* engine;
dcp_producer_t producer;
- AtomicValue<bool> isBackfillTaskRunning;
-
- /* Indicates if another backfill must be scheduled
- * following the completion of current running backfill.
- * Guarded by streamMutex
- */
- bool pendingBackfill;
struct {
AtomicValue<uint32_t> bytes;
--- /dev/null
+/* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
+/*
+ * Copyright 2017 Couchbase, Inc
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include "dcp/stream.h"
+
+/*
+ * Mock of the ActiveStream class. Wraps the real ActiveStream, but exposes
+ * normally protected methods publically for test purposes.
+ */
+class MockActiveStream : public ActiveStream {
+public:
+ MockActiveStream(EventuallyPersistentEngine* e,
+ dcp_producer_t p,
+ const std::string& name,
+ uint32_t flags,
+ uint32_t opaque,
+ uint16_t vb,
+ uint64_t st_seqno,
+ uint64_t en_seqno,
+ uint64_t vb_uuid,
+ uint64_t snap_start_seqno,
+ uint64_t snap_end_seqno)
+ : ActiveStream(e,
+ p,
+ name,
+ flags,
+ opaque,
+ vb,
+ st_seqno,
+ en_seqno,
+ vb_uuid,
+ snap_start_seqno,
+ snap_end_seqno) {
+ }
+
+ // Expose underlying protected ActiveStream methods as public
+ void public_getOutstandingItems(RCPtr<VBucket>& vb,
+ std::vector<queued_item>& items) {
+ getOutstandingItems(vb, items);
+ }
+
+ void public_processItems(std::vector<queued_item>& items) {
+ processItems(items);
+ }
+
+ bool public_nextCheckpointItem() {
+ return nextCheckpointItem();
+ }
+
+ const std::queue<DcpResponse*>& public_readyQ() {
+ return readyQ;
+ }
+
+ DcpResponse* public_nextQueuedItem() {
+ return nextQueuedItem();
+ }
+
+ void public_setBackfillTaskRunning(bool b) {
+ isBackfillTaskRunning = b;
+ }
+
+ bool public_isBackfillTaskRunning() const {
+ return isBackfillTaskRunning;
+ }
+
+ void public_transitionState(stream_state_t newState) {
+ transitionState(newState);
+ }
+
+ bool public_getPendingBackfill() const {
+ return pendingBackfill;
+ }
+};
#include "../mock/mock_dcp.h"
#include "../mock/mock_dcp_producer.h"
#include "../mock/mock_dcp_consumer.h"
+#include "../mock/mock_stream.h"
#include <gtest/gtest.h>
-// Mock of the ActiveStream class. Wraps the real ActiveStream, but exposes
-// normally protected methods publically for test purposes.
-class MockActiveStream : public ActiveStream {
-public:
- MockActiveStream(EventuallyPersistentEngine* e, dcp_producer_t p,
- const std::string &name, uint32_t flags, uint32_t opaque,
- uint16_t vb, uint64_t st_seqno, uint64_t en_seqno,
- uint64_t vb_uuid, uint64_t snap_start_seqno,
- uint64_t snap_end_seqno)
- : ActiveStream(e, p, name, flags, opaque, vb, st_seqno, en_seqno, vb_uuid,
- snap_start_seqno, snap_end_seqno) {}
-
- // Expose underlying protected ActiveStream methods as public
- void public_getOutstandingItems(RCPtr<VBucket> &vb,
- std::vector<queued_item> &items) {
- getOutstandingItems(vb, items);
- }
-
- void public_processItems(std::vector<queued_item>& items) {
- processItems(items);
- }
-
- bool public_nextCheckpointItem() {
- return nextCheckpointItem();
- }
-
- const std::queue<DcpResponse*>& public_readyQ() {
- return readyQ;
- }
-
- DcpResponse* public_nextQueuedItem() {
- return nextQueuedItem();
- }
-};
-
/*
* Mock of the DcpConnMap class. Wraps the real DcpConnMap, but exposes
* normally protected methods publically for test purposes.
#include "taskqueue.h"
#include "../mock/mock_dcp_producer.h"
#include "../mock/mock_dcp_consumer.h"
+#include "../mock/mock_stream.h"
#include "programs/engine_testapp/mock_server.h"
#include <thread>
SingleThreadedExecutorPool* task_executor;
};
+
+/**
+ * Regression test for MB-22451: When handleSlowStream is called and in
+ * STREAM_BACKFILLING state and currently have a backfill scheduled (or running)
+ * ensure that when the backfill completes the new backfill is scheduled and
+ * the backfilling flag remains true.
+ */
+TEST_F(SingleThreadedEPStoreTest, test_mb22451) {
+ // Make vbucket active.
+ setVBucketStateAndRunPersistTask(vbid, vbucket_state_active);
+ // Store a single Item
+ store_item(vbid, "key", "value");
+ // Ensure that it has persisted to disk
+ flush_vbucket_to_disk(vbid);
+
+ // Create a Mock Dcp producer
+ dcp_producer_t producer = new MockDcpProducer(*engine,
+ cookie,
+ "test_producer",
+ /*notifyOnly*/false);
+ // Create a Mock Active Stream
+ stream_t stream = new MockActiveStream(
+ static_cast<EventuallyPersistentEngine*>(engine.get()),
+ producer,
+ producer->getName(),
+ /*flags*/0,
+ /*opaque*/0, vbid,
+ /*st_seqno*/0,
+ /*en_seqno*/~0,
+ /*vb_uuid*/0xabcd,
+ /*snap_start_seqno*/0,
+ /*snap_end_seqno*/~0);
+
+ MockActiveStream* mock_stream =
+ static_cast<MockActiveStream*>(stream.get());
+
+ /**
+ * The core of the test follows:
+ * Call completeBackfill whilst we are in the state of STREAM_BACKFILLING
+ * and the pendingBackfill flag is set to true.
+ * We expect that on leaving completeBackfill the isBackfillRunning flag is
+ * set to true.
+ */
+ mock_stream->public_setBackfillTaskRunning(true);
+ mock_stream->public_transitionState(STREAM_BACKFILLING);
+ mock_stream->handleSlowStream();
+ // The call to handleSlowStream should result in setting pendingBackfill
+ // flag to true
+ EXPECT_TRUE(mock_stream->public_getPendingBackfill())
+ << "pendingBackfill is not true";
+ mock_stream->completeBackfill();
+ EXPECT_TRUE(mock_stream->public_isBackfillTaskRunning())
+ << "isBackfillRunning is not true";
+
+ // Required to ensure that the backfillMgr is deleted
+ producer->closeAllStreams();
+}
+
/* Regression / reproducer test for MB-19695 - an exception is thrown
* (and connection disconnected) if a couchstore file hasn't been re-created
* yet when doTapVbTakeoverStats() is called as part of
#include "replicationthrottle.h"
#include "programs/engine_testapp/mock_server.h"
+
+#include <chrono>
#include <platform/dirutils.h>
#include <thread>
return item;
}
+void EventuallyPersistentStoreTest::flush_vbucket_to_disk(uint16_t vbid) {
+ int result;
+ const auto time_limit = std::chrono::seconds(10);
+ const auto deadline = std::chrono::steady_clock::now() + time_limit;
+
+ // Need to retry as warmup may not have completed.
+ bool flush_successful = false;
+ do {
+ result = store->flushVBucket(vbid);
+ if (result != RETRY_FLUSH_VBUCKET) {
+ flush_successful = true;
+ break;
+ }
+ std::this_thread::sleep_for(std::chrono::microseconds(100));
+ } while (std::chrono::steady_clock::now() < deadline);
+
+ ASSERT_TRUE(flush_successful)
+ << "Hit timeout (" << time_limit.count() << " seconds) waiting for "
+ "warmup to complete while flushing VBucket.";
+
+ ASSERT_EQ(1, result) << "Failed to flush the one item we have stored.";
+
+ /**
+ * Although a flushVBucket writes the item to the underlying store,
+ * the item is not marked clean until an explicit commit is called
+ * If the underlying store is couchstore, a commit is called with
+ * a flushVBucket but in the case of forestdb, a commit is not
+ * always called, hence call an explicit commit.
+ */
+ uint16_t numShards = store->getVbMap().getNumShards();
+
+ store->commit(vbid % numShards);
+}
+
// Verify that when handling a bucket delete with open DCP
// connections, we don't deadlock when notifying the front-end
Item store_item(uint16_t vbid, const std::string& key,
const std::string& value);
+ /* Flush the given vbucket to disk, so any outstanding dirty items are
+ * written (and are clean).
+ */
+ void flush_vbucket_to_disk(uint16_t vbid);
+
static const char test_dbname[];
std::string config_string;