MB-23734: Do memory mgmt across backfills in Ephemeral backfills 02/77402/5
authorManu Dhundi <manu@couchbase.com>
Wed, 26 Apr 2017 23:19:36 +0000 (16:19 -0700)
committerManu Dhundi <manu@couchbase.com>
Thu, 27 Apr 2017 17:59:35 +0000 (17:59 +0000)
In Ephemeral buckets we currently do not have backfill memory mgmt.
Mainly because upon increased memory usage by backfill items we cannot
easily pause the backfill midway because pausing a backfill will
increase the duplicate items in the ephemeral sequential data structure.

This commit adds memory mgmt across backfills (each vbucket is an
individual backfill). Upon full usage of the backfill buffer we stop
running other backfills until the backfill buffer is empty again.

However once a backfill starts it runs till completion even if its
memory usage goes beyond the buffer size.

Benefit: We will not run new backfills once backfill buffer is full.

Known limitation: We don't stop the currently running backfill even
                  if the backfill buffer is full.

We plan to address this limitation soon.

Change-Id: If5f77561a856b5001de159cd4655eb30c71e222c
Reviewed-on: http://review.couchbase.org/77402
Reviewed-by: Dave Rigby <daver@couchbase.com>
Tested-by: Build Bot <build@couchbase.com>
src/dcp/backfill-manager.cc
src/dcp/backfill-manager.h
src/dcp/backfill_disk.cc
src/dcp/backfill_memory.cc
src/dcp/producer.cc
src/dcp/producer.h
src/dcp/stream.cc
src/dcp/stream.h
src/ephemeral_bucket.cc
tests/ep_testsuite_dcp.cc

index 1fa28d4..a9acc21 100644 (file)
@@ -162,7 +162,7 @@ void BackfillManager::schedule(VBucket& vb,
     ExecutorPool::get()->schedule(managerTask);
 }
 
-bool BackfillManager::bytesRead(size_t bytes) {
+bool BackfillManager::bytesCheckAndRead(size_t bytes) {
     LockHolder lh(lock);
     if (scanBuffer.itemsRead >= scanBuffer.maxItems) {
         return false;
@@ -192,6 +192,24 @@ bool BackfillManager::bytesRead(size_t bytes) {
     return true;
 }
 
+void BackfillManager::bytesForceRead(size_t bytes) {
+    LockHolder lh(lock);
+
+    /* Irrespective of the scan buffer usage and overall backfill buffer usage
+       we want to complete this backfill */
+    ++scanBuffer.itemsRead;
+    scanBuffer.bytesRead += bytes;
+    buffer.bytesRead += bytes;
+
+    if (buffer.bytesRead > buffer.maxBytes) {
+        /* Setting this flag prevents running other backfills and hence prevents
+           further increase in the memory usage.
+           Note: The current backfill will run to completion and that is desired
+                 here. */
+        buffer.full = true;
+    }
+}
+
 void BackfillManager::bytesSent(size_t bytes) {
     LockHolder lh(lock);
     if (bytes > buffer.bytesRead) {
index 391afb9..0132525 100644 (file)
@@ -68,7 +68,24 @@ public:
                   uint64_t start,
                   uint64_t end);
 
-    bool bytesRead(size_t bytes);
+    /**
+     * Checks if the read size can fit into the backfill buffer and scan
+     * buffer and reads only if the read can fit.
+     *
+     * @param bytes read size
+     *
+     * @return true upon read success
+     *         false if the buffer(s) is(are) full
+     */
+    bool bytesCheckAndRead(size_t bytes);
+
+    /**
+     * Reads the backfill item irrespective of whether backfill buffer or
+     * scan buffer is full.
+     *
+     * @param bytes read size
+     */
+    void bytesForceRead(size_t bytes);
 
     void bytesSent(size_t bytes);
 
index 2e3ba7c..31b2b28 100644 (file)
@@ -80,7 +80,8 @@ void CacheCallback::callback(CacheLookup& lookup) {
             return;
         }
         hbl.getHTLock().unlock();
-        if (!stream_->backfillReceived(std::move(it), BACKFILL_FROM_MEMORY)) {
+        if (!stream_->backfillReceived(
+                    std::move(it), BACKFILL_FROM_MEMORY, /*force*/ false)) {
             setStatus(ENGINE_ENOMEM); // Pause the backfill
         } else {
             setStatus(ENGINE_KEY_EEXISTS);
@@ -108,7 +109,8 @@ void DiskCallback::callback(GetValue& val) {
     }
 
     if (!stream_->backfillReceived(std::unique_ptr<Item>(val.getValue()),
-                                   BACKFILL_FROM_DISK)) {
+                                   BACKFILL_FROM_DISK,
+                                   /*force*/ false)) {
         setStatus(ENGINE_ENOMEM); // Pause the backfill
     } else {
         setStatus(ENGINE_SUCCESS);
index 29ec721..e9ef052 100644 (file)
@@ -91,7 +91,8 @@ backfill_status_t DCPBackfillMemory::run() {
 
     /* Move every item to the stream */
     for (auto& item : items) {
-        stream->backfillReceived(std::move(item), BACKFILL_FROM_MEMORY);
+        stream->backfillReceived(
+                std::move(item), BACKFILL_FROM_MEMORY, /*force*/ true);
     }
 
     /* Indicate completion to the stream */
index 411e0ea..84e279b 100644 (file)
@@ -783,8 +783,12 @@ void DcpProducer::notifyBackfillManager() {
     backfillMgr->wakeUpTask();
 }
 
-bool DcpProducer::recordBackfillManagerBytesRead(size_t bytes) {
-    return backfillMgr->bytesRead(bytes);
+bool DcpProducer::recordBackfillManagerBytesRead(size_t bytes, bool force) {
+    if (force) {
+        backfillMgr->bytesForceRead(bytes);
+        return true;
+    }
+    return backfillMgr->bytesCheckAndRead(bytes);
 }
 
 void DcpProducer::recordBackfillManagerBytesSent(size_t bytes) {
index 0c529ac..1bb97e3 100644 (file)
@@ -138,7 +138,7 @@ enum class MutationType {
     void notifyStreamReady(uint16_t vbucket);
 
     void notifyBackfillManager();
-    bool recordBackfillManagerBytesRead(size_t bytes);
+    bool recordBackfillManagerBytesRead(size_t bytes, bool force);
     void recordBackfillManagerBytesSent(size_t bytes);
     void scheduleBackfillManager(VBucket& vb,
                                  const active_stream_t& s,
index cf3204f..43bd23c 100644 (file)
@@ -419,7 +419,8 @@ void ActiveStream::markDiskSnapshot(uint64_t startSeqno, uint64_t endSeqno) {
 }
 
 bool ActiveStream::backfillReceived(std::unique_ptr<Item> itm,
-                                    backfill_source_t backfill_source) {
+                                    backfill_source_t backfill_source,
+                                    bool force) {
     if (!itm) {
         return false;
     }
@@ -430,7 +431,7 @@ bool ActiveStream::backfillReceived(std::unique_ptr<Item> itm,
             queued_item qi(std::move(itm));
             std::unique_ptr<DcpResponse> resp(makeResponseFromItem(qi));
             if (!producer->recordBackfillManagerBytesRead(
-                        resp->getApproximateSize())) {
+                        resp->getApproximateSize(), force)) {
                 // Deleting resp may also delete itm (which is owned by resp)
                 resp.reset();
                 return false;
index d7646a2..63b8dfd 100644 (file)
@@ -249,7 +249,8 @@ public:
     void markDiskSnapshot(uint64_t startSeqno, uint64_t endSeqno);
 
     bool backfillReceived(std::unique_ptr<Item> itm,
-                          backfill_source_t backfill_source);
+                          backfill_source_t backfill_source,
+                          bool force);
 
     void completeBackfill();
 
index 3ff1fdc..b9cff44 100644 (file)
@@ -209,17 +209,6 @@ void EphemeralBucket::reconfigureForEphemeral(Configuration& config) {
     config.setWarmup(false);
     // Disable TAP - not supported for Ephemeral.
     config.setTap(false);
-
-    // In-memory backfilling is currently not memory managed. Therefore set the
-    // scan buffer (per backfill buffer for managing backfill memory usage)
-    // byte limit, item limit to max. Otherwise, this may result in incomplete
-    // DCP backfills which may inturn cause hang or data loss in DCP clients
-    // like rebalance, xdcr etc.
-    config.setDcpScanByteLimit(std::numeric_limits<size_t>::max());
-    config.setDcpScanItemLimit(std::numeric_limits<size_t>::max());
-
-    // for the same reason as above set overall DCP backfill limit to max
-    config.setDcpBackfillByteLimit(std::numeric_limits<size_t>::max());
 }
 
 size_t EphemeralBucket::getNumPersistedDeletes(uint16_t vbid) {
index f489c6b..ec34669 100644 (file)
@@ -633,9 +633,13 @@ ENGINE_ERROR_CODE TestDcpConsumer::openStreams(ENGINE_HANDLE *h, ENGINE_HANDLE_V
             std::string stats_num_conn_cursors("vb_" +
                                                std::to_string(ctx.vbucket) +
                                                ":num_conn_cursors");
-            checkeq(1, get_int_stat(h, h1, stats_num_conn_cursors.c_str(),
-                                    "checkpoint"),
-                    "DCP cursors not expected to be registered");
+            /* In case of persistent buckets there will be 1 persistent cursor,
+               in case of ephemeral buckets there will be no cursor */
+            check(get_int_stat(h,
+                               h1,
+                               stats_num_conn_cursors.c_str(),
+                               "checkpoint") <= 1,
+                  "DCP cursors not expected to be registered");
         }
 
         // Init stats used in test
@@ -1965,12 +1969,22 @@ static enum test_result test_dcp_producer_disk_backfill_limits(ENGINE_HANDLE *h,
     tdc.addStreamCtx(ctx);
     tdc.run(h, h1);
 
-    /* Backfill task runs are expected as below:
-       once for backfill_state_init + once for backfill_state_completing +
-       once for backfill_state_done + once post all backfills are run finished.
-       Here since we have dcp_scan_byte_limit = 100, we expect the backfill task
-       to run additional 'num_items' during backfill_state_scanning state. */
-    uint64_t exp_backfill_task_runs = 4 + num_items;
+    uint64_t exp_backfill_task_runs;
+    if (isPersistentBucket(h, h1)) {
+        /* Backfill task runs are expected as below:
+           once for backfill_state_init + once for backfill_state_completing +
+           once for backfill_state_done + once post all backfills are run
+           finished. Here since we have dcp_scan_byte_limit = 100, we expect the
+           backfill task to run additional 'num_items' during
+           backfill_state_scanning state. */
+        exp_backfill_task_runs = 4 + num_items;
+    } else {
+        /* In case of ephemeral bucket there are no backfill states and no
+           limitations on memory used by a single backfill. Hence we expect
+           one backfill run for reading all items + one post all backfilling
+           is complete */
+        exp_backfill_task_runs = 2;
+    }
     checkeq(exp_backfill_task_runs,
             get_histo_stat(h, h1, "BackfillManagerTask", "runtimes",
                            Histo_stat_info::TOTAL_COUNT),
@@ -5943,18 +5957,12 @@ BaseTestCase testsuite_testcases[] = {
         TestCase("test producer stream request (disk only)",
                  test_dcp_producer_stream_req_diskonly, test_setup, teardown,
                  "chk_remover_stime=1;chk_max_items=100",
-                 /* [EPHE TODO]: the test uses DCP_ADD_STREAM_FLAG_DISKONLY flg;
-                    new test case to be written once we decide the intended
-                    effect of DCP_ADD_STREAM_FLAG_DISKONLY in ephemeral */
-                 prepare_skip_broken_under_ephemeral,
+                 prepare,
                  cleanup),
         TestCase("test producer disk backfill limits",
                  test_dcp_producer_disk_backfill_limits, test_setup, teardown,
                  "dcp_scan_item_limit=100;dcp_scan_byte_limit=100",
-                 /* [EPHE TODO]: the test validates the disk backfill task runs;
-                    new test case to be written once we decide the intended
-                    effect of DCP_ADD_STREAM_FLAG_DISKONLY in ephemeral */
-                 prepare_skip_broken_under_ephemeral,
+                 prepare,
                  cleanup),
         TestCase("test producer stream request (memory only)",
                  test_dcp_producer_stream_req_mem, test_setup, teardown,