MB-24089: Handle DCP backfill buffers smaller than item size correctly 04/77304/7
author Manu Dhundi <manu@couchbase.com>
Wed, 26 Apr 2017 00:53:16 +0000 (17:53 -0700)
committer Manu Dhundi <manu@couchbase.com>
Fri, 28 Apr 2017 18:19:15 +0000 (18:19 +0000)
When items are drained from the DCP readyQ, we free up the DCP backfill
buffer (that is, we decrement its memory usage). This commit addresses
two bugs in that logic:
1. We never set the backfill buffer full status to false when the
   next read size is larger than the max buffer size. This can result in
   backfill hangs with items larger than the backfill buffer size.
2. We may never set the backfill buffer full status to false when
   buffer.bytesRead == (buffer.maxBytes * 3 / 4) == 0. In practice
   this may never happen as buffer.maxBytes is generally > 4 :).

This commit fixes both 1 and 2. It also adds a functional test
and a unit test reproducing bug 1.

Change-Id: Icf9512bbe6f21296374958b69cfbe851ec8873b3
Reviewed-on: http://review.couchbase.org/77304
Reviewed-by: Dave Rigby <daver@couchbase.com>
Tested-by: Build Bot <build@couchbase.com>
src/dcp/backfill-manager.cc
src/dcp/backfill-manager.h
src/dcp/stream.h
tests/ep_testsuite_dcp.cc
tests/mock/mock_dcp_backfill_mgr.h [new file with mode: 0644]
tests/mock/mock_dcp_producer.h
tests/mock/mock_stream.h
tests/module_tests/dcp_test.cc

index a9acc21..0bf3802 100644 (file)
@@ -220,9 +220,19 @@ void BackfillManager::bytesSent(size_t bytes) {
     buffer.bytesRead -= bytes;
 
     if (buffer.full) {
-        uint32_t bufferSize = buffer.bytesRead;
-        bool canFitNext = buffer.maxBytes - bufferSize >= buffer.nextReadSize;
-        bool enoughCleared = bufferSize < (buffer.maxBytes * 3 / 4);
+        /* We can have buffer.bytesRead > buffer.maxBytes */
+        size_t unfilledBufferSize = (buffer.maxBytes > buffer.bytesRead)
+                                            ? buffer.maxBytes - buffer.bytesRead
+                                            : buffer.maxBytes;
+
+        /* If buffer.bytesRead == 0 we want to fit the next read into the
+           backfill buffer irrespective of its size */
+        bool canFitNext = (buffer.bytesRead == 0) ||
+                          (unfilledBufferSize >= buffer.nextReadSize);
+
+        /* <= implicitly takes care of the case where
+           buffer.bytesRead == (buffer.maxBytes * 3 / 4) == 0 */
+        bool enoughCleared = buffer.bytesRead <= (buffer.maxBytes * 3 / 4);
         if (canFitNext && enoughCleared) {
             buffer.nextReadSize = 0;
             buffer.full = false;
index 0132525..baf94b4 100644 (file)
@@ -59,7 +59,7 @@ class BackfillManager : public std::enable_shared_from_this<BackfillManager> {
 public:
     BackfillManager(EventuallyPersistentEngine& e);
 
-    ~BackfillManager();
+    virtual ~BackfillManager();
 
     void addStats(connection_t conn, ADD_STAT add_stat, const void *c);
 
@@ -95,6 +95,15 @@ public:
 
     void wakeUpTask();
 
+protected:
+    //! The buffer is the total bytes used by all backfills for this connection
+    struct {
+        size_t bytesRead;
+        size_t maxBytes;
+        size_t nextReadSize;
+        bool full;
+    } buffer;
+
 private:
 
     void moveToActiveQueue();
@@ -115,14 +124,6 @@ private:
         size_t maxBytes;
         size_t maxItems;
     } scanBuffer;
-
-    //! The buffer is the total bytes used by all backfills for this connection
-    struct {
-        size_t bytesRead;
-        size_t maxBytes;
-        size_t nextReadSize;
-        bool full;
-    } buffer;
 };
 
 #endif  // SRC_DCP_BACKFILL_MANAGER_H_
index 63b8dfd..4a0168a 100644 (file)
@@ -349,12 +349,12 @@ protected:
      */
     std::atomic<size_t> backfillRemaining;
 
+    DcpResponse* backfillPhase(std::lock_guard<std::mutex>& lh);
+
 private:
 
     DcpResponse* next(std::lock_guard<std::mutex>& lh);
 
-    DcpResponse* backfillPhase(std::lock_guard<std::mutex>& lh);
-
     DcpResponse* inMemoryPhase();
 
     DcpResponse* takeoverSendPhase();
index ec34669..ef2784a 100644 (file)
@@ -1995,6 +1995,35 @@ static enum test_result test_dcp_producer_disk_backfill_limits(ENGINE_HANDLE *h,
     return SUCCESS;
 }
 
+static enum test_result test_dcp_producer_disk_backfill_buffer_limits(
+                                    ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1) {
+    const int num_items = 3;
+    write_items(h, h1, num_items);
+
+    wait_for_flusher_to_settle(h, h1);
+    verify_curr_items(h, h1, num_items, "Wrong amount of items");
+
+    /* Wait for the checkpoint to be removed so that upon DCP connection
+       backfill is scheduled */
+    wait_for_stat_to_be(h, h1, "ep_items_rm_from_checkpoints", num_items);
+
+    const void *cookie = testHarness.create_cookie();
+
+    DcpStreamCtx ctx;
+    ctx.vb_uuid = get_ull_stat(h, h1, "vb_0:0:id", "failovers");
+    ctx.seqno = {0, num_items};
+    ctx.exp_mutations = 3;
+    ctx.exp_markers = 1;
+
+    TestDcpConsumer tdc("unittest", cookie);
+    tdc.addStreamCtx(ctx);
+    tdc.run(h, h1);
+
+    testHarness.destroy_cookie(cookie);
+
+    return SUCCESS;
+}
+
 static enum test_result test_dcp_producer_stream_req_mem(ENGINE_HANDLE *h,
                                                          ENGINE_HANDLE_V1 *h1) {
     const int num_items = 300, batch_items = 100;
@@ -5964,6 +5993,16 @@ BaseTestCase testsuite_testcases[] = {
                  "dcp_scan_item_limit=100;dcp_scan_byte_limit=100",
                  prepare,
                  cleanup),
+        TestCase("test producer disk backfill buffer limits",
+                 test_dcp_producer_disk_backfill_buffer_limits,
+                 test_setup,
+                 teardown,
+                 /* Set buffer size to a very low value (less than the size
+                    of a mutation) */
+                 "dcp_backfill_byte_limit=1;chk_remover_stime=1;"
+                 "chk_max_items=3",
+                 prepare,
+                 cleanup),
         TestCase("test producer stream request (memory only)",
                  test_dcp_producer_stream_req_mem, test_setup, teardown,
                  "chk_remover_stime=1;chk_max_items=100", prepare, cleanup),
diff --git a/tests/mock/mock_dcp_backfill_mgr.h b/tests/mock/mock_dcp_backfill_mgr.h
new file mode 100644 (file)
index 0000000..0c5b447
--- /dev/null
@@ -0,0 +1,39 @@
+/* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
+/*
+ *     Copyright 2017 Couchbase, Inc.
+ *
+ *   Licensed under the Apache License, Version 2.0 (the "License");
+ *   you may not use this file except in compliance with the License.
+ *   You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *   Unless required by applicable law or agreed to in writing, software
+ *   distributed under the License is distributed on an "AS IS" BASIS,
+ *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *   See the License for the specific language governing permissions and
+ *   limitations under the License.
+ */
+
+#pragma once
+
+#include "dcp/backfill-manager.h"
+
+/*
+ * Mock of the BackfillManager class.  Wraps the real BackfillManager, but
+ * exposes normally protected members publicly for test purposes.
+ */
+class MockDcpBackfillManager : public BackfillManager {
+public:
+    MockDcpBackfillManager(EventuallyPersistentEngine& theEngine)
+        : BackfillManager(theEngine) {
+    }
+
+    void setBackfillBufferSize(size_t newSize) {
+        buffer.maxBytes = newSize;
+    }
+
+    bool getBackfillBufferFullStatus() {
+        return buffer.full;
+    }
+};
index 0503ceb..36fd619 100644 (file)
@@ -19,6 +19,7 @@
 
 #include "dcp/producer.h"
 #include "dcp/stream.h"
+#include "mock_dcp_backfill_mgr.h"
 
 /*
  * Mock of the DcpProducer class.  Wraps the real DcpProducer, but exposes
@@ -35,6 +36,7 @@ public:
                             DcpProducer::MutationType::KeyAndValue)
         : DcpProducer(theEngine, cookie, name,
                       isNotifier, startTask, mutationType) {
+        backfillMgr.reset(new MockDcpBackfillManager(engine_));
     }
 
     ENGINE_ERROR_CODE maybeDisconnect() {
@@ -104,4 +106,17 @@ public:
         }
         return {};
     }
+
+    /**
+     * Sets the backfill buffer size (max limit) to a particular value
+     */
+    void setBackfillBufferSize(size_t newSize) {
+        dynamic_cast<MockDcpBackfillManager*>(backfillMgr.get())
+                ->setBackfillBufferSize(newSize);
+    }
+
+    bool getBackfillBufferFullStatus() {
+        return dynamic_cast<MockDcpBackfillManager*>(backfillMgr.get())
+                ->getBackfillBufferFullStatus();
+    }
 };
index ec89831..933b8d7 100644 (file)
@@ -111,6 +111,20 @@ public:
             usleep(10);
         }
     }
+
+    /**
+     * Consumes numItems from the stream readyQ
+     */
+    void consumeBackfillItems(int numItems) {
+        std::lock_guard<std::mutex> lh(streamMutex);
+        for (int items = 0; items < numItems;) {
+            auto resp = backfillPhase(lh);
+            if (resp) {
+                delete resp;
+                ++items;
+            }
+        }
+    }
 };
 
 /* Mock of the PassiveStream class. Wraps the real PassiveStream, but exposes
index d9d6e02..9661da0 100644 (file)
@@ -416,11 +416,13 @@ TEST_P(StreamTest, BackfillOnly) {
 
     /* Wait for removal of the old checkpoint, this also would imply that the
        items are persisted (in case of persistent buckets) */
-    bool new_ckpt_created;
-    std::chrono::microseconds uSleepTime(128);
-    while (static_cast<size_t>(numItems) !=
-           ckpt_mgr.removeClosedUnrefCheckpoints(*vb0, new_ckpt_created)) {
-        uSleepTime = decayingSleep(uSleepTime);
+    {
+        bool new_ckpt_created;
+        std::chrono::microseconds uSleepTime(128);
+        while (static_cast<size_t>(numItems) !=
+               ckpt_mgr.removeClosedUnrefCheckpoints(*vb0, new_ckpt_created)) {
+            uSleepTime = decayingSleep(uSleepTime);
+        }
     }
 
     /* Set up a DCP stream for the backfill */
@@ -433,9 +435,11 @@ TEST_P(StreamTest, BackfillOnly) {
     mock_stream->transitionStateToBackfilling();
 
     /* Wait for the backfill task to complete */
-    std::chrono::microseconds uSleepTime2(128);
-    while (numItems != mock_stream->getLastReadSeqno()) {
-        uSleepTime2 = decayingSleep(uSleepTime2);
+    {
+        std::chrono::microseconds uSleepTime(128);
+        while (numItems != mock_stream->getLastReadSeqno()) {
+            uSleepTime = decayingSleep(uSleepTime);
+        }
     }
 
     /* Verify that all items are read in the backfill */
@@ -483,6 +487,81 @@ TEST_P(StreamTest, BackfillFail) {
     mock_stream->waitForStreamClose();
 }
 
+/* Stream items from a DCP backfill with very small backfill buffer.
+   However small the backfill buffer is, backfill must not stop, it must
+   proceed to completion eventually */
+TEST_P(StreamTest, BackfillSmallBuffer) {
+    if (bucketType == "ephemeral") {
+        /* Ephemeral buckets are not memory managed for now. They will be
+           memory managed soon, and then this test will be enabled */
+        return;
+    }
+
+    /* Add 2 items */
+    int numItems = 2;
+    for (int i = 0; i < numItems; ++i) {
+        std::string key("key" + std::to_string(i));
+        store_item(vbid, key, "value");
+    }
+
+    /* Create new checkpoint so that we can remove the current checkpoint
+       and force a backfill in the DCP stream */
+    auto& ckpt_mgr = vb0->checkpointManager;
+    ckpt_mgr.createNewCheckpoint();
+
+    /* Wait for removal of the old checkpoint, this also would imply that the
+       items are persisted (in case of persistent buckets) */
+    {
+        bool new_ckpt_created;
+        std::chrono::microseconds uSleepTime(128);
+        while (static_cast<size_t>(numItems) !=
+               ckpt_mgr.removeClosedUnrefCheckpoints(*vb0, new_ckpt_created)) {
+            uSleepTime = decayingSleep(uSleepTime);
+        }
+    }
+
+    /* Set up a DCP stream for the backfill */
+    setup_dcp_stream();
+    MockActiveStream* mock_stream =
+            dynamic_cast<MockActiveStream*>(stream.get());
+
+    /* set the DCP backfill buffer size to a value that is smaller than the
+       size of a mutation */
+    MockDcpProducer* mock_producer =
+            dynamic_cast<MockDcpProducer*>(producer.get());
+    mock_producer->setBackfillBufferSize(1);
+
+    /* We want the backfill task to run in a background thread */
+    ExecutorPool::get()->setNumAuxIO(1);
+    mock_stream->transitionStateToBackfilling();
+
+    /* Backfill can only read 1 item as its buffer becomes full after that */
+    {
+        std::chrono::microseconds uSleepTime(128);
+        while ((numItems - 1) != mock_stream->getLastReadSeqno()) {
+            uSleepTime = decayingSleep(uSleepTime);
+        }
+    }
+
+    /* Consume the backfill item(s) */
+    mock_stream->consumeBackfillItems(/*snapshot*/ 1 + /*mutation*/ 1);
+
+    /* We should see that buffer full status must be false as we have read
+       the item in the backfill buffer */
+    EXPECT_FALSE(mock_producer->getBackfillBufferFullStatus());
+
+    /* Finish up with the backfilling of the remaining item */
+    {
+        std::chrono::microseconds uSleepTime(128);
+        while (numItems != mock_stream->getLastReadSeqno()) {
+            uSleepTime = decayingSleep(uSleepTime);
+        }
+    }
+
+    /* Read the other item */
+    mock_stream->consumeBackfillItems(1);
+}
+
 class ConnectionTest : public DCPTest {
 protected:
     ENGINE_ERROR_CODE set_vb_state(uint16_t vbid, vbucket_state_t state) {