MB-20852 [15/N]: Accurately track meta items within checkpoints 20/69020/8
authorDave Rigby <daver@couchbase.com>
Tue, 11 Oct 2016 16:14:03 +0000 (17:14 +0100)
committerDave Rigby <daver@couchbase.com>
Tue, 1 Nov 2016 07:56:58 +0000 (07:56 +0000)
Instead of assuming that a Checkpoint only contains 1 (Open) or 2
(Closed) meta-items, maintain a count of items within each Checkpoint,
and track how many meta-items a CheckpointCursor has read.

This allows us to support an arbitrary number of meta-items within a
Checkpoint, and in any sequence. This feature will be used to add
support for set_vbstate meta-items in a subsquent patches.

Change-Id: I8fb3040cbe64e316aae1f693afee65001b2b4b17
Reviewed-on: http://review.couchbase.org/69020
Reviewed-by: Trond Norbye <trond.norbye@gmail.com>
Tested-by: buildbot <build@couchbase.com>
src/checkpoint.cc
src/checkpoint.h
src/item.h

index 689abb7..0acc56f 100644 (file)
@@ -92,13 +92,18 @@ MustSendCheckpointEnd CheckpointCursor::shouldSendCheckpointEndMetaItem() const
     return sendCheckpointEndMetaItem;
 }
 
+size_t CheckpointCursor::getCurrentCkptMetaItemsRead() const {
+    return ckptMetaItemsRead;
+}
+
 std::ostream& operator<<(std::ostream& os, const CheckpointCursor& c) {
     os << "CheckpointCursor[" << &c << "] with"
        << " name:" << c.name
        << " currentCkpt:{id:" << (*c.currentCheckpoint)->getId()
        << " state:" << to_string((*c.currentCheckpoint)->getState())
        << "} currentPos:" << (*c.currentPos)->getBySeqno()
-       << " offset:" << c.offset.load();
+       << " offset:" << c.offset.load()
+       << " ckptMetaItemsRead:" << c.getCurrentCkptMetaItemsRead();
     return os;
 }
 
@@ -116,11 +121,7 @@ Checkpoint::~Checkpoint() {
 }
 
 size_t Checkpoint::getNumMetaItems() const {
-    if (checkpointState == CHECKPOINT_OPEN) {
-        return 1;
-    } else {
-        return 2;
-    }
+    return numMetaItems;
 }
 
 void Checkpoint::setState(checkpoint_state state) {
@@ -152,6 +153,12 @@ queue_dirty_t Checkpoint::queueDirty(const queued_item &qi,
 
     // Check if the item is a meta item
     if (qi->isCheckPointMetaItem()) {
+        // empty items act only as a dummy element for the start of the
+        // checkpoint (and are not read by clients), we do not include them in
+        // numMetaItems.
+        if (qi->isNonEmptyCheckpointMetaItem()) {
+            ++numMetaItems;
+        }
         rv = NEW_ITEM;
         toWrite.push_back(qi);
     } else {
@@ -441,6 +448,7 @@ bool CheckpointManager::addNewCheckpoint_UNLOCKED(uint64_t id,
             /* checkpoint_end meta item is expected by TAP cursors. Hence skip
                it only for persitence and DCP cursors */
             ++(cursor.offset);
+            cursor.incrMetaItemOffset(1);
             ++(cursor.currentPos); // cursor now reaches to the checkpoint end
         }
 
@@ -534,18 +542,27 @@ CursorRegResult CheckpointManager::registerCursorBySeqno(
         uint64_t st = (*itr)->getLowSeqno();
 
         if (startBySeqno < st) {
+            // Requested sequence number is before the start of this
+            // checkpoint, position cursor at the checkpoint start.
             connCursors[name] = CheckpointCursor(name, itr, (*itr)->begin(),
-                                                 skipped, false,
+                                                 skipped, /*meta_offset*/0,
+                                                 false,
                                                  needsCheckPointEndMetaItem);
             (*itr)->registerCursorName(name);
             result.first = (*itr)->getLowSeqno();
             break;
         } else if (startBySeqno <= en) {
+            // Requested sequence number lies within this checkpoint.
+            // Calculate which item to position the cursor at.
+            size_t ckpt_meta_skipped{0};
             std::list<queued_item>::iterator iitr = (*itr)->begin();
             while (++iitr != (*itr)->end() &&
                     (startBySeqno >=
                      static_cast<uint64_t>((*iitr)->getBySeqno()))) {
                 skipped++;
+                if ((*iitr)->isNonEmptyCheckpointMetaItem()) {
+                    ckpt_meta_skipped++;
+                }
             }
 
             if (iitr == (*itr)->end()) {
@@ -557,7 +574,7 @@ CursorRegResult CheckpointManager::registerCursorBySeqno(
             }
 
             connCursors[name] = CheckpointCursor(name, itr, iitr, skipped,
-                                                 false,
+                                                 ckpt_meta_skipped, false,
                                                  needsCheckPointEndMetaItem);
             (*itr)->registerCursorName(name);
             break;
@@ -641,19 +658,19 @@ bool CheckpointManager::registerCursor_UNLOCKED(
         }
 
         size_t offset = 0;
-        std::list<Checkpoint*>::iterator pos = checkpointList.begin();
-        for (; pos != it; ++pos) {
+        for (auto pos = checkpointList.begin(); pos != it; ++pos) {
             // Increment offset for all previous (closed) checkpoints, adding
             // in the meta items.
             offset += (*pos)->getNumItems() + (*pos)->getNumMetaItems();
         }
 
         connCursors[name] = CheckpointCursor(name, it, (*it)->begin(), offset,
+                                             /*meta_offset*/0,
                                              resetOnCollapse,
                                              needsCheckpointEndMetaItem);
         (*it)->registerCursorName(name);
     } else {
-        size_t offset = 0;
+        size_t offset = 0, meta_offset = 0;
         std::list<queued_item>::iterator curr;
 
         LOG(EXTENSION_LOG_DEBUG,
@@ -669,6 +686,7 @@ bool CheckpointManager::registerCursor_UNLOCKED(
             // its current position.
             curr = map_it->second.currentPos;
             offset = map_it->second.offset;
+            meta_offset = map_it->second.ckptMetaItemsRead;
         } else {
             // Set the cursor's position to the beginning of the checkpoint to
             // start with
@@ -682,6 +700,7 @@ bool CheckpointManager::registerCursor_UNLOCKED(
         }
 
         connCursors[name] = CheckpointCursor(name, it, curr, offset,
+                                             meta_offset,
                                              resetOnCollapse,
                                              needsCheckpointEndMetaItem);
         // Register the cursor's name to the checkpoint.
@@ -1154,6 +1173,9 @@ queued_item CheckpointManager::nextItem(const std::string &name,
 bool CheckpointManager::incrCursor(CheckpointCursor &cursor) {
     if (++(cursor.currentPos) != (*(cursor.currentCheckpoint))->end()) {
         ++(cursor.offset);
+        if ((*cursor.currentPos)->isNonEmptyCheckpointMetaItem()) {
+            cursor.incrMetaItemOffset(1);
+        }
         return true;
     } else if (!moveCursorToNextCheckpoint(cursor)) {
         --(cursor.currentPos);
@@ -1207,6 +1229,7 @@ void CheckpointManager::resetCursors(bool resetPersistenceCursor) {
         cit.second.currentCheckpoint = checkpointList.begin();
         cit.second.currentPos = checkpointList.front()->begin();
         cit.second.offset = 0;
+        cit.second.setMetaItemOffset(0);
         checkpointList.front()->registerCursorName(cit.second.name);
     }
 }
@@ -1239,6 +1262,10 @@ bool CheckpointManager::moveCursorToNextCheckpoint(CheckpointCursor &cursor) {
     cursor.currentPos = (*(cursor.currentCheckpoint))->begin();
     // Register the cursor's name to its new current checkpoint.
     (*(cursor.currentCheckpoint))->registerCursorName(cursor.name);
+
+    // Reset metaItemOffset as we're entering a new checkpoint.
+    cursor.setMetaItemOffset(0);
+
     return true;
 }
 
@@ -1293,36 +1320,24 @@ size_t CheckpointManager::getNumItemsForCursor_UNLOCKED(const std::string &name)
 size_t CheckpointManager::getNumOfMetaItemsFromCursor(const CheckpointCursor &cursor) const {
     // Get the number of meta items that can be skipped by a given cursor.
     size_t meta_items = 0;
-    for (auto curr_chk = cursor.currentCheckpoint;
-         curr_chk != checkpointList.end();
-         ++curr_chk) {
-
-        // If this is the cursor's current checkpoint, meta items remaining
-        // depends on how far along the current cursor position is.
-        if (curr_chk == cursor.currentCheckpoint) {
-            auto curr_pos = cursor.currentPos;
-            ++curr_pos;
-            if (curr_pos == (*curr_chk)->end()) {
-                // At the end of this checkpoint, no more meta items.
-                continue;
 
-            } else if ((*curr_pos)->getOperation() == queue_op::checkpoint_start) {
-                // At start marker.
-                meta_items += 1;
-            }
-
-            // Within a checkpoint, not at the end. Will be an additional item
-            // if this checkpoint is closed (checkpoint_end).
-            if ((*curr_chk)->getState() == CHECKPOINT_CLOSED) {
-                meta_items += 1;
-            }
 
-        } else {
-            // Not current checkpoint, meta items is total for that checkpoint.
-            meta_items += (*curr_chk)->getNumMetaItems();
-        }
+    // For current checkpoint, number of meta item is the total meta items
+    // for this checkpoint minus how many the cursor has already processed.
+    std::list<Checkpoint*>::const_iterator ckpt_it = cursor.currentCheckpoint;
+    if (cursor.currentCheckpoint != checkpointList.end()) {
+        meta_items = (*cursor.currentCheckpoint)->getNumMetaItems() -
+                cursor.getCurrentCkptMetaItemsRead();
     }
-    return meta_items;
+
+    // For remaining checkpoint(s), number of meta items is simply the total
+    // meta items for that checkpoint.
+    ++ckpt_it;
+    auto result =  std::accumulate(ckpt_it, checkpointList.end(), meta_items,
+                           [&cursor](size_t a, const Checkpoint* b) {
+        return a + b->getNumMetaItems();
+    });
+    return result;
 }
 
 void CheckpointManager::decrCursorFromCheckpointEnd(const std::string &name) {
@@ -1559,11 +1574,15 @@ putCursorsInCollapsedChk(CursorIdToPositionMap& cursors,
     Checkpoint *chk = *chkItr;
     auto cit = chk->begin();
     auto last = chk->begin();
+
+    // The count of meta_items at the /last/ cursor position.
+    size_t last_meta_item_count = 0;
     // Stage 1 - iterate over the checkpoint items, checking if any of the
     // cursors were positioned at that item.
     for (i = 0; cit != chk->end(); ++i, ++cit) {
         uint64_t id = chk->getMutationIdForKey((*cit)->getKey(),
                                                (*cit)->isCheckPointMetaItem());
+
         auto mit = cursors.begin();
         while (mit != cursors.end()) {
             auto cursor_pos = mit->second;
@@ -1581,6 +1600,8 @@ putCursorsInCollapsedChk(CursorIdToPositionMap& cursors,
                 cc->second.currentCheckpoint = chkItr;
                 cc->second.currentPos = last;
                 cc->second.offset = (i > 0) ? i - 1 : 0;
+                cc->second.setMetaItemOffset(last_meta_item_count);
+
                 chk->registerCursorName(cc->second.name);
                 cursors.erase(mit++);
             } else {
@@ -1589,6 +1610,10 @@ putCursorsInCollapsedChk(CursorIdToPositionMap& cursors,
         }
 
         last = cit;
+        if ((*cit)->isNonEmptyCheckpointMetaItem()) {
+            last_meta_item_count++;
+        }
+
         if (cursors.empty()) {
             break;
         }
@@ -1606,9 +1631,11 @@ putCursorsInCollapsedChk(CursorIdToPositionMap& cursors,
         if (cc->second.fromBeginningOnChkCollapse) {
             cc->second.currentPos = chk->begin();
             cc->second.offset = 0;
+            cc->second.setMetaItemOffset(0);
         } else {
             cc->second.currentPos = last;
             cc->second.offset = (i > 0) ? i - 1 : 0;
+            cc->second.setMetaItemOffset(chk->getNumMetaItems());
         }
         chk->registerCursorName(cc->second.name);
     }
index 7ee84aa..2b53bef 100644 (file)
@@ -137,17 +137,27 @@ public:
           currentCheckpoint(),
           currentPos(),
           offset(0),
+          ckptMetaItemsRead(0),
           fromBeginningOnChkCollapse(false),
           sendCheckpointEndMetaItem(MustSendCheckpointEnd::YES) { }
 
+    /**
+     * @param offset_ Count of items (normal+meta) already read for *all*
+     *                checkpoints in the series.
+     * @param meta_items_read Count of meta_items already read for the
+     *                        given checkpoint.
+     */
     CheckpointCursor(const std::string &n,
                      std::list<Checkpoint*>::iterator checkpoint,
                      std::list<queued_item>::iterator pos,
-                     size_t os,
+                     size_t offset_,
+                     size_t meta_items_read,
                      bool beginningOnChkCollapse,
                      MustSendCheckpointEnd needsCheckpointEndMetaItem) :
         name(n), currentCheckpoint(checkpoint), currentPos(pos),
-        offset(os), fromBeginningOnChkCollapse(beginningOnChkCollapse),
+        offset(offset_),
+        ckptMetaItemsRead(meta_items_read),
+        fromBeginningOnChkCollapse(beginningOnChkCollapse),
         sendCheckpointEndMetaItem(needsCheckpointEndMetaItem) { }
 
     // We need to define the copy construct explicitly due to the fact
@@ -155,6 +165,7 @@ public:
     CheckpointCursor(const CheckpointCursor &other) :
         name(other.name), currentCheckpoint(other.currentCheckpoint),
         currentPos(other.currentPos), offset(other.offset.load()),
+        ckptMetaItemsRead(other.ckptMetaItemsRead),
         fromBeginningOnChkCollapse(other.fromBeginningOnChkCollapse),
         sendCheckpointEndMetaItem(other.sendCheckpointEndMetaItem) { }
 
@@ -163,11 +174,16 @@ public:
         currentCheckpoint = other.currentCheckpoint;
         currentPos = other.currentPos;
         offset.store(other.offset.load());
+        setMetaItemOffset(other.ckptMetaItemsRead);
         fromBeginningOnChkCollapse = other.fromBeginningOnChkCollapse;
         sendCheckpointEndMetaItem = other.sendCheckpointEndMetaItem;
         return *this;
     }
 
+    /**
+     * Decrement the offsets for this cursor.
+     * @param items Count of all items (meta and non-meta) to decrement by.
+     */
     void decrOffset(size_t decr);
 
     void decrPos();
@@ -177,16 +193,36 @@ public:
        the consumer. DCP cursors don't have this constraint */
     MustSendCheckpointEnd shouldSendCheckpointEndMetaItem() const;
 
+    /**
+     * Return the count of meta items processed (i.e. moved past) for the
+     * current checkpoint.
+     * This value is reset to zero when a new checkpoint
+     * is entered.
+     */
+    size_t getCurrentCkptMetaItemsRead() const;
+
+protected:
+    void incrMetaItemOffset(size_t incr) {
+        ckptMetaItemsRead += incr;
+    }
+
+    void setMetaItemOffset(size_t val) {
+        ckptMetaItemsRead = val;
+    }
+
 private:
     std::string                      name;
     std::list<Checkpoint*>::iterator currentCheckpoint;
     std::list<queued_item>::iterator currentPos;
 
     // The offset (in terms of items) this cursor is from the start of the
-    // cursors' current checkpoint. Used to calculate how many items this
-    // cursor has remaining by subtracting
+    // checkpoint list. Includes meta and non-meta items. Used to calculate
+    // how many items this cursor has remaining by subtracting
     // offset from CheckpointManager::numItems.
     AtomicValue<size_t>              offset;
+    // Count of the number of meta items which have been read (processed) for
+    // the *current* checkpoint.
+    size_t ckptMetaItemsRead;
     bool                             fromBeginningOnChkCollapse;
     MustSendCheckpointEnd            sendCheckpointEndMetaItem;
 
@@ -298,7 +334,10 @@ public:
                uint16_t vbid) :
         stats(st), checkpointId(id), snapStartSeqno(snapStart),
         snapEndSeqno(snapEnd), vbucketId(vbid), creationTime(ep_real_time()),
-        checkpointState(CHECKPOINT_OPEN), numItems(0), memOverhead(0),
+        checkpointState(CHECKPOINT_OPEN),
+        numItems(0),
+        numMetaItems(0),
+        memOverhead(0),
         effectiveMemUsage(0) {
         stats.memOverhead.fetch_add(memorySize());
         if (stats.memOverhead.load() >= GIGANTOR) {
@@ -334,12 +373,16 @@ public:
     }
 
     /**
-     * Return the number of items belonging to this checkpoint.
+     * Return the number of non-meta items belonging to this checkpoint.
      */
     size_t getNumItems() const {
         return numItems;
     }
 
+    /**
+     * Return the number of meta items (as defined by Item::isNonEmptyCheckpointMetaItem)
+     * in this checkpoint.
+     */
     size_t getNumMetaItems() const;
 
     /**
@@ -513,7 +556,10 @@ private:
     uint16_t                       vbucketId;
     rel_time_t                     creationTime;
     checkpoint_state               checkpointState;
+    /// Number of non-meta items (see Item::isCheckPointMetaItem).
     size_t                         numItems;
+    /// Number of meta items (see Item::isCheckPointMetaItem).
+    size_t numMetaItems;
     std::set<std::string>          cursors; // List of cursors with their unique names.
     // List is used for queueing mutations as vector incurs shift operations for deduplication.
     std::list<queued_item>         toWrite;
@@ -881,6 +927,9 @@ private:
     queued_item createCheckpointItem(uint64_t id, uint16_t vbid,
                                      queue_op checkpoint_op);
 
+    /**
+     * Return the number of meta Items remaining for this cursor.
+     */
     size_t getNumOfMetaItemsFromCursor(const CheckpointCursor &cursor) const;
 
     EPStats                 &stats;
index db87574..e08646a 100644 (file)
@@ -689,6 +689,11 @@ public:
         return false;
     }
 
+    /// Returns true if this Item is a meta item, excluding queue_op::empty.
+    bool isNonEmptyCheckpointMetaItem() const {
+        return isCheckPointMetaItem() && (op != queue_op::empty);
+    }
+
     void setNRUValue(uint8_t nru_value) {
         nru = nru_value;
     }