MB-20852 [6/N]: Simplify {start,stop}Flusher & flushVBucket, move to C++11 90/67890/8
authorDave Rigby <daver@couchbase.com>
Thu, 29 Sep 2016 14:24:38 +0000 (15:24 +0100)
committerDave Rigby <daver@couchbase.com>
Mon, 31 Oct 2016 15:24:36 +0000 (15:24 +0000)
Move to more C++11 style, and add some comments as to what certain
members mean.

Change-Id: Ibdb900cfe63a9b1352fa1020f9dc58839e3e446d
Reviewed-on: http://review.couchbase.org/67890
Tested-by: buildbot <build@couchbase.com>
Reviewed-by: Trond Norbye <trond.norbye@gmail.com>
Reviewed-by: Jim Walker <jim@couchbase.com>
src/ep.cc
src/vbucket.h

index 023d383..843bc75 100644 (file)
--- a/src/ep.cc
+++ b/src/ep.cc
@@ -512,9 +512,8 @@ Warmup* EventuallyPersistentStore::getWarmup(void) const {
 }
 
 bool EventuallyPersistentStore::startFlusher() {
-    for (uint16_t i = 0; i < vbMap.shards.size(); ++i) {
-        Flusher *flusher = vbMap.shards[i]->getFlusher();
-        flusher->start();
+    for (auto* shard : vbMap.shards) {
+        shard->getFlusher()->start();
     }
     return true;
 }
@@ -3323,11 +3322,13 @@ int EventuallyPersistentStore::flushVBucket(uint16_t vbid) {
             vb->rejectQueue.pop();
         }
 
-        const std::string cursor(CheckpointManager::pCursorName);
+        // Append any 'backfill' items (mutations added by a TAP stream).
         vb->getBackfillItems(items);
 
+        // Append all items outstanding for the persistence cursor.
         snapshot_range_t range;
-        range = vb->checkpointManager.getAllItemsForCursor(cursor, items);
+        range = vb->checkpointManager.getAllItemsForCursor(
+                CheckpointManager::pCursorName, items);
 
         if (!items.empty()) {
             while (!rwUnderlying->begin()) {
@@ -3343,30 +3344,31 @@ int EventuallyPersistentStore::flushVBucket(uint16_t vbid) {
             uint64_t maxCas = 0;
             uint64_t maxDeletedRevSeqno = 0;
             std::list<PersistenceCallback*>& pcbs = rwUnderlying->getPersistenceCbList();
-            std::vector<queued_item>::iterator it = items.begin();
 
-            for(; it != items.end(); ++it) {
+            for (const auto& item : items) {
 
-                if (!(*it)->shouldPersist()) {
+                if (!item->shouldPersist()) {
                     continue;
-                } else if (!prev || prev->getKey() != (*it)->getKey()) {
-                    prev = (*it).get();
+                } else if (!prev || prev->getKey() != item->getKey()) {
+                    prev = item.get();
                     ++items_flushed;
-                    PersistenceCallback *cb = flushOneDelOrSet(*it, vb);
+                    PersistenceCallback *cb = flushOneDelOrSet(item, vb);
                     if (cb) {
                         pcbs.push_back(cb);
                     }
 
-                    maxSeqno = std::max(maxSeqno, (uint64_t)(*it)->getBySeqno());
-                    maxCas = std::max(maxCas, (uint64_t)(*it)->getCas());
-                    if ((*it)->isDeleted()) {
+                    maxSeqno = std::max(maxSeqno, (uint64_t)item->getBySeqno());
+                    maxCas = std::max(maxCas, item->getCas());
+                    if (item->isDeleted()) {
                         maxDeletedRevSeqno = std::max(maxDeletedRevSeqno,
-                                                      (uint64_t)(*it)->getRevSeqno());
+                                                      item->getRevSeqno());
                     }
                     ++stats.flusher_todo;
                 } else {
+                    // Item is the same key as the previous one - don't need
+                    // to flush to disk.
                     stats.decrDiskQueueSize(1);
-                    vb->doStatsForFlushing(*(*it), (*it)->size());
+                    vb->doStatsForFlushing(*item, item->size());
                 }
             }
 
index eabbebb..d3c54a2 100644 (file)
@@ -475,6 +475,9 @@ public:
 
     HashTable         ht;
     CheckpointManager checkpointManager;
+
+    // Struct for managing 'backfill' items - Items which have been added by
+    // an incoming TAP stream and need to be persisted to disk.
     struct {
         Mutex mutex;
         std::queue<queued_item> items;