Instead of loading the entire access log into memory and then
applying it in one (potentially very large) getMulti, process it in
warmup_batch_size chunks (default 10,000 items).
This significantly reduces the amount of temporary memory consumed
during warmup, which in turn means this memory can be used for
document values.
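
For illustration, the shape of the new warmup loop is roughly the
following (a minimal sketch; `log`, `cookie` and `callback` are
stand-ins for the real objects, and loadBatch/apply are the harvester
functions changed below):

    auto it = log.begin();
    do {
        // Read at most warmup_batch_size keys from the access log...
        it = harvester.loadBatch(it, config.getWarmupBatchSize());
        // ...then fetch just those documents before reading any more, so
        // temporary memory is bounded by the batch size, not the log size.
        harvester.apply(&cookie, callback);
    } while (it != log.end());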
Results: in the following workload we manage to load 19.2M of the 30M
items (~4.2GB) into memory during warmup, where previously only two
items (yes, *two*) were warmed up, because the temporary data
structures themselves consumed ~4.2GB.
* 1 bucket, 10,000MB quota, 1 node.
* 30M items, 300 bytes each, giving a residency ratio of ~60%. Dataset
  generated by:
      cbc-pillowfight -U couchbase://localhost:12000 -I 30000000 \
          -m 300 -M 300 -t16
Change-Id: I511b70d5ea9c9c6b9556249a936030a67bf70c02
Reviewed-on: http://review.couchbase.org/68045
Reviewed-by: Daniel Owen <owend@couchbase.com>
Tested-by: buildbot <build@couchbase.com>
"type": "bool"
},
"warmup_batch_size": {
- "default": "1000",
+ "default": "10000",
"descr": "The size of each batch loaded during warmup.",
"dynamic": false,
"type": "size_t",
"validator": {
"range": {
- "max": 10000,
+ "max": 100000000,
"min": 1
}
}
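
Since the parameter is declared non-dynamic, it can only be set at
startup; for example (a hypothetical fragment, assuming the usual
semicolon-separated ep-engine configuration string):

    warmup=true;warmup_batch_size=50000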
eviction_policy = FULL_EVICTION;
}
- warmupTask = new Warmup(*this);
+ warmupTask = new Warmup(*this, config);
}
bool EventuallyPersistentStore::initialize() {
return clean;
}
+MutationLog::iterator MutationLogHarvester::loadBatch(
+ const MutationLog::iterator& start, size_t limit) {
+ if (limit == 0) {
+ limit = std::numeric_limits<size_t>::max();
+ }
+ auto it = start;
+ size_t count = 0;
+ committed.clear();
+ for (; it != mlog.end() && count < limit; ++it) {
+ const auto* le = *it;
+ ++itemsSeen[le->type()];
+
+ switch (le->type()) {
+ case ML_NEW:
+ if (vbid_set.find(le->vbucket()) != vbid_set.end()) {
+ committed[le->vbucket()].emplace(le->key());
+ count++;
+ }
+ break;
+
+ case ML_COMMIT2:
+ // We ignore COMMIT2 for the access log; it was only relevant to
+ // the 'proper' mutation log.
+ break;
+
+ default:
+ // Just ignore anything else also.
+ break;
+ }
+ }
+ return it;
+}
+
void MutationLogHarvester::apply(void *arg, mlCallback mlc) {
for (const uint16_t vb : vbid_set) {
for (const auto& key : committed[vb]) {
bool load();
/**
+ * Load a batch of entries from the file, starting from the given iterator.
+ * Loaded entries are inserted into `committed`, which is cleared at the
+ * start of each call.
+ *
+ * @param start Iterator of where to start loading from.
+ * @param limit Limit on how many entries should be loaded. Zero means no
+ * limit.
+ * @return iterator at which to resume reading the log (if the end was
+ * not reached), otherwise the log's end() iterator.
+ */
+ MutationLog::iterator loadBatch(const MutationLog::iterator& start,
+ size_t limit);
+
+ /**
* Apply the processed log entries through the given function.
*/
void apply(void *arg, mlCallback mlc);
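
A brief sketch of how the new API composes (assumed names; `lf` is an
open MutationLog as in warmup.cc below). Note that `committed` only
ever holds the current batch, and a limit of zero loads everything
remaining:

    MutationLogHarvester harvester(lf);
    harvester.setVBucket(0);

    auto it = harvester.loadBatch(lf.begin(), 1000); // first 1,000 keys
    harvester.apply(&cookie, cb);                    // fetch just this batch
    it = harvester.loadBatch(it, 0);                 // 0 => no limit: the rest
    harvester.apply(&cookie, cb);                    // fetch the remainder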
//////////////////////////////////////////////////////////////////////////////
-Warmup::Warmup(EventuallyPersistentStore& st)
+Warmup::Warmup(EventuallyPersistentStore& st, Configuration& config_)
: state(),
store(st),
+ config(config_),
startTime(0),
metadata(0),
warmup(0),
harvester.setVBucket(it->first);
}
- hrtime_t st = gethrtime();
- if (!harvester.load()) {
- return -1;
- }
- hrtime_t end = gethrtime();
+ // To constrain the number of access log elements we have to keep alive
+ // at once (there may be millions of items per vBucket), process the log
+ // one batch at a time.
+ hrtime_t log_load_duration{};
+ hrtime_t log_apply_duration{};
+ WarmupCookie cookie(&store, cb);
+
+ auto alog_iter = lf.begin();
+ do {
+ // Load a chunk of the access log file
+ hrtime_t start = gethrtime();
+ alog_iter = harvester.loadBatch(alog_iter, config.getWarmupBatchSize());
+ log_load_duration += (gethrtime() - start);
+
+ // ... then apply it to the store.
+ hrtime_t apply_start = gethrtime();
+ if (store.multiBGFetchEnabled()) {
+ harvester.apply(&cookie, &batchWarmupCallback);
+ } else {
+ harvester.apply(&cookie, &warmupCallback);
+ }
+ log_apply_duration += (gethrtime() - apply_start);
+ } while (alog_iter != lf.end());
size_t total = harvester.total();
setEstimatedWarmupCount(total);
LOG(EXTENSION_LOG_DEBUG, "Completed log read in %s with %ld entries",
- hrtime2text(end - st).c_str(), total);
+ hrtime2text(log_load_duration).c_str(), total);
- st = gethrtime();
- WarmupCookie cookie(&store, cb);
- if (store.multiBGFetchEnabled()) {
- harvester.apply(&cookie, &batchWarmupCallback);
- } else {
- harvester.apply(&cookie, &warmupCallback);
- }
- end = gethrtime();
LOG(EXTENSION_LOG_DEBUG,
"Populated log in %s with(l: %ld, s: %ld, e: %ld)",
- hrtime2text(end - st).c_str(), cookie.loaded, cookie.skipped,
+ hrtime2text(log_apply_duration).c_str(), cookie.loaded, cookie.skipped,
cookie.error);
+
return cookie.loaded;
}
class Warmup {
public:
- Warmup(EventuallyPersistentStore& st);
+ Warmup(EventuallyPersistentStore& st, Configuration& config);
void addToTaskSet(size_t taskId);
void removeFromTaskSet(size_t taskId);
WarmupState state;
EventuallyPersistentStore& store;
+ Configuration& config;
// Unordered set to hold the current executing tasks
Mutex taskSetMutex;
EXPECT_EQ(5, count);
}
+TEST_F(MutationLogTest, BatchLoad) {
+ remove(tmp_log_filename.c_str());
+
+ {
+ MutationLog ml(tmp_log_filename.c_str());
+ ml.open();
+
+ // Add a number of items, then check that batch load only returns
+ // the requested number.
+ for (size_t ii = 0; ii < 10; ii++) {
+ ml.newItem(ii % 2, std::string("key") + std::to_string(ii), ii);
+ }
+ ml.commit1();
+ ml.commit2();
+
+ EXPECT_EQ(10, ml.itemsLogged[ML_NEW]);
+ EXPECT_EQ(1, ml.itemsLogged[ML_COMMIT1]);
+ EXPECT_EQ(1, ml.itemsLogged[ML_COMMIT2]);
+ }
+
+ {
+ MutationLog ml(tmp_log_filename.c_str());
+ ml.open();
+ MutationLogHarvester h(ml);
+ h.setVBucket(0);
+ h.setVBucket(1);
+
+ // Ask for 2 items, ensure we get just two.
+ auto next_it = h.loadBatch(ml.begin(), 2);
+ EXPECT_NE(next_it, ml.end());
+
+ std::map<std::string, uint64_t> maps[2];
+ h.apply(&maps, loaderFun);
+ EXPECT_EQ(2, maps[0].size() + maps[1].size());
+
+ // Ask for 10; should get the remainder (8).
+ next_it = h.loadBatch(next_it, 10);
+ EXPECT_EQ(next_it, ml.end());
+
+ for (auto& map : maps) {
+ map.clear();
+ }
+ h.apply(&maps, loaderFun);
+ EXPECT_EQ(8, maps[0].size() + maps[1].size());
+ }
+}
+
// @todo
// Test Read Only log
// Test close / open / close / open