MB-20623: Warmup: Process the access log in chunks 45/68045/7
authorDave Rigby <daver@couchbase.com>
Tue, 23 Aug 2016 14:28:07 +0000 (15:28 +0100)
committerDave Rigby <daver@couchbase.com>
Fri, 30 Sep 2016 15:39:25 +0000 (15:39 +0000)
Instead of loading the entire Access log into memory, and then
applying in one (potentially very large) getMulti; process it in
warmup_batch_size chunks (default 10,000 items).

This sigificantly reduces the amount of temporary memory consumed
during warmup, which in turn means this memory can be used for
document values.

Results: in the following workload we manage to load 19.2M items (or
4.2GB) into memory during warmup, where previously only two items
(yes, *two*) were warmed up due to the size of the temporary data
structures - because the temporary data structures consumed ~4.2GB.

* 1 bucket, 10,000MB quota, 1 node.
* 30M items, 300bytes each; giving a residency ratio of ~60%. Dataset
  generated by:

    cbc-pillowfight -U couchbase://localhost:12000 -I 30000000 -m 300 -M 300 -t16

Change-Id: I511b70d5ea9c9c6b9556249a936030a67bf70c02
Reviewed-on: http://review.couchbase.org/68045
Reviewed-by: Daniel Owen <owend@couchbase.com>
Tested-by: buildbot <build@couchbase.com>
configuration.json
src/ep.cc
src/mutation_log.cc
src/mutation_log.h
src/warmup.cc
src/warmup.h
tests/module_tests/mutation_log_test.cc

index 8f0873b..95759b6 100644 (file)
             "type": "bool"
         },
         "warmup_batch_size": {
-            "default": "1000",
+            "default": "10000",
             "descr": "The size of each batch loaded during warmup.",
             "dynamic": false,
             "type": "size_t",
             "validator": {
                 "range": {
-                    "max": 10000,
+                    "max": 100000000,
                     "min": 1
                 }
             }
index 7e8b198..09d612f 100644 (file)
--- a/src/ep.cc
+++ b/src/ep.cc
@@ -396,7 +396,7 @@ EventuallyPersistentStore::EventuallyPersistentStore(
         eviction_policy = FULL_EVICTION;
     }
 
-    warmupTask = new Warmup(*this);
+    warmupTask = new Warmup(*this, config);
 }
 
 bool EventuallyPersistentStore::initialize() {
index c0cfbd5..efe16f6 100644 (file)
@@ -925,6 +925,39 @@ bool MutationLogHarvester::load() {
     return clean;
 }
 
+MutationLog::iterator MutationLogHarvester::loadBatch(
+        const MutationLog::iterator& start, size_t limit) {
+    if (limit == 0) {
+        limit = std::numeric_limits<size_t>::max();
+    }
+    auto it = start;
+    size_t count = 0;
+    committed.clear();
+    for (; it != mlog.end() && count < limit; ++it) {
+        const auto* le = *it;
+        ++itemsSeen[le->type()];
+
+        switch (le->type()) {
+        case ML_NEW:
+            if (vbid_set.find(le->vbucket()) != vbid_set.end()) {
+                committed[le->vbucket()].emplace(le->key());
+                count++;
+            }
+            break;
+
+        case ML_COMMIT2:
+            // We ignore COMMIT2 for Access log, was only relevent to the
+            // 'proper' mutation log.
+            break;
+
+        default:
+            // Just ignore anything else also.
+            break;
+        }
+    }
+    return it;
+}
+
 void MutationLogHarvester::apply(void *arg, mlCallback mlc) {
     for (const uint16_t vb : vbid_set) {
         for (const auto& key : committed[vb]) {
index d3e2638..d88a1c5 100644 (file)
@@ -611,6 +611,20 @@ public:
     bool load();
 
     /**
+     * Load a batch of entries from the file, starting from the given iterator.
+     * Loaded entries are inserted into `committed`, which is cleared at the
+     * start of each call.
+     *
+     * @param start Iterator of where to start loading from.
+     * @param limit Limit of now many entries should be loaded. Zero means no
+     *              limit.
+     * @return iterator of where to resume in the log (if the end was not
+     *         reached), or MutationLog::iterator::end().
+     */
+    MutationLog::iterator loadBatch(const MutationLog::iterator& start,
+                                        size_t limit);
+
+    /**
      * Apply the processed log entries through the given function.
      */
     void apply(void *arg, mlCallback mlc);
index d74c787..6d1c0e8 100644 (file)
@@ -356,9 +356,10 @@ void LoadValueCallback::callback(CacheLookup &lookup)
 //////////////////////////////////////////////////////////////////////////////
 
 
-Warmup::Warmup(EventuallyPersistentStore& st)
+Warmup::Warmup(EventuallyPersistentStore& st, Configuration& config_)
     : state(),
       store(st),
+      config(config_),
       startTime(0),
       metadata(0),
       warmup(0),
@@ -734,29 +735,40 @@ size_t Warmup::doWarmup(MutationLog &lf, const std::map<uint16_t,
         harvester.setVBucket(it->first);
     }
 
-    hrtime_t st = gethrtime();
-    if (!harvester.load()) {
-        return -1;
-    }
-    hrtime_t end = gethrtime();
+    // To constrain the number of elements from the access log we have to keep
+    // alive (there may be millions of items per-vBucket), process it
+    // a batch at a time.
+    hrtime_t log_load_duration{};
+    hrtime_t log_apply_duration{};
+    WarmupCookie cookie(&store, cb);
+
+    auto alog_iter = lf.begin();
+    do {
+        // Load a chunk of the access log file
+        hrtime_t start = gethrtime();
+        alog_iter = harvester.loadBatch(alog_iter, config.getWarmupBatchSize());
+        log_load_duration += (gethrtime() - start);
+
+        // .. then apply it to the store.
+        hrtime_t apply_start = gethrtime();
+        if (store.multiBGFetchEnabled()) {
+            harvester.apply(&cookie, &batchWarmupCallback);
+        } else {
+            harvester.apply(&cookie, &warmupCallback);
+        }
+        log_apply_duration += (gethrtime() - apply_start);
+    } while (alog_iter != lf.end());
 
     size_t total = harvester.total();
     setEstimatedWarmupCount(total);
     LOG(EXTENSION_LOG_DEBUG, "Completed log read in %s with %ld entries",
-        hrtime2text(end - st).c_str(), total);
+        hrtime2text(log_load_duration).c_str(), total);
 
-    st = gethrtime();
-    WarmupCookie cookie(&store, cb);
-    if (store.multiBGFetchEnabled()) {
-        harvester.apply(&cookie, &batchWarmupCallback);
-    } else {
-        harvester.apply(&cookie, &warmupCallback);
-    }
-    end = gethrtime();
     LOG(EXTENSION_LOG_DEBUG,
         "Populated log in %s with(l: %ld, s: %ld, e: %ld)",
-        hrtime2text(end - st).c_str(), cookie.loaded, cookie.skipped,
+        hrtime2text(log_apply_duration).c_str(), cookie.loaded, cookie.skipped,
         cookie.error);
+
     return cookie.loaded;
 }
 
index 9c665b1..f44130f 100644 (file)
@@ -115,7 +115,7 @@ private:
 
 class Warmup {
 public:
-    Warmup(EventuallyPersistentStore& st);
+    Warmup(EventuallyPersistentStore& st, Configuration& config);
 
     void addToTaskSet(size_t taskId);
     void removeFromTaskSet(size_t taskId);
@@ -191,6 +191,7 @@ private:
     WarmupState state;
 
     EventuallyPersistentStore& store;
+    Configuration& config;
 
     // Unordered set to hold the current executing tasks
     Mutex taskSetMutex;
index a82730c..7e195a3 100644 (file)
@@ -458,6 +458,53 @@ TEST_F(MutationLogTest, Iterator) {
     EXPECT_EQ(5, count);
 }
 
+TEST_F(MutationLogTest, BatchLoad) {
+    remove(tmp_log_filename.c_str());
+
+    {
+        MutationLog ml(tmp_log_filename.c_str());
+        ml.open();
+
+        // Add a number of items, then check that batch load only returns
+        // the requested number.
+        for (size_t ii = 0; ii < 10; ii++) {
+            ml.newItem(ii % 2, std::string("key") + std::to_string(ii), ii);
+        }
+        ml.commit1();
+        ml.commit2();
+
+        EXPECT_EQ(10, ml.itemsLogged[ML_NEW]);
+        EXPECT_EQ(1, ml.itemsLogged[ML_COMMIT1]);
+        EXPECT_EQ(1, ml.itemsLogged[ML_COMMIT2]);
+    }
+
+    {
+        MutationLog ml(tmp_log_filename.c_str());
+        ml.open();
+        MutationLogHarvester h(ml);
+        h.setVBucket(0);
+        h.setVBucket(1);
+
+        // Ask for 2 items, ensure we get just two.
+        auto next_it = h.loadBatch(ml.begin(), 2);
+        EXPECT_NE(next_it, ml.end());
+
+        std::map<std::string, uint64_t> maps[2];
+        h.apply(&maps, loaderFun);
+        EXPECT_EQ(2, maps[0].size() + maps[1].size());
+
+        // Ask for 10; should get the remainder (8).
+        next_it = h.loadBatch(next_it, 10);
+        cb_assert(next_it == ml.end());
+
+        for (auto& map : maps) {
+            map.clear();
+        }
+        h.apply(&maps, loaderFun);
+        EXPECT_EQ(8, maps[0].size() + maps[1].size());
+    }
+}
+
 // @todo
 //   Test Read Only log
 //   Test close / open / close / open