MB-20852 [2/N]: Convert queue_operation to scoped enum 60/68160/12
authorDave Rigby <daver@couchbase.com>
Thu, 29 Sep 2016 13:52:16 +0000 (14:52 +0100)
committerDave Rigby <daver@couchbase.com>
Mon, 31 Oct 2016 15:08:14 +0000 (15:08 +0000)
In preparation for adding new queue_op for setVBucketState, convert to
a typesafe C++11 scoped enum. Improve the documentation around
queue_op, and related classes Checkpoint, CheckpointCursor.

Also improve the output streaming (operator<<) for Checkpoint class.

Change-Id: I8f29b8e9e8915a68e31550b78bf3131b3737e2d7
Reviewed-on: http://review.couchbase.org/68160
Reviewed-by: Jim Walker <jim@couchbase.com>
Tested-by: buildbot <build@couchbase.com>
src/checkpoint.cc
src/checkpoint.h
src/dcp/stream.cc
src/forest-kvstore/forest-kvstore.cc
src/item.cc
src/item.h
src/tapconnection.cc
src/tapconnection.h
tests/module_tests/checkpoint_test.cc

index 9ef8ff6..a9427d8 100644 (file)
 
 const std::string CheckpointManager::pCursorName("persistence");
 
+const char* to_string(enum checkpoint_state s) {
+    switch (s) {
+        case CHECKPOINT_OPEN: return "CHECKPOINT_OPEN";
+        case CHECKPOINT_CLOSED: return "CHECKPOINT_CLOSED";
+    }
+    return "<unknown>";
+}
+
 /**
  * A listener class to update checkpoint related configs at runtime.
  */
@@ -103,7 +111,7 @@ void Checkpoint::setState(checkpoint_state state) {
 
 void Checkpoint::popBackCheckpointEndItem() {
     if (!toWrite.empty() &&
-        toWrite.back()->getOperation() == queue_op_checkpoint_end) {
+        toWrite.back()->getOperation() == queue_op::checkpoint_end) {
         metaKeyIndex.erase(toWrite.back()->getKey());
         toWrite.pop_back();
     }
@@ -200,8 +208,8 @@ queue_dirty_t Checkpoint::queueDirty(const queued_item &qi,
     }
 
     // Notify flusher if in case queued item is a checkpoint meta item
-    if (qi->getOperation() == queue_op_checkpoint_start ||
-        qi->getOperation() == queue_op_checkpoint_end) {
+    if (qi->getOperation() == queue_op::checkpoint_start ||
+        qi->getOperation() == queue_op::checkpoint_end) {
         checkpointManager->notifyFlusher();
     }
 
@@ -230,8 +238,8 @@ size_t Checkpoint::mergePrevCheckpoint(Checkpoint *pPrevCheckpoint) {
 
     for (; rit != pPrevCheckpoint->rend(); ++rit) {
         const std::string &key = (*rit)->getKey();
-        if ((*rit)->getOperation() != queue_op_del &&
-            (*rit)->getOperation() != queue_op_set) {
+        if ((*rit)->getOperation() != queue_op::del &&
+            (*rit)->getOperation() != queue_op::set) {
             continue;
         }
         checkpoint_index::iterator it = keyIndex.find(key);
@@ -296,9 +304,15 @@ bool Checkpoint::isEligibleToBeUnreferenced() {
 }
 
 std::ostream& operator <<(std::ostream& os, const Checkpoint& c) {
-    os << "Checkpoint[" << &c << "] with "
-       << "seqno:{" << c.getLowSeqno() << "," << c.getHighSeqno() << "} "
-       << "state:" << c.getState();
+    os << "Checkpoint[" << &c << "] with"
+       << " seqno:{" << c.getLowSeqno() << "," << c.getHighSeqno() << "}"
+       << " state:" << to_string(c.getState())
+       << " items:[" << std::endl;
+    for (const auto& e : c.toWrite) {
+        os << "\t{" << e->getBySeqno() << ","
+           << to_string(e->getOperation()) << "}" << std::endl;
+    }
+    os << "]";
     return os;
 }
 
@@ -382,11 +396,12 @@ bool CheckpointManager::addNewCheckpoint_UNLOCKED(uint64_t id,
     // item in this new checkpoint can be safely shifted left by 1 if the
     // first item is removed
     // and pushed into the tail.
-    queued_item qi = createCheckpointItem(0, 0xffff, queue_op_empty);
+    queued_item qi = createCheckpointItem(0, 0xffff, queue_op::empty);
     checkpoint->queueDirty(qi, this);
+    // Note: We explicitly do /not/ include {empty} ops in numItems.
 
     // This item represents the start of the new checkpoint and is also sent to the slave node.
-    qi = createCheckpointItem(id, vbucketId, queue_op_checkpoint_start);
+    qi = createCheckpointItem(id, vbucketId, queue_op::checkpoint_start);
     checkpoint->queueDirty(qi, this);
     ++numItems;
     checkpointList.push_back(checkpoint);
@@ -406,7 +421,7 @@ bool CheckpointManager::addNewCheckpoint_UNLOCKED(uint64_t id,
         if ((cursor.shouldSendCheckpointEndMetaItem() ==
              MustSendCheckpointEnd::NO) &&
             cursor.currentPos != (*(cursor.currentCheckpoint))->end() &&
-            (*(cursor.currentPos))->getOperation() == queue_op_checkpoint_end) {
+            (*(cursor.currentPos))->getOperation() == queue_op::checkpoint_end) {
             /* checkpoint_end meta item is expected by TAP cursors. Hence skip
                it only for persitence and DCP cursors */
             ++(cursor.offset);
@@ -448,7 +463,7 @@ bool CheckpointManager::closeOpenCheckpoint_UNLOCKED() {
     // This item represents the end of the current open checkpoint and is sent
     // to the slave node.
     queued_item qi = createCheckpointItem(cur_ckpt->getId(), vbucketId,
-                                          queue_op_checkpoint_end);
+                                          queue_op::checkpoint_end);
 
     checkpointList.back()->queueDirty(qi, this);
     ++numItems;
@@ -879,8 +894,8 @@ void CheckpointManager::collapseClosedCheckpoints(
             if (cc == connCursors.end()) {
                 continue;
             }
-            enum queue_operation qop = (*(cc->second.currentPos))->getOperation();
-            if (qop ==  queue_op_empty || qop == queue_op_checkpoint_start) {
+            queue_op qop = (*(cc->second.currentPos))->getOperation();
+            if (qop ==  queue_op::empty || qop == queue_op::checkpoint_start) {
                 return;
             }
         }
@@ -904,7 +919,7 @@ void CheckpointManager::collapseClosedCheckpoints(
                             (*(cc->second.currentPos))->isCheckPointMetaItem();
                 bool cursor_on_chk_start = false;
                 if ((*(cc->second.currentPos))->getOperation() ==
-                    queue_op_checkpoint_start) {
+                    queue_op::checkpoint_start) {
                     cursor_on_chk_start = true;
                 }
                 slowCursors[*nameItr] =
@@ -1069,7 +1084,7 @@ snapshot_range_t CheckpointManager::getAllItemsForCursor(
         queued_item& qi = *(it->second.currentPos);
         items.push_back(qi);
 
-        if (qi->getOperation() == queue_op_checkpoint_end) {
+        if (qi->getOperation() == queue_op::checkpoint_end) {
             range.end = (*it->second.currentCheckpoint)->getSnapshotEndSeqno();
             moveCursorToNextCheckpoint(it->second);
         }
@@ -1091,7 +1106,7 @@ queued_item CheckpointManager::nextItem(const std::string &name,
         "The cursor with name \"%s\" is not found in the checkpoint of vbucket"
         "%d.\n", name.c_str(), vbucketId);
         queued_item qi(new Item(std::string(""), 0xffff,
-                                queue_op_empty, 0, 0));
+                                queue_op::empty, 0, 0));
         return qi;
     }
     if (checkpointList.back()->getId() == 0) {
@@ -1100,7 +1115,7 @@ queued_item CheckpointManager::nextItem(const std::string &name,
             " the cursor to fetch an item from it's current checkpoint",
             vbucketId);
         queued_item qi(new Item(std::string(""), 0xffff,
-                                queue_op_empty, 0, 0));
+                                queue_op::empty, 0, 0));
         return qi;
     }
 
@@ -1111,7 +1126,7 @@ queued_item CheckpointManager::nextItem(const std::string &name,
     } else {
         isLastMutationItem = false;
         queued_item qi(new Item(std::string(""), 0xffff,
-                                queue_op_empty, 0, 0));
+                                queue_op::empty, 0, 0));
         return qi;
     }
 }
@@ -1263,7 +1278,7 @@ size_t CheckpointManager::getNumOfMetaItemsFromCursor(CheckpointCursor &cursor)
             if (curr_pos == (*curr_chk)->end()) {
                 continue;
             }
-            if ((*curr_pos)->getOperation() == queue_op_checkpoint_start) {
+            if ((*curr_pos)->getOperation() == queue_op::checkpoint_start) {
                 if ((*curr_chk)->getState() == CHECKPOINT_CLOSED) {
                     meta_items += 2;
                 } else {
@@ -1290,7 +1305,7 @@ void CheckpointManager::decrCursorFromCheckpointEnd(const std::string &name) {
     cursor_index::iterator it = connCursors.find(name);
     if (it != connCursors.end() &&
         (*(it->second.currentPos))->getOperation() ==
-        queue_op_checkpoint_end) {
+        queue_op::checkpoint_end) {
         it->second.decrPos();
     }
 }
@@ -1300,7 +1315,7 @@ bool CheckpointManager::isLastMutationItemInCheckpoint(
     std::list<queued_item>::iterator it = cursor.currentPos;
     ++it;
     if (it == (*(cursor.currentCheckpoint))->end() ||
-        (*it)->getOperation() == queue_op_checkpoint_end) {
+        (*it)->getOperation() == queue_op::checkpoint_end) {
         return true;
     }
     return false;
@@ -1477,7 +1492,7 @@ void CheckpointManager::collapseCheckpoints(uint64_t id) {
         const std::string& key = (*(itr->second.currentPos))->getKey();
         bool isMetaItem = (*(itr->second.currentPos))->isCheckPointMetaItem();
         bool cursor_on_chk_start = false;
-        if ((*(itr->second.currentPos))->getOperation() == queue_op_checkpoint_start) {
+        if ((*(itr->second.currentPos))->getOperation() == queue_op::checkpoint_start) {
             cursor_on_chk_start = true;
         }
         cursorMap[itr->first.c_str()] =
@@ -1532,7 +1547,7 @@ putCursorsInCollapsedChk(std::map<std::string, std::pair<uint64_t, bool> > &curs
         while (mit != cursors.end()) {
             std::pair<uint64_t, bool> val = mit->second;
             if (val.first < id || (val.first == id && val.second &&
-                (*last)->getOperation() == queue_op_checkpoint_start)) {
+                (*last)->getOperation() == queue_op::checkpoint_start)) {
 
                 cursor_index::iterator cc = connCursors.find(mit->first);
                 if (cc == connCursors.end() ||
@@ -1592,27 +1607,27 @@ bool CheckpointManager::hasNext(const std::string &name) {
 }
 
 queued_item CheckpointManager::createCheckpointItem(uint64_t id, uint16_t vbid,
-                                          enum queue_operation checkpoint_op) {
+                                          queue_op checkpoint_op) {
     uint64_t bySeqno;
     std::string key;
 
     switch (checkpoint_op) {
-    case queue_op_checkpoint_start:
+    case queue_op::checkpoint_start:
         key = "checkpoint_start";
         bySeqno = lastBySeqno + 1;
         break;
-    case queue_op_checkpoint_end:
+    case queue_op::checkpoint_end:
         key = "checkpoint_end";
         bySeqno = lastBySeqno;
         break;
-    case queue_op_empty:
+    case queue_op::empty:
         key = "dummy_key";
         bySeqno = lastBySeqno;
         break;
     default:
         throw std::invalid_argument("CheckpointManager::createCheckpointItem:"
                         "checkpoint_op (which is " +
-                        std::to_string(checkpoint_op) +
+                        std::to_string(static_cast<std::underlying_type<queue_op>::type>(checkpoint_op)) +
                         ") is not a valid item to create");
     }
 
index b164c75..5f9cc28 100644 (file)
@@ -54,6 +54,8 @@ enum checkpoint_state {
     CHECKPOINT_CLOSED  //!< The checkpoint is not open.
 };
 
+const char* to_string(enum checkpoint_state);
+
 /**
  * A checkpoint index entry.
  */
@@ -98,7 +100,30 @@ class CheckpointConfig;
 class VBucket;
 
 /**
- * A checkpoint cursor
+ * A checkpoint cursor, representing the current position in a Checkpoint
+ * series.
+ *
+ * CheckpointCursors are similar to STL-style iterators but for Checkpoints.
+ * A consumer (DCP, TAP, persistence) will have one CheckpointCursor, initially
+ * positioned at the first item they want. As they read items from the
+ * Checkpoint the Cursor is advanced, allowing them to continue from where
+ * they left off when they next attempt to read items.
+ *
+ * A CheckpointCursor has two main pieces of state:
+ *
+ * - currentCheckpoint - The current Checkpoint the cursor is operating on.
+ * - currentPos - the position with the current Checkpoint.
+ *
+ * When a CheckpointCursor reaches the end of Checkpoint, the CheckpointManager
+ * will move it to the next Checkpoint.
+ *
+ * To assist in accounting how many items remain in a Checkpoint series, a
+ * cursor also records its `offset` - the count of items (non-meta and meta) it
+ * has already 'consumed' from the Checkpoint series. Note that this `offset`
+ * count is not cumulative - when the CheckpointManager removes checkpoints
+ * the offset will be decremented. To put it another way - the number of items
+ * a CheckpointCursor has left to consume can be calcuated as
+ * `CheckpointManager::numItems - CheckpointCursor::offset`.
  */
 class CheckpointCursor {
     friend class CheckpointManager;
@@ -197,6 +222,70 @@ enum queue_dirty_t {
 /**
  * Representation of a checkpoint used in the unified queue for persistence and
  * replication.
+ *
+ * Each Checkpoint consists of an ordered series of queued_item items, each
+ * of which either represents a 'real' user operation
+ * (queue_op::set & queue_op::del), or one of a range of meta-items
+ * (queue_op::checkpoint_start, queue_op::checkpoint_end, ...).
+ *
+ * A checkpoint may either be Open or Closed. Open checkpoints can still have
+ * new items appended to them, whereas Closed checkpoints cannot (and are
+ * logically immutable). A checkpoint begins life as an Open checkpoint, will
+ * have items added to it (including de-duplication if a key is added which
+ * already exists), and then once large/old enough it will be marked as Closed,
+ * and a new Open checkpoint created for new items.
+ *
+ * Consumers read items from Checkpoints by creating a CheckpointCursor
+ * (similar to an STL iterator), which they use to mark how far along the
+ * Checkpoint they are.
+ *
+ *
+ *     Checkpoint (closed)
+ *                               numItems: 5 (1x start, 2x set, 1x del, 1x end)
+ *
+ *              +-------+-------+-------+-------+-------+-------+
+ *              | empty | Start |  Set  |  Set  |  Del  |  End  |
+ *              +-------+-------+-------+-------+-------+-------+
+ *         seqno    0       1       1       2       3       3
+ *
+ *                  ^
+ *                  |
+ *                  |
+ *            CheckpointCursor
+ *             (initial pos)
+ *
+ *     Checkpoint (open)
+ *                               numItems: 4 (1x start, 1x set, 2x set)
+ *
+ *              +-------+-------+-------+-------+-------+
+ *              | empty | Start |  Del  |  Set  |  Set
+ *              +-------+-------+-------+-------+-------+
+ *         seqno    3       4       4       5       6
+ *
+ * A Checkpoint starts with an empty item, followed by a checkpoint_start,
+ * and then 0...N set and del items, finally finishing with a checkpoint_end if
+ * the Checkpoint is closed.
+ * The empty item exists because Checkpoints are structured such that
+ * CheckpointCursors are advanced _before_ dereferencing them, not _after_
+ * (this differs from STL iterators which are typically incremented after
+ * dereferencing them) - i.e. the pseudo-code for reading elements is:
+ *
+ *     getElements(CheckpointCursor& cur) {
+ *         std::vector<...> result;
+ *         while (incrCursorAndCheckValid(cur) {
+ *             result.push_back(*cur);
+ *         };
+ *         return result;
+ *     }
+ *
+ * As such we need to have a dummy element at the start of each Checkpoint, so
+ * after the first call to CheckpointManager::incrCursor() the cursor
+ * dereferences to the checkpoint_start element.
+ *
+ * Note that sequence numbers are only unique for normal operations (set & del) -
+ * for meta-items like checkpoint start/end they share the same sequence number
+ * as the associated op - for checkpoint_start that is the ID of the following
+ * op, for checkpoint_end the ID of the proceeding op.
  */
 class Checkpoint {
 public:
@@ -763,7 +852,7 @@ private:
                                   std::list<Checkpoint*>::iterator chkItr);
 
     queued_item createCheckpointItem(uint64_t id, uint16_t vbid,
-                                     enum queue_operation checkpoint_op);
+                                     queue_op checkpoint_op);
 
     size_t getNumOfMetaItemsFromCursor(CheckpointCursor &cursor);
 
index f1d7d44..c1bce99 100644 (file)
@@ -782,7 +782,7 @@ void ActiveStream::getOutstandingItems(RCPtr<VBucket> &vb,
 void ActiveStream::processItems(std::vector<queued_item>& items) {
     if (!items.empty()) {
         bool mark = false;
-        if (items.front()->getOperation() == queue_op_checkpoint_start) {
+        if (items.front()->getOperation() == queue_op::checkpoint_start) {
             mark = true;
         }
 
@@ -798,7 +798,7 @@ void ActiveStream::processItems(std::vector<queued_item>& items) {
                 mutations.push_back(new MutationResponse(qi, opaque_, nullptr,
                             isSendMutationKeyOnlyEnabled() ? KEY_ONLY :
                                                              KEY_VALUE));
-            } else if (qi->getOperation() == queue_op_checkpoint_start) {
+            } else if (qi->getOperation() == queue_op::checkpoint_start) {
                 /* if there are already other mutations, then they belong to the
                    previous checkpoint and hence we must create a snapshot and
                    put them onto readyQ */
index 0606ff1..3e2d9c6 100644 (file)
@@ -1478,7 +1478,7 @@ RollbackResult ForestKVStore::rollback(uint16_t vbid, uint64_t rollbackSeqno,
        RememberingCallback<GetValue> gcb;
        get(*it, vbid, gcb);
        if (gcb.val.getStatus() == ENGINE_KEY_ENOENT) {
-           Item *itm = new Item(*it, vbid, queue_op_del, 0, 0);
+           Item *itm = new Item(*it, vbid, queue_op::del, 0, 0);
            gcb.val.setValue(itm);
        }
        cb->callback(gcb.val);
index 58873d2..694c8b5 100644 (file)
 AtomicValue<uint64_t> Item::casCounter(1);
 const uint32_t Item::metaDataSize(2*sizeof(uint32_t) + 2*sizeof(uint64_t) + 2);
 
+
+std::string to_string(queue_op op) {
+    switch(op) {
+        case queue_op::set: return "set";
+        case queue_op::del: return "del";
+        case queue_op::flush: return "flush";
+        case queue_op::empty: return "empty";
+        case queue_op::checkpoint_start: return "checkpoint_start";
+        case queue_op::checkpoint_end: return "checkpoint_end";
+    }
+    return "<" +
+            std::to_string(static_cast<std::underlying_type<queue_op>::type>(op)) +
+            ">";
+
+}
+
 /**
  * Append another item to this item
  *
index 2f33211..704a215 100644 (file)
 #include "objectregistry.h"
 #include "stats.h"
 
-enum queue_operation {
-    queue_op_set,
-    queue_op_del,
-    queue_op_flush,
-    queue_op_empty,
-    queue_op_checkpoint_start,
-    queue_op_checkpoint_end
+/// The set of possible operations which can be queued into a checkpoint.
+enum class queue_op : uint8_t {
+    /// Set a document key to a given value. Sets to the same key can (and
+    /// typically are) de-duplicated - only the most recent queue_op::set in a
+    /// checkpoint will be kept. This means that there's no guarantee that
+    /// clients will see all intermediate values of a key.
+    set,
+
+    /// Delete a key. Deletes can be de-duplicated with respect to queue_op::set -
+    /// set(key) followed by del(key) will result in just del(key).
+    del,
+
+    /// (meta item) Testing only op, used to mark the end of a test.
+    /// TODO: Remove this, it shouldn't be necessary / included just to support
+    /// testing.
+    flush,
+
+    /// (meta item) Dummy op added to the start of checkpoints to simplify
+    /// checkpoint logic.
+    /// This is because our Checkpoints are structured such that
+    /// CheckpointCursors are advanced before dereferencing them, not after -
+    /// see Checkpoint documentation for details. As such we need to have an
+    /// empty/dummy element at the start of each Checkpoint, so after the first
+    /// advance the cursor is pointing at the 'real' first element (normally
+    /// checkpoint_start).
+    ///
+    /// Unlike other operations, queue_op::empty is ignored for the purposes of
+    /// CheckpointManager::numItems - due to it only existing as a placeholder.
+    empty,
+
+    /// (meta item) Marker for the start of a checkpoint.
+    /// All checkpoints (open or closed) will start with an item of this type.
+    /// Like all meta items, this doens't directly match user operations, but
+    /// is used to delineate the start of a checkpoint.
+    checkpoint_start,
+
+    /// (meta item) Marker for the end of a checkpoint. Only exists in closed
+    /// checkpoints, where it is always the last item in the checkpoint.
+    checkpoint_end,
 };
 
+/// Return a string representation of queue_op.
+std::string to_string(queue_op op);
+
 // Max Value for NRU bits
 const uint8_t MAX_NRU_VALUE = 3;
 // Initial value for NRU bits
@@ -327,7 +362,7 @@ public:
         bySeqno(i),
         queuedTime(ep_current_time()),
         vbucketId(vbid),
-        op(queue_op_set),
+        op(queue_op::set),
         nru(nru_value)
     {
         if (bySeqno == 0) {
@@ -357,7 +392,7 @@ public:
         bySeqno(i),
         queuedTime(ep_current_time()),
         vbucketId(vbid),
-        op(queue_op_set),
+        op(queue_op::set),
         nru(nru_value)
     {
         if (bySeqno == 0) {
@@ -368,14 +403,14 @@ public:
     }
 
     Item(const std::string &k, const uint16_t vb,
-         enum queue_operation o, const uint64_t revSeq,
+         queue_op o, const uint64_t revSeq,
          const int64_t bySeq, uint8_t nru_value = INITIAL_NRU_VALUE) :
         metaData(),
         key(k),
         bySeqno(bySeq),
         queuedTime(ep_current_time()),
         vbucketId(vb),
-        op(static_cast<uint16_t>(o)),
+        op(o),
         nru(nru_value)
     {
        if (bySeqno < 0) {
@@ -608,42 +643,41 @@ public:
     }
 
     bool isDeleted() {
-        return op == queue_op_del;
+        return op == queue_op::del;
     }
 
     void setDeleted() {
-        op = queue_op_del;
+        op = queue_op::del;
     }
 
     uint32_t getQueuedTime(void) const { return queuedTime; }
 
-    enum queue_operation getOperation(void) const {
-        return static_cast<enum queue_operation>(op);
+    queue_op getOperation(void) const {
+        return op;
     }
 
     /*
      * Should this item be persisted?
      */
     bool shouldPersist() const {
-        return (op == queue_op_set) ||
-               (op == queue_op_del);
+        return (op == queue_op::set) ||
+               (op == queue_op::del);
     }
 
     /*
      * Should this item be replicated (e.g. by DCP)
      */
     bool shouldReplicate() const {
-        return (op == queue_op_set) ||
-               (op == queue_op_del);
+        return (op == queue_op::set) ||
+               (op == queue_op::del);
     }
 
-    void setOperation(enum queue_operation o) {
-        op = static_cast<uint8_t>(o);
+    void setOperation(queue_op o) {
+        op = o;
     }
 
     bool isCheckPointMetaItem(void) const {
-        queue_operation qOp = static_cast<enum queue_operation>(op);
-        if ((queue_op_set == qOp) || (queue_op_del == qOp)) {
+        if ((queue_op::set == op) || (queue_op::del == op)) {
             return false;
         }
         return true;
@@ -691,7 +725,7 @@ private:
     int64_t bySeqno;
     uint32_t queuedTime;
     uint16_t vbucketId;
-    uint8_t op;
+    queue_op op;
     uint8_t nru  : 2;
 
     static AtomicValue<uint64_t> casCounter;
index 1039838..8da702a 100644 (file)
@@ -1239,8 +1239,8 @@ queued_item TapProducer::nextFgFetched_UNLOCKED(bool &shouldPause) {
             queued_item qi = vb->checkpointManager.nextItem(getName(),
                                                             isLastItem);
             switch(qi->getOperation()) {
-            case queue_op_set:
-            case queue_op_del:
+            case queue_op::set:
+            case queue_op::del:
                 if (supportCheckpointSync_ && isLastItem) {
                     it->second.lastItem = true;
                 } else {
@@ -1248,7 +1248,7 @@ queued_item TapProducer::nextFgFetched_UNLOCKED(bool &shouldPause) {
                 }
                 addEvent_UNLOCKED(qi);
                 break;
-            case queue_op_checkpoint_start:
+            case queue_op::checkpoint_start:
                 {
                     it->second.currentCheckpointId = qi->getRevSeqno();
                     if (supportCheckpointSync_) {
@@ -1257,7 +1257,7 @@ queued_item TapProducer::nextFgFetched_UNLOCKED(bool &shouldPause) {
                     }
                 }
                 break;
-            case queue_op_checkpoint_end:
+            case queue_op::checkpoint_end:
                 if (supportCheckpointSync_) {
                     it->second.state = checkpoint_end;
                     uint32_t seqno_acked;
@@ -1277,7 +1277,7 @@ queued_item TapProducer::nextFgFetched_UNLOCKED(bool &shouldPause) {
                     }
                 }
                 break;
-            case queue_op_empty:
+            case queue_op::empty:
                 {
                     ++open_checkpoint_count;
                 }
@@ -1780,16 +1780,16 @@ Item* TapProducer::getNextItem(const void *c, uint16_t *vbucket, uint16_t &ret,
     queued_item checkpoint_msg = nextCheckpointMessage_UNLOCKED();
     if (checkpoint_msg.get() != NULL) {
         switch (checkpoint_msg->getOperation()) {
-        case queue_op_checkpoint_start:
+        case queue_op::checkpoint_start:
             ret = TAP_CHECKPOINT_START;
             break;
-        case queue_op_checkpoint_end:
+        case queue_op::checkpoint_end:
             ret = TAP_CHECKPOINT_END;
             break;
         default:
             logger.log(EXTENSION_LOG_WARNING,
-                "Checkpoint start or end msg with incorrect opcode %d",
-                checkpoint_msg->getOperation());
+                "Checkpoint start or end msg with incorrect opcode %s",
+                to_string(checkpoint_msg->getOperation()).c_str());
             ret = TAP_DISCONNECT;
             return NULL;
         }
@@ -1840,7 +1840,7 @@ Item* TapProducer::getNextItem(const void *c, uint16_t *vbucket, uint16_t &ret,
 
         ++stats.numTapBGFetched;
         qi = queued_item(new Item(itm->getKey(), itm->getVBucketId(),
-                                  ret == TAP_MUTATION ? queue_op_set : queue_op_del,
+                                  ret == TAP_MUTATION ? queue_op::set : queue_op::del,
                                   itm->getRevSeqno(), itm->getBySeqno()));
     } else if (hasItemFromVBHashtable_UNLOCKED()) { // Item from memory backfill or checkpoints
         if (waitForCheckpointMsgAck()) {
@@ -1862,7 +1862,7 @@ Item* TapProducer::getNextItem(const void *c, uint16_t *vbucket, uint16_t &ret,
             return NULL;
         }
 
-        if (qi->getOperation() == queue_op_set) {
+        if (qi->getOperation() == queue_op::set) {
             get_options_t options = DELETE_TEMP;
             GetValue gv(engine_.getEpStore()->get(qi->getKey(), qi->getVBucketId(),
                                                   c, options));
@@ -1871,7 +1871,7 @@ Item* TapProducer::getNextItem(const void *c, uint16_t *vbucket, uint16_t &ret,
                 itm = gv.getValue();
                 if (itm == nullptr) {
                     throw std::logic_error("TapProducer::getNextItem: found a"
-                            " NULL value for GetValue from queue_op_set");
+                            " NULL value for GetValue from queue_op::set");
                 }
                 nru = gv.getNRUValue();
                 ret = TAP_MUTATION;
@@ -1909,7 +1909,7 @@ Item* TapProducer::getNextItem(const void *c, uint16_t *vbucket, uint16_t &ret,
                 return NULL;
             }
             ++stats.numTapFGFetched;
-        } else if (qi->getOperation() == queue_op_del) {
+        } else if (qi->getOperation() == queue_op::del) {
             itm = new Item(qi->getKey().c_str(), qi->getNKey(),
                            /*flags*/0, /*exp*/0,
                            /*data*/NULL, /*size*/0,
index 1edd3bd..0d3e543 100644 (file)
@@ -141,19 +141,19 @@ public:
         item_ = qi;
 
         switch(item_->getOperation()) {
-        case queue_op_set:
+        case queue_op::set:
             event_ = TAP_MUTATION;
             break;
-        case queue_op_del:
+        case queue_op::del:
             event_ = TAP_DELETION;
             break;
-        case queue_op_flush:
+        case queue_op::flush:
             event_ = TAP_FLUSH;
             break;
-        case queue_op_checkpoint_start:
+        case queue_op::checkpoint_start:
             event_ = TAP_CHECKPOINT_START;
             break;
-        case queue_op_checkpoint_end:
+        case queue_op::checkpoint_end:
             event_ = TAP_CHECKPOINT_END;
             break;
         default:
index bfa6595..3a554eb 100644 (file)
@@ -107,7 +107,7 @@ static void launch_persistence_thread(void *arg) {
         args->checkpoint_manager->getAllItemsForCursor(cursor, items);
         for(itemPos = 0; itemPos < items.size(); ++itemPos) {
             queued_item qi = items.at(itemPos);
-            if (qi->getOperation() == queue_op_flush) {
+            if (qi->getOperation() == queue_op::flush) {
                 flush = true;
                 break;
             }
@@ -118,9 +118,9 @@ static void launch_persistence_thread(void *arg) {
             // these. Anything else will be considered an error.
             for(size_t i = itemPos + 1; i < items.size(); ++i) {
                 queued_item qi = items.at(i);
-                EXPECT_TRUE(queue_op_checkpoint_start == qi->getOperation() ||
-                            queue_op_checkpoint_end == qi->getOperation())
-                    << "Unexpected operation:" << qi->getOperation();
+                EXPECT_TRUE(queue_op::checkpoint_start == qi->getOperation() ||
+                            queue_op::checkpoint_end == qi->getOperation())
+                    << "Unexpected operation:" << to_string(qi->getOperation());
             }
             break;
         }
@@ -143,7 +143,7 @@ static void launch_tap_client_thread(void *arg) {
     while(true) {
         queued_item qi = args->checkpoint_manager->nextItem(args->name,
                                                             isLastItem);
-        if (qi->getOperation() == queue_op_flush) {
+        if (qi->getOperation() == queue_op::flush) {
             flush = true;
             break;
         }
@@ -183,7 +183,7 @@ static void launch_set_thread(void *arg) {
         std::stringstream key;
         key << "key-" << i;
         queued_item qi(new Item(key.str(), args->vbucket->getId(),
-                                queue_op_set, 0, 0));
+                                queue_op::set, 0, 0));
         args->checkpoint_manager->queueDirty(args->vbucket, qi, GenerateBySeqno::Yes, GenerateCas::Yes);
     }
 }
@@ -264,7 +264,7 @@ TEST_F(CheckpointTest, basic_chk_test) {
 
     // Push the flush command into the queue so that all other threads can be terminated.
     std::string key("flush");
-    queued_item qi(new Item(key, vbucket->getId(), queue_op_flush, 0xffff, 0));
+    queued_item qi(new Item(key, vbucket->getId(), queue_op::flush, 0xffff, 0));
     checkpoint_manager->queueDirty(vbucket, qi, GenerateBySeqno::Yes, GenerateCas::Yes);
 
     rc = cb_join_thread(persistence_thread);
@@ -299,7 +299,7 @@ TEST_F(CheckpointTest, reset_checkpoint_id) {
     for (i = 0; i < 10; ++i) {
         std::stringstream key;
         key << "key-" << i;
-        queued_item qi(new Item(key.str(), vbucket->getId(), queue_op_set,
+        queued_item qi(new Item(key.str(), vbucket->getId(), queue_op::set,
                                 0, 0));
         manager->queueDirty(vbucket, qi, GenerateBySeqno::Yes, GenerateCas::Yes);
     }
@@ -313,19 +313,19 @@ TEST_F(CheckpointTest, reset_checkpoint_id) {
     manager->getAllItemsForCursor(cursor, items);
     for(itemPos = 0; itemPos < items.size(); ++itemPos) {
         queued_item qi = items.at(itemPos);
-        if (qi->getOperation() != queue_op_checkpoint_start &&
-            qi->getOperation() != queue_op_checkpoint_end) {
+        if (qi->getOperation() != queue_op::checkpoint_start &&
+            qi->getOperation() != queue_op::checkpoint_end) {
             size_t mid = qi->getBySeqno();
             EXPECT_GT(mid, lastMutationId);
             lastMutationId = qi->getBySeqno();
         }
         if (itemPos == 0 || itemPos == (items.size() - 1)) {
-            EXPECT_EQ(queue_op_checkpoint_start, qi->getOperation()) << "For itemPos:" << itemPos;
+            EXPECT_EQ(queue_op::checkpoint_start, qi->getOperation()) << "For itemPos:" << itemPos;
         } else if (itemPos == (items.size() - 2)) {
-            EXPECT_EQ(queue_op_checkpoint_end, qi->getOperation()) << "For itemPos:" << itemPos;
+            EXPECT_EQ(queue_op::checkpoint_end, qi->getOperation()) << "For itemPos:" << itemPos;
             chk++;
         } else {
-            EXPECT_EQ(queue_op_set, qi->getOperation()) << "For itemPos:" << itemPos;
+            EXPECT_EQ(queue_op::set, qi->getOperation()) << "For itemPos:" << itemPos;
         }
     }
     EXPECT_EQ(13, items.size());
@@ -355,7 +355,7 @@ TEST_F(CheckpointTest, CheckFixture) {
 TEST_F(CheckpointTest, OneOpenCkpt) {
 
     // Queue a set operation.
-    queued_item qi(new Item("key1", vbucket->getId(), queue_op_set,
+    queued_item qi(new Item("key1", vbucket->getId(), queue_op::set,
                             /*revSeq*/20, /*bySeq*/0));
 
     // No set_ops in queue, expect queueDirty to return true (increase
@@ -368,7 +368,7 @@ TEST_F(CheckpointTest, OneOpenCkpt) {
     EXPECT_EQ(1, manager->getNumItemsForCursor(CheckpointManager::pCursorName));
 
     // Adding the same key again shouldn't increase the size.
-    queued_item qi2(new Item("key1", vbucket->getId(), queue_op_set,
+    queued_item qi2(new Item("key1", vbucket->getId(), queue_op::set,
                             /*revSeq*/21, /*bySeq*/0));
     EXPECT_FALSE(manager->queueDirty(vbucket, qi2, GenerateBySeqno::Yes, GenerateCas::Yes));
     EXPECT_EQ(1, manager->getNumCheckpoints());
@@ -378,7 +378,7 @@ TEST_F(CheckpointTest, OneOpenCkpt) {
     EXPECT_EQ(1, manager->getNumItemsForCursor(CheckpointManager::pCursorName));
 
     // Adding a different key should increase size.
-    queued_item qi3(new Item("key2", vbucket->getId(), queue_op_set,
+    queued_item qi3(new Item("key2", vbucket->getId(), queue_op::set,
                             /*revSeq*/0, /*bySeq*/0));
     EXPECT_TRUE(manager->queueDirty(vbucket, qi3, GenerateBySeqno::Yes, GenerateCas::Yes));
     EXPECT_EQ(1, manager->getNumCheckpoints());
@@ -394,7 +394,7 @@ TEST_F(CheckpointTest, OneOpenOneClosed) {
     // Add some items to the initial (open) checkpoint.
     for (auto i : {1,2}) {
         queued_item qi(new Item("key" + std::to_string(i), vbucket->getId(),
-                                queue_op_set, /*revSeq*/0, /*bySeq*/0));
+                                queue_op::set, /*revSeq*/0, /*bySeq*/0));
         EXPECT_TRUE(manager->queueDirty(vbucket, qi, GenerateBySeqno::Yes, GenerateCas::Yes));
     }
     EXPECT_EQ(1, manager->getNumCheckpoints());
@@ -412,7 +412,7 @@ TEST_F(CheckpointTest, OneOpenOneClosed) {
     // ckpt).
     for (auto ii : {1,2}) {
         queued_item qi(new Item("key" + std::to_string(ii), vbucket->getId(),
-                                queue_op_set, /*revSeq*/1, /*bySeq*/0));
+                                queue_op::set, /*revSeq*/1, /*bySeq*/0));
         EXPECT_TRUE(manager->queueDirty(vbucket, qi, GenerateBySeqno::Yes, GenerateCas::Yes));
     }
     EXPECT_EQ(2, manager->getNumCheckpoints());
@@ -448,14 +448,14 @@ TEST_F(CheckpointTest, ItemBasedCheckpointCreation) {
         EXPECT_EQ(ii + 1, manager->getNumOpenChkItems()); /* +1 for op_ckpt_start */
 
         qi.reset(new Item("key" + std::to_string(ii), vbucket->getId(),
-                          queue_op_set, /*revSeq*/0, /*bySeq*/0));
+                          queue_op::set, /*revSeq*/0, /*bySeq*/0));
         EXPECT_TRUE(manager->queueDirty(vbucket, qi, GenerateBySeqno::Yes, GenerateCas::Yes));
         EXPECT_EQ(1, manager->getNumCheckpoints());
 
     }
 
     // Add one more - should create a new checkpoint.
-    qi.reset(new Item("key_epoch", vbucket->getId(), queue_op_set, /*revSeq*/0,
+    qi.reset(new Item("key_epoch", vbucket->getId(), queue_op::set, /*revSeq*/0,
                       /*bySeq*/0));
     EXPECT_TRUE(manager->queueDirty(vbucket, qi, GenerateBySeqno::Yes, GenerateCas::Yes));
     EXPECT_EQ(2, manager->getNumCheckpoints());
@@ -466,14 +466,14 @@ TEST_F(CheckpointTest, ItemBasedCheckpointCreation) {
         EXPECT_EQ(ii + 2, manager->getNumOpenChkItems()); /* +2 op_ckpt_start, key_epoch */
 
         qi.reset(new Item("key" + std::to_string(ii), vbucket->getId(),
-                                queue_op_set, /*revSeq*/1, /*bySeq*/0));
+                                queue_op::set, /*revSeq*/1, /*bySeq*/0));
         EXPECT_TRUE(manager->queueDirty(vbucket, qi, GenerateBySeqno::Yes, GenerateCas::Yes));
         EXPECT_EQ(2, manager->getNumCheckpoints());
     }
 
     // Add one more - as we have hit maximum checkpoints should *not* create a
     // new one.
-    qi.reset(new Item("key_epoch2", vbucket->getId(), queue_op_set,
+    qi.reset(new Item("key_epoch2", vbucket->getId(), queue_op::set,
                       /*revSeq*/1, /*bySeq*/0));
     EXPECT_TRUE(manager->queueDirty(vbucket, qi, GenerateBySeqno::Yes, GenerateCas::Yes));
     EXPECT_EQ(2, manager->getNumCheckpoints());
@@ -493,7 +493,7 @@ TEST_F(CheckpointTest, ItemBasedCheckpointCreation) {
     EXPECT_EQ(12, manager->getNumOpenChkItems());
 
     // But adding a new item will create a new one.
-    qi.reset(new Item("key_epoch3", vbucket->getId(), queue_op_set,
+    qi.reset(new Item("key_epoch3", vbucket->getId(), queue_op::set,
                       /*revSeq*/1, /*bySeq*/0));
     EXPECT_TRUE(manager->queueDirty(vbucket, qi, GenerateBySeqno::Yes, GenerateCas::Yes));
     EXPECT_EQ(3, manager->getNumCheckpoints());
@@ -507,7 +507,7 @@ TEST_F(CheckpointTest, CursorOffsetOnCheckpointClose) {
     // Add two items to the initial (open) checkpoint.
     for (auto i : {1,2}) {
         queued_item qi(new Item("key" + std::to_string(i), vbucket->getId(),
-                                queue_op_set, /*revSeq*/0, /*bySeq*/0));
+                                queue_op::set, /*revSeq*/0, /*bySeq*/0));
         EXPECT_TRUE(manager->queueDirty(vbucket, qi, GenerateBySeqno::Yes, GenerateCas::Yes));
     }
     EXPECT_EQ(1, manager->getNumCheckpoints());
@@ -521,7 +521,7 @@ TEST_F(CheckpointTest, CursorOffsetOnCheckpointClose) {
     // Check de-dupe counting - after adding another item with the same key,
     // should still see two items.
     queued_item qi(new Item("key1", vbucket->getId(),
-                            queue_op_set, /*revSeq*/0, /*bySeq*/0));
+                            queue_op::set, /*revSeq*/0, /*bySeq*/0));
     EXPECT_FALSE(manager->queueDirty(vbucket, qi, GenerateBySeqno::Yes, GenerateCas::Yes))
         << "Adding a duplicate key to open checkpoint should not increase queue size";
 
@@ -551,7 +551,7 @@ TEST_F(CheckpointTest, CursorOffsetOnCheckpointClose) {
     // but cannot de-dupe across checkpoints.
     for (auto ii : {1,2}) {
         queued_item qi(new Item("key" + std::to_string(ii), vbucket->getId(),
-                                queue_op_set, /*revSeq*/1, /*bySeq*/0));
+                                queue_op::set, /*revSeq*/1, /*bySeq*/0));
         EXPECT_TRUE(manager->queueDirty(vbucket, qi, GenerateBySeqno::Yes, GenerateCas::Yes));
     }
 
@@ -618,7 +618,7 @@ TEST_F(CheckpointTest, ItemsForCheckpointCursor) {
     queued_item qi;
     for (unsigned int ii = 0; ii < 2 * MIN_CHECKPOINT_ITEMS; ii++) {
         qi.reset(new Item("key" + std::to_string(ii), vbucket->getId(),
-                          queue_op_set, /*revSeq*/0, /*bySeq*/0));
+                          queue_op::set, /*revSeq*/0, /*bySeq*/0));
         EXPECT_TRUE(manager->queueDirty(vbucket, qi, GenerateBySeqno::Yes, GenerateCas::Yes));
     }
 
@@ -669,7 +669,7 @@ TEST_F(CheckpointTest, CursorMovement) {
     queued_item qi;
     for (unsigned int ii = 0; ii < MIN_CHECKPOINT_ITEMS; ii++) {
         qi.reset(new Item("key" + std::to_string(ii), vbucket->getId(),
-                          queue_op_set, /*revSeq*/0, /*bySeq*/0));
+                          queue_op::set, /*revSeq*/0, /*bySeq*/0));
         EXPECT_TRUE(manager->queueDirty(vbucket, qi, GenerateBySeqno::Yes, GenerateCas::Yes));
     }
 
@@ -736,7 +736,7 @@ TEST_F(CheckpointTest, CursorMovement) {
        checkpoint. TAP unlike DCP cannot skip the op_ckpt_end message */
     bool isLastItem = false;
     qi = manager->nextItem(tap_cursor, isLastItem);
-    EXPECT_EQ(queue_op_checkpoint_end, qi->getOperation());
+    EXPECT_EQ(queue_op::checkpoint_end, qi->getOperation());
     EXPECT_EQ(true, isLastItem);
 
 }
@@ -776,7 +776,7 @@ TEST_F(CheckpointTest, SeqnoAndHLCOrdering) {
             std::string key = "key" + std::to_string(ii);
             for (int item  = 0; item < n_items; item++) {
                 queued_item qi(new Item(key + std::to_string(item),
-                                        vbucket->getId(), queue_op_set,
+                                        vbucket->getId(), queue_op::set,
                                         /*revSeq*/0, /*bySeq*/0));
                 EXPECT_TRUE(manager->queueDirty(vbucket,
                                                 qi,