buffer.bytesRead -= bytes;
if (buffer.full) {
- uint32_t bufferSize = buffer.bytesRead;
- bool canFitNext = buffer.maxBytes - bufferSize >= buffer.nextReadSize;
- bool enoughCleared = bufferSize < (buffer.maxBytes * 3 / 4);
+ /* We can have buffer.bytesRead > buffer.maxBytes */
+ size_t unfilledBufferSize = (buffer.maxBytes > buffer.bytesRead)
+ ? buffer.maxBytes - buffer.bytesRead
+ : buffer.maxBytes;
+
+ /* If buffer.bytesRead == 0 we want to fit the next read into the
+ backfill buffer irrespective of its size */
+ bool canFitNext = (buffer.bytesRead == 0) ||
+ (unfilledBufferSize >= buffer.nextReadSize);
+
+ /* <= implicitly takes care of the case where
+ buffer.bytesRead == (buffer.maxBytes * 3 / 4) == 0 */
+ bool enoughCleared = buffer.bytesRead <= (buffer.maxBytes * 3 / 4);
if (canFitNext && enoughCleared) {
buffer.nextReadSize = 0;
buffer.full = false;
public:
BackfillManager(EventuallyPersistentEngine& e);
- ~BackfillManager();
+ virtual ~BackfillManager();
void addStats(connection_t conn, ADD_STAT add_stat, const void *c);
void wakeUpTask();
+protected:
+ //! The buffer is the total bytes used by all backfills for this connection
+ struct {
+ size_t bytesRead;
+ size_t maxBytes;
+ size_t nextReadSize;
+ bool full;
+ } buffer;
+
private:
void moveToActiveQueue();
size_t maxBytes;
size_t maxItems;
} scanBuffer;
-
- //! The buffer is the total bytes used by all backfills for this connection
- struct {
- size_t bytesRead;
- size_t maxBytes;
- size_t nextReadSize;
- bool full;
- } buffer;
};
#endif // SRC_DCP_BACKFILL_MANAGER_H_
*/
std::atomic<size_t> backfillRemaining;
+ DcpResponse* backfillPhase(std::lock_guard<std::mutex>& lh);
+
private:
DcpResponse* next(std::lock_guard<std::mutex>& lh);
- DcpResponse* backfillPhase(std::lock_guard<std::mutex>& lh);
-
DcpResponse* inMemoryPhase();
DcpResponse* takeoverSendPhase();
return SUCCESS;
}
+static enum test_result test_dcp_producer_disk_backfill_buffer_limits(
+ ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1) {
+ const int num_items = 3;
+ write_items(h, h1, num_items);
+
+ wait_for_flusher_to_settle(h, h1);
+ verify_curr_items(h, h1, num_items, "Wrong amount of items");
+
+ /* Wait for the checkpoint to be removed so that upon DCP connection
+ backfill is scheduled */
+ wait_for_stat_to_be(h, h1, "ep_items_rm_from_checkpoints", num_items);
+
+ const void *cookie = testHarness.create_cookie();
+
+ DcpStreamCtx ctx;
+ ctx.vb_uuid = get_ull_stat(h, h1, "vb_0:0:id", "failovers");
+ ctx.seqno = {0, num_items};
+ ctx.exp_mutations = 3;
+ ctx.exp_markers = 1;
+
+ TestDcpConsumer tdc("unittest", cookie);
+ tdc.addStreamCtx(ctx);
+ tdc.run(h, h1);
+
+ testHarness.destroy_cookie(cookie);
+
+ return SUCCESS;
+}
+
static enum test_result test_dcp_producer_stream_req_mem(ENGINE_HANDLE *h,
ENGINE_HANDLE_V1 *h1) {
const int num_items = 300, batch_items = 100;
"dcp_scan_item_limit=100;dcp_scan_byte_limit=100",
prepare,
cleanup),
+ TestCase("test producer disk backfill buffer limits",
+ test_dcp_producer_disk_backfill_buffer_limits,
+ test_setup,
+ teardown,
+ /* Set buffer size to a very low value (less than the size
+ of a mutation) */
+ "dcp_backfill_byte_limit=1;chk_remover_stime=1;"
+ "chk_max_items=3",
+ prepare,
+ cleanup),
TestCase("test producer stream request (memory only)",
test_dcp_producer_stream_req_mem, test_setup, teardown,
"chk_remover_stime=1;chk_max_items=100", prepare, cleanup),
--- /dev/null
+/* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
+/*
+ * Copyright 2017 Couchbase, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include "dcp/backfill-manager.h"
+
+/*
+ * Mock of the BackfillManager class. Wraps the real BackfillManager, but
+ * exposes normally protected methods publically for test purposes.
+ */
+class MockDcpBackfillManager : public BackfillManager {
+public:
+ MockDcpBackfillManager(EventuallyPersistentEngine& theEngine)
+ : BackfillManager(theEngine) {
+ }
+
+ void setBackfillBufferSize(size_t newSize) {
+ buffer.maxBytes = newSize;
+ }
+
+ bool getBackfillBufferFullStatus() {
+ return buffer.full;
+ }
+};
#include "dcp/producer.h"
#include "dcp/stream.h"
+#include "mock_dcp_backfill_mgr.h"
/*
* Mock of the DcpProducer class. Wraps the real DcpProducer, but exposes
DcpProducer::MutationType::KeyAndValue)
: DcpProducer(theEngine, cookie, name,
isNotifier, startTask, mutationType) {
+ backfillMgr.reset(new MockDcpBackfillManager(engine_));
}
ENGINE_ERROR_CODE maybeDisconnect() {
}
return {};
}
+
+ /**
+ * Sets the backfill buffer size (max limit) to a particular value
+ */
+ void setBackfillBufferSize(size_t newSize) {
+ dynamic_cast<MockDcpBackfillManager*>(backfillMgr.get())
+ ->setBackfillBufferSize(newSize);
+ }
+
+ bool getBackfillBufferFullStatus() {
+ return dynamic_cast<MockDcpBackfillManager*>(backfillMgr.get())
+ ->getBackfillBufferFullStatus();
+ }
};
usleep(10);
}
}
+
+ /**
+ * Consumes numItems from the stream readyQ
+ */
+ void consumeBackfillItems(int numItems) {
+ std::lock_guard<std::mutex> lh(streamMutex);
+ for (int items = 0; items < numItems;) {
+ auto resp = backfillPhase(lh);
+ if (resp) {
+ delete resp;
+ ++items;
+ }
+ }
+ }
};
/* Mock of the PassiveStream class. Wraps the real PassiveStream, but exposes
/* Wait for removal of the old checkpoint, this also would imply that the
items are persisted (in case of persistent buckets) */
- bool new_ckpt_created;
- std::chrono::microseconds uSleepTime(128);
- while (static_cast<size_t>(numItems) !=
- ckpt_mgr.removeClosedUnrefCheckpoints(*vb0, new_ckpt_created)) {
- uSleepTime = decayingSleep(uSleepTime);
+ {
+ bool new_ckpt_created;
+ std::chrono::microseconds uSleepTime(128);
+ while (static_cast<size_t>(numItems) !=
+ ckpt_mgr.removeClosedUnrefCheckpoints(*vb0, new_ckpt_created)) {
+ uSleepTime = decayingSleep(uSleepTime);
+ }
}
/* Set up a DCP stream for the backfill */
mock_stream->transitionStateToBackfilling();
/* Wait for the backfill task to complete */
- std::chrono::microseconds uSleepTime2(128);
- while (numItems != mock_stream->getLastReadSeqno()) {
- uSleepTime2 = decayingSleep(uSleepTime2);
+ {
+ std::chrono::microseconds uSleepTime(128);
+ while (numItems != mock_stream->getLastReadSeqno()) {
+ uSleepTime = decayingSleep(uSleepTime);
+ }
}
/* Verify that all items are read in the backfill */
mock_stream->waitForStreamClose();
}
+/* Stream items from a DCP backfill with very small backfill buffer.
+ However small the backfill buffer is, backfill must not stop, it must
+ proceed to completion eventually */
+TEST_P(StreamTest, BackfillSmallBuffer) {
+ if (bucketType == "ephemeral") {
+ /* Ephemeral buckets is not memory managed for now. Will be memory
+ managed soon and then this test will be enabled */
+ return;
+ }
+
+ /* Add 2 items */
+ int numItems = 2;
+ for (int i = 0; i < numItems; ++i) {
+ std::string key("key" + std::to_string(i));
+ store_item(vbid, key, "value");
+ }
+
+ /* Create new checkpoint so that we can remove the current checkpoint
+ and force a backfill in the DCP stream */
+ auto& ckpt_mgr = vb0->checkpointManager;
+ ckpt_mgr.createNewCheckpoint();
+
+ /* Wait for removal of the old checkpoint, this also would imply that the
+ items are persisted (in case of persistent buckets) */
+ {
+ bool new_ckpt_created;
+ std::chrono::microseconds uSleepTime(128);
+ while (static_cast<size_t>(numItems) !=
+ ckpt_mgr.removeClosedUnrefCheckpoints(*vb0, new_ckpt_created)) {
+ uSleepTime = decayingSleep(uSleepTime);
+ }
+ }
+
+ /* Set up a DCP stream for the backfill */
+ setup_dcp_stream();
+ MockActiveStream* mock_stream =
+ dynamic_cast<MockActiveStream*>(stream.get());
+
+ /* set the DCP backfill buffer size to a value that is smaller than the
+ size of a mutation */
+ MockDcpProducer* mock_producer =
+ dynamic_cast<MockDcpProducer*>(producer.get());
+ mock_producer->setBackfillBufferSize(1);
+
+ /* We want the backfill task to run in a background thread */
+ ExecutorPool::get()->setNumAuxIO(1);
+ mock_stream->transitionStateToBackfilling();
+
+ /* Backfill can only read 1 as its buffer will become full after that */
+ {
+ std::chrono::microseconds uSleepTime(128);
+ while ((numItems - 1) != mock_stream->getLastReadSeqno()) {
+ uSleepTime = decayingSleep(uSleepTime);
+ }
+ }
+
+ /* Consume the backfill item(s) */
+ mock_stream->consumeBackfillItems(/*snapshot*/ 1 + /*mutation*/ 1);
+
+ /* We should see that buffer full status must be false as we have read
+ the item in the backfill buffer */
+ EXPECT_FALSE(mock_producer->getBackfillBufferFullStatus());
+
+ /* Finish up with the backilling of the remaining item */
+ {
+ std::chrono::microseconds uSleepTime(128);
+ while (numItems != mock_stream->getLastReadSeqno()) {
+ uSleepTime = decayingSleep(uSleepTime);
+ }
+ }
+
+ /* Read the other item */
+ mock_stream->consumeBackfillItems(1);
+}
+
class ConnectionTest : public DCPTest {
protected:
ENGINE_ERROR_CODE set_vb_state(uint16_t vbid, vbucket_state_t state) {