const std::string CheckpointManager::pCursorName("persistence");
+const char* to_string(enum checkpoint_state s) {
+ switch (s) {
+ case CHECKPOINT_OPEN: return "CHECKPOINT_OPEN";
+ case CHECKPOINT_CLOSED: return "CHECKPOINT_CLOSED";
+ }
+ return "<unknown>";
+}
+
/**
* A listener class to update checkpoint related configs at runtime.
*/
void Checkpoint::popBackCheckpointEndItem() {
if (!toWrite.empty() &&
- toWrite.back()->getOperation() == queue_op_checkpoint_end) {
+ toWrite.back()->getOperation() == queue_op::checkpoint_end) {
metaKeyIndex.erase(toWrite.back()->getKey());
toWrite.pop_back();
}
}
// Notify flusher if in case queued item is a checkpoint meta item
- if (qi->getOperation() == queue_op_checkpoint_start ||
- qi->getOperation() == queue_op_checkpoint_end) {
+ if (qi->getOperation() == queue_op::checkpoint_start ||
+ qi->getOperation() == queue_op::checkpoint_end) {
checkpointManager->notifyFlusher();
}
for (; rit != pPrevCheckpoint->rend(); ++rit) {
const std::string &key = (*rit)->getKey();
- if ((*rit)->getOperation() != queue_op_del &&
- (*rit)->getOperation() != queue_op_set) {
+ if ((*rit)->getOperation() != queue_op::del &&
+ (*rit)->getOperation() != queue_op::set) {
continue;
}
checkpoint_index::iterator it = keyIndex.find(key);
}
std::ostream& operator <<(std::ostream& os, const Checkpoint& c) {
- os << "Checkpoint[" << &c << "] with "
- << "seqno:{" << c.getLowSeqno() << "," << c.getHighSeqno() << "} "
- << "state:" << c.getState();
+ os << "Checkpoint[" << &c << "] with"
+ << " seqno:{" << c.getLowSeqno() << "," << c.getHighSeqno() << "}"
+ << " state:" << to_string(c.getState())
+ << " items:[" << std::endl;
+ for (const auto& e : c.toWrite) {
+ os << "\t{" << e->getBySeqno() << ","
+ << to_string(e->getOperation()) << "}" << std::endl;
+ }
+ os << "]";
return os;
}
// item in this new checkpoint can be safely shifted left by 1 if the
// first item is removed
// and pushed into the tail.
- queued_item qi = createCheckpointItem(0, 0xffff, queue_op_empty);
+ queued_item qi = createCheckpointItem(0, 0xffff, queue_op::empty);
checkpoint->queueDirty(qi, this);
+ // Note: We explicitly do /not/ include {empty} ops in numItems.
// This item represents the start of the new checkpoint and is also sent to the slave node.
- qi = createCheckpointItem(id, vbucketId, queue_op_checkpoint_start);
+ qi = createCheckpointItem(id, vbucketId, queue_op::checkpoint_start);
checkpoint->queueDirty(qi, this);
++numItems;
checkpointList.push_back(checkpoint);
if ((cursor.shouldSendCheckpointEndMetaItem() ==
MustSendCheckpointEnd::NO) &&
cursor.currentPos != (*(cursor.currentCheckpoint))->end() &&
- (*(cursor.currentPos))->getOperation() == queue_op_checkpoint_end) {
+ (*(cursor.currentPos))->getOperation() == queue_op::checkpoint_end) {
/* checkpoint_end meta item is expected by TAP cursors. Hence skip
it only for persitence and DCP cursors */
++(cursor.offset);
// This item represents the end of the current open checkpoint and is sent
// to the slave node.
queued_item qi = createCheckpointItem(cur_ckpt->getId(), vbucketId,
- queue_op_checkpoint_end);
+ queue_op::checkpoint_end);
checkpointList.back()->queueDirty(qi, this);
++numItems;
if (cc == connCursors.end()) {
continue;
}
- enum queue_operation qop = (*(cc->second.currentPos))->getOperation();
- if (qop == queue_op_empty || qop == queue_op_checkpoint_start) {
+ queue_op qop = (*(cc->second.currentPos))->getOperation();
+ if (qop == queue_op::empty || qop == queue_op::checkpoint_start) {
return;
}
}
(*(cc->second.currentPos))->isCheckPointMetaItem();
bool cursor_on_chk_start = false;
if ((*(cc->second.currentPos))->getOperation() ==
- queue_op_checkpoint_start) {
+ queue_op::checkpoint_start) {
cursor_on_chk_start = true;
}
slowCursors[*nameItr] =
queued_item& qi = *(it->second.currentPos);
items.push_back(qi);
- if (qi->getOperation() == queue_op_checkpoint_end) {
+ if (qi->getOperation() == queue_op::checkpoint_end) {
range.end = (*it->second.currentCheckpoint)->getSnapshotEndSeqno();
moveCursorToNextCheckpoint(it->second);
}
"The cursor with name \"%s\" is not found in the checkpoint of vbucket"
"%d.\n", name.c_str(), vbucketId);
queued_item qi(new Item(std::string(""), 0xffff,
- queue_op_empty, 0, 0));
+ queue_op::empty, 0, 0));
return qi;
}
if (checkpointList.back()->getId() == 0) {
" the cursor to fetch an item from it's current checkpoint",
vbucketId);
queued_item qi(new Item(std::string(""), 0xffff,
- queue_op_empty, 0, 0));
+ queue_op::empty, 0, 0));
return qi;
}
} else {
isLastMutationItem = false;
queued_item qi(new Item(std::string(""), 0xffff,
- queue_op_empty, 0, 0));
+ queue_op::empty, 0, 0));
return qi;
}
}
if (curr_pos == (*curr_chk)->end()) {
continue;
}
- if ((*curr_pos)->getOperation() == queue_op_checkpoint_start) {
+ if ((*curr_pos)->getOperation() == queue_op::checkpoint_start) {
if ((*curr_chk)->getState() == CHECKPOINT_CLOSED) {
meta_items += 2;
} else {
cursor_index::iterator it = connCursors.find(name);
if (it != connCursors.end() &&
(*(it->second.currentPos))->getOperation() ==
- queue_op_checkpoint_end) {
+ queue_op::checkpoint_end) {
it->second.decrPos();
}
}
std::list<queued_item>::iterator it = cursor.currentPos;
++it;
if (it == (*(cursor.currentCheckpoint))->end() ||
- (*it)->getOperation() == queue_op_checkpoint_end) {
+ (*it)->getOperation() == queue_op::checkpoint_end) {
return true;
}
return false;
const std::string& key = (*(itr->second.currentPos))->getKey();
bool isMetaItem = (*(itr->second.currentPos))->isCheckPointMetaItem();
bool cursor_on_chk_start = false;
- if ((*(itr->second.currentPos))->getOperation() == queue_op_checkpoint_start) {
+ if ((*(itr->second.currentPos))->getOperation() == queue_op::checkpoint_start) {
cursor_on_chk_start = true;
}
cursorMap[itr->first.c_str()] =
while (mit != cursors.end()) {
std::pair<uint64_t, bool> val = mit->second;
if (val.first < id || (val.first == id && val.second &&
- (*last)->getOperation() == queue_op_checkpoint_start)) {
+ (*last)->getOperation() == queue_op::checkpoint_start)) {
cursor_index::iterator cc = connCursors.find(mit->first);
if (cc == connCursors.end() ||
}
queued_item CheckpointManager::createCheckpointItem(uint64_t id, uint16_t vbid,
- enum queue_operation checkpoint_op) {
+ queue_op checkpoint_op) {
uint64_t bySeqno;
std::string key;
switch (checkpoint_op) {
- case queue_op_checkpoint_start:
+ case queue_op::checkpoint_start:
key = "checkpoint_start";
bySeqno = lastBySeqno + 1;
break;
- case queue_op_checkpoint_end:
+ case queue_op::checkpoint_end:
key = "checkpoint_end";
bySeqno = lastBySeqno;
break;
- case queue_op_empty:
+ case queue_op::empty:
key = "dummy_key";
bySeqno = lastBySeqno;
break;
default:
throw std::invalid_argument("CheckpointManager::createCheckpointItem:"
"checkpoint_op (which is " +
- std::to_string(checkpoint_op) +
+ std::to_string(static_cast<std::underlying_type<queue_op>::type>(checkpoint_op)) +
") is not a valid item to create");
}
CHECKPOINT_CLOSED //!< The checkpoint is not open.
};
+const char* to_string(enum checkpoint_state);
+
/**
* A checkpoint index entry.
*/
class VBucket;
/**
- * A checkpoint cursor
+ * A checkpoint cursor, representing the current position in a Checkpoint
+ * series.
+ *
+ * CheckpointCursors are similar to STL-style iterators but for Checkpoints.
+ * A consumer (DCP, TAP, persistence) will have one CheckpointCursor, initially
+ * positioned at the first item they want. As they read items from the
+ * Checkpoint the Cursor is advanced, allowing them to continue from where
+ * they left off when they next attempt to read items.
+ *
+ * A CheckpointCursor has two main pieces of state:
+ *
+ * - currentCheckpoint - The current Checkpoint the cursor is operating on.
+ * - currentPos - the position with the current Checkpoint.
+ *
+ * When a CheckpointCursor reaches the end of Checkpoint, the CheckpointManager
+ * will move it to the next Checkpoint.
+ *
+ * To assist in accounting how many items remain in a Checkpoint series, a
+ * cursor also records its `offset` - the count of items (non-meta and meta) it
+ * has already 'consumed' from the Checkpoint series. Note that this `offset`
+ * count is not cumulative - when the CheckpointManager removes checkpoints
+ * the offset will be decremented. To put it another way - the number of items
+ * a CheckpointCursor has left to consume can be calcuated as
+ * `CheckpointManager::numItems - CheckpointCursor::offset`.
*/
class CheckpointCursor {
friend class CheckpointManager;
/**
* Representation of a checkpoint used in the unified queue for persistence and
* replication.
+ *
+ * Each Checkpoint consists of an ordered series of queued_item items, each
+ * of which either represents a 'real' user operation
+ * (queue_op::set & queue_op::del), or one of a range of meta-items
+ * (queue_op::checkpoint_start, queue_op::checkpoint_end, ...).
+ *
+ * A checkpoint may either be Open or Closed. Open checkpoints can still have
+ * new items appended to them, whereas Closed checkpoints cannot (and are
+ * logically immutable). A checkpoint begins life as an Open checkpoint, will
+ * have items added to it (including de-duplication if a key is added which
+ * already exists), and then once large/old enough it will be marked as Closed,
+ * and a new Open checkpoint created for new items.
+ *
+ * Consumers read items from Checkpoints by creating a CheckpointCursor
+ * (similar to an STL iterator), which they use to mark how far along the
+ * Checkpoint they are.
+ *
+ *
+ * Checkpoint (closed)
+ * numItems: 5 (1x start, 2x set, 1x del, 1x end)
+ *
+ * +-------+-------+-------+-------+-------+-------+
+ * | empty | Start | Set | Set | Del | End |
+ * +-------+-------+-------+-------+-------+-------+
+ * seqno 0 1 1 2 3 3
+ *
+ * ^
+ * |
+ * |
+ * CheckpointCursor
+ * (initial pos)
+ *
+ * Checkpoint (open)
+ * numItems: 4 (1x start, 1x set, 2x set)
+ *
+ * +-------+-------+-------+-------+-------+
+ * | empty | Start | Del | Set | Set
+ * +-------+-------+-------+-------+-------+
+ * seqno 3 4 4 5 6
+ *
+ * A Checkpoint starts with an empty item, followed by a checkpoint_start,
+ * and then 0...N set and del items, finally finishing with a checkpoint_end if
+ * the Checkpoint is closed.
+ * The empty item exists because Checkpoints are structured such that
+ * CheckpointCursors are advanced _before_ dereferencing them, not _after_
+ * (this differs from STL iterators which are typically incremented after
+ * dereferencing them) - i.e. the pseudo-code for reading elements is:
+ *
+ * getElements(CheckpointCursor& cur) {
+ * std::vector<...> result;
+ * while (incrCursorAndCheckValid(cur) {
+ * result.push_back(*cur);
+ * };
+ * return result;
+ * }
+ *
+ * As such we need to have a dummy element at the start of each Checkpoint, so
+ * after the first call to CheckpointManager::incrCursor() the cursor
+ * dereferences to the checkpoint_start element.
+ *
+ * Note that sequence numbers are only unique for normal operations (set & del) -
+ * for meta-items like checkpoint start/end they share the same sequence number
+ * as the associated op - for checkpoint_start that is the ID of the following
+ * op, for checkpoint_end the ID of the proceeding op.
*/
class Checkpoint {
public:
std::list<Checkpoint*>::iterator chkItr);
queued_item createCheckpointItem(uint64_t id, uint16_t vbid,
- enum queue_operation checkpoint_op);
+ queue_op checkpoint_op);
size_t getNumOfMetaItemsFromCursor(CheckpointCursor &cursor);
void ActiveStream::processItems(std::vector<queued_item>& items) {
if (!items.empty()) {
bool mark = false;
- if (items.front()->getOperation() == queue_op_checkpoint_start) {
+ if (items.front()->getOperation() == queue_op::checkpoint_start) {
mark = true;
}
mutations.push_back(new MutationResponse(qi, opaque_, nullptr,
isSendMutationKeyOnlyEnabled() ? KEY_ONLY :
KEY_VALUE));
- } else if (qi->getOperation() == queue_op_checkpoint_start) {
+ } else if (qi->getOperation() == queue_op::checkpoint_start) {
/* if there are already other mutations, then they belong to the
previous checkpoint and hence we must create a snapshot and
put them onto readyQ */
RememberingCallback<GetValue> gcb;
get(*it, vbid, gcb);
if (gcb.val.getStatus() == ENGINE_KEY_ENOENT) {
- Item *itm = new Item(*it, vbid, queue_op_del, 0, 0);
+ Item *itm = new Item(*it, vbid, queue_op::del, 0, 0);
gcb.val.setValue(itm);
}
cb->callback(gcb.val);
AtomicValue<uint64_t> Item::casCounter(1);
const uint32_t Item::metaDataSize(2*sizeof(uint32_t) + 2*sizeof(uint64_t) + 2);
+
+std::string to_string(queue_op op) {
+ switch(op) {
+ case queue_op::set: return "set";
+ case queue_op::del: return "del";
+ case queue_op::flush: return "flush";
+ case queue_op::empty: return "empty";
+ case queue_op::checkpoint_start: return "checkpoint_start";
+ case queue_op::checkpoint_end: return "checkpoint_end";
+ }
+ return "<" +
+ std::to_string(static_cast<std::underlying_type<queue_op>::type>(op)) +
+ ">";
+
+}
+
/**
* Append another item to this item
*
#include "objectregistry.h"
#include "stats.h"
-enum queue_operation {
- queue_op_set,
- queue_op_del,
- queue_op_flush,
- queue_op_empty,
- queue_op_checkpoint_start,
- queue_op_checkpoint_end
+/// The set of possible operations which can be queued into a checkpoint.
+enum class queue_op : uint8_t {
+ /// Set a document key to a given value. Sets to the same key can (and
+ /// typically are) de-duplicated - only the most recent queue_op::set in a
+ /// checkpoint will be kept. This means that there's no guarantee that
+ /// clients will see all intermediate values of a key.
+ set,
+
+ /// Delete a key. Deletes can be de-duplicated with respect to queue_op::set -
+ /// set(key) followed by del(key) will result in just del(key).
+ del,
+
+ /// (meta item) Testing only op, used to mark the end of a test.
+ /// TODO: Remove this, it shouldn't be necessary / included just to support
+ /// testing.
+ flush,
+
+ /// (meta item) Dummy op added to the start of checkpoints to simplify
+ /// checkpoint logic.
+ /// This is because our Checkpoints are structured such that
+ /// CheckpointCursors are advanced before dereferencing them, not after -
+ /// see Checkpoint documentation for details. As such we need to have an
+ /// empty/dummy element at the start of each Checkpoint, so after the first
+ /// advance the cursor is pointing at the 'real' first element (normally
+ /// checkpoint_start).
+ ///
+ /// Unlike other operations, queue_op::empty is ignored for the purposes of
+ /// CheckpointManager::numItems - due to it only existing as a placeholder.
+ empty,
+
+ /// (meta item) Marker for the start of a checkpoint.
+ /// All checkpoints (open or closed) will start with an item of this type.
+ /// Like all meta items, this doens't directly match user operations, but
+ /// is used to delineate the start of a checkpoint.
+ checkpoint_start,
+
+ /// (meta item) Marker for the end of a checkpoint. Only exists in closed
+ /// checkpoints, where it is always the last item in the checkpoint.
+ checkpoint_end,
};
+/// Return a string representation of queue_op.
+std::string to_string(queue_op op);
+
// Max Value for NRU bits
const uint8_t MAX_NRU_VALUE = 3;
// Initial value for NRU bits
bySeqno(i),
queuedTime(ep_current_time()),
vbucketId(vbid),
- op(queue_op_set),
+ op(queue_op::set),
nru(nru_value)
{
if (bySeqno == 0) {
bySeqno(i),
queuedTime(ep_current_time()),
vbucketId(vbid),
- op(queue_op_set),
+ op(queue_op::set),
nru(nru_value)
{
if (bySeqno == 0) {
}
Item(const std::string &k, const uint16_t vb,
- enum queue_operation o, const uint64_t revSeq,
+ queue_op o, const uint64_t revSeq,
const int64_t bySeq, uint8_t nru_value = INITIAL_NRU_VALUE) :
metaData(),
key(k),
bySeqno(bySeq),
queuedTime(ep_current_time()),
vbucketId(vb),
- op(static_cast<uint16_t>(o)),
+ op(o),
nru(nru_value)
{
if (bySeqno < 0) {
}
bool isDeleted() {
- return op == queue_op_del;
+ return op == queue_op::del;
}
void setDeleted() {
- op = queue_op_del;
+ op = queue_op::del;
}
uint32_t getQueuedTime(void) const { return queuedTime; }
- enum queue_operation getOperation(void) const {
- return static_cast<enum queue_operation>(op);
+ queue_op getOperation(void) const {
+ return op;
}
/*
* Should this item be persisted?
*/
bool shouldPersist() const {
- return (op == queue_op_set) ||
- (op == queue_op_del);
+ return (op == queue_op::set) ||
+ (op == queue_op::del);
}
/*
* Should this item be replicated (e.g. by DCP)
*/
bool shouldReplicate() const {
- return (op == queue_op_set) ||
- (op == queue_op_del);
+ return (op == queue_op::set) ||
+ (op == queue_op::del);
}
- void setOperation(enum queue_operation o) {
- op = static_cast<uint8_t>(o);
+ void setOperation(queue_op o) {
+ op = o;
}
bool isCheckPointMetaItem(void) const {
- queue_operation qOp = static_cast<enum queue_operation>(op);
- if ((queue_op_set == qOp) || (queue_op_del == qOp)) {
+ if ((queue_op::set == op) || (queue_op::del == op)) {
return false;
}
return true;
int64_t bySeqno;
uint32_t queuedTime;
uint16_t vbucketId;
- uint8_t op;
+ queue_op op;
uint8_t nru : 2;
static AtomicValue<uint64_t> casCounter;
queued_item qi = vb->checkpointManager.nextItem(getName(),
isLastItem);
switch(qi->getOperation()) {
- case queue_op_set:
- case queue_op_del:
+ case queue_op::set:
+ case queue_op::del:
if (supportCheckpointSync_ && isLastItem) {
it->second.lastItem = true;
} else {
}
addEvent_UNLOCKED(qi);
break;
- case queue_op_checkpoint_start:
+ case queue_op::checkpoint_start:
{
it->second.currentCheckpointId = qi->getRevSeqno();
if (supportCheckpointSync_) {
}
}
break;
- case queue_op_checkpoint_end:
+ case queue_op::checkpoint_end:
if (supportCheckpointSync_) {
it->second.state = checkpoint_end;
uint32_t seqno_acked;
}
}
break;
- case queue_op_empty:
+ case queue_op::empty:
{
++open_checkpoint_count;
}
queued_item checkpoint_msg = nextCheckpointMessage_UNLOCKED();
if (checkpoint_msg.get() != NULL) {
switch (checkpoint_msg->getOperation()) {
- case queue_op_checkpoint_start:
+ case queue_op::checkpoint_start:
ret = TAP_CHECKPOINT_START;
break;
- case queue_op_checkpoint_end:
+ case queue_op::checkpoint_end:
ret = TAP_CHECKPOINT_END;
break;
default:
logger.log(EXTENSION_LOG_WARNING,
- "Checkpoint start or end msg with incorrect opcode %d",
- checkpoint_msg->getOperation());
+ "Checkpoint start or end msg with incorrect opcode %s",
+ to_string(checkpoint_msg->getOperation()).c_str());
ret = TAP_DISCONNECT;
return NULL;
}
++stats.numTapBGFetched;
qi = queued_item(new Item(itm->getKey(), itm->getVBucketId(),
- ret == TAP_MUTATION ? queue_op_set : queue_op_del,
+ ret == TAP_MUTATION ? queue_op::set : queue_op::del,
itm->getRevSeqno(), itm->getBySeqno()));
} else if (hasItemFromVBHashtable_UNLOCKED()) { // Item from memory backfill or checkpoints
if (waitForCheckpointMsgAck()) {
return NULL;
}
- if (qi->getOperation() == queue_op_set) {
+ if (qi->getOperation() == queue_op::set) {
get_options_t options = DELETE_TEMP;
GetValue gv(engine_.getEpStore()->get(qi->getKey(), qi->getVBucketId(),
c, options));
itm = gv.getValue();
if (itm == nullptr) {
throw std::logic_error("TapProducer::getNextItem: found a"
- " NULL value for GetValue from queue_op_set");
+ " NULL value for GetValue from queue_op::set");
}
nru = gv.getNRUValue();
ret = TAP_MUTATION;
return NULL;
}
++stats.numTapFGFetched;
- } else if (qi->getOperation() == queue_op_del) {
+ } else if (qi->getOperation() == queue_op::del) {
itm = new Item(qi->getKey().c_str(), qi->getNKey(),
/*flags*/0, /*exp*/0,
/*data*/NULL, /*size*/0,
item_ = qi;
switch(item_->getOperation()) {
- case queue_op_set:
+ case queue_op::set:
event_ = TAP_MUTATION;
break;
- case queue_op_del:
+ case queue_op::del:
event_ = TAP_DELETION;
break;
- case queue_op_flush:
+ case queue_op::flush:
event_ = TAP_FLUSH;
break;
- case queue_op_checkpoint_start:
+ case queue_op::checkpoint_start:
event_ = TAP_CHECKPOINT_START;
break;
- case queue_op_checkpoint_end:
+ case queue_op::checkpoint_end:
event_ = TAP_CHECKPOINT_END;
break;
default:
args->checkpoint_manager->getAllItemsForCursor(cursor, items);
for(itemPos = 0; itemPos < items.size(); ++itemPos) {
queued_item qi = items.at(itemPos);
- if (qi->getOperation() == queue_op_flush) {
+ if (qi->getOperation() == queue_op::flush) {
flush = true;
break;
}
// these. Anything else will be considered an error.
for(size_t i = itemPos + 1; i < items.size(); ++i) {
queued_item qi = items.at(i);
- EXPECT_TRUE(queue_op_checkpoint_start == qi->getOperation() ||
- queue_op_checkpoint_end == qi->getOperation())
- << "Unexpected operation:" << qi->getOperation();
+ EXPECT_TRUE(queue_op::checkpoint_start == qi->getOperation() ||
+ queue_op::checkpoint_end == qi->getOperation())
+ << "Unexpected operation:" << to_string(qi->getOperation());
}
break;
}
while(true) {
queued_item qi = args->checkpoint_manager->nextItem(args->name,
isLastItem);
- if (qi->getOperation() == queue_op_flush) {
+ if (qi->getOperation() == queue_op::flush) {
flush = true;
break;
}
std::stringstream key;
key << "key-" << i;
queued_item qi(new Item(key.str(), args->vbucket->getId(),
- queue_op_set, 0, 0));
+ queue_op::set, 0, 0));
args->checkpoint_manager->queueDirty(args->vbucket, qi, GenerateBySeqno::Yes, GenerateCas::Yes);
}
}
// Push the flush command into the queue so that all other threads can be terminated.
std::string key("flush");
- queued_item qi(new Item(key, vbucket->getId(), queue_op_flush, 0xffff, 0));
+ queued_item qi(new Item(key, vbucket->getId(), queue_op::flush, 0xffff, 0));
checkpoint_manager->queueDirty(vbucket, qi, GenerateBySeqno::Yes, GenerateCas::Yes);
rc = cb_join_thread(persistence_thread);
for (i = 0; i < 10; ++i) {
std::stringstream key;
key << "key-" << i;
- queued_item qi(new Item(key.str(), vbucket->getId(), queue_op_set,
+ queued_item qi(new Item(key.str(), vbucket->getId(), queue_op::set,
0, 0));
manager->queueDirty(vbucket, qi, GenerateBySeqno::Yes, GenerateCas::Yes);
}
manager->getAllItemsForCursor(cursor, items);
for(itemPos = 0; itemPos < items.size(); ++itemPos) {
queued_item qi = items.at(itemPos);
- if (qi->getOperation() != queue_op_checkpoint_start &&
- qi->getOperation() != queue_op_checkpoint_end) {
+ if (qi->getOperation() != queue_op::checkpoint_start &&
+ qi->getOperation() != queue_op::checkpoint_end) {
size_t mid = qi->getBySeqno();
EXPECT_GT(mid, lastMutationId);
lastMutationId = qi->getBySeqno();
}
if (itemPos == 0 || itemPos == (items.size() - 1)) {
- EXPECT_EQ(queue_op_checkpoint_start, qi->getOperation()) << "For itemPos:" << itemPos;
+ EXPECT_EQ(queue_op::checkpoint_start, qi->getOperation()) << "For itemPos:" << itemPos;
} else if (itemPos == (items.size() - 2)) {
- EXPECT_EQ(queue_op_checkpoint_end, qi->getOperation()) << "For itemPos:" << itemPos;
+ EXPECT_EQ(queue_op::checkpoint_end, qi->getOperation()) << "For itemPos:" << itemPos;
chk++;
} else {
- EXPECT_EQ(queue_op_set, qi->getOperation()) << "For itemPos:" << itemPos;
+ EXPECT_EQ(queue_op::set, qi->getOperation()) << "For itemPos:" << itemPos;
}
}
EXPECT_EQ(13, items.size());
TEST_F(CheckpointTest, OneOpenCkpt) {
// Queue a set operation.
- queued_item qi(new Item("key1", vbucket->getId(), queue_op_set,
+ queued_item qi(new Item("key1", vbucket->getId(), queue_op::set,
/*revSeq*/20, /*bySeq*/0));
// No set_ops in queue, expect queueDirty to return true (increase
EXPECT_EQ(1, manager->getNumItemsForCursor(CheckpointManager::pCursorName));
// Adding the same key again shouldn't increase the size.
- queued_item qi2(new Item("key1", vbucket->getId(), queue_op_set,
+ queued_item qi2(new Item("key1", vbucket->getId(), queue_op::set,
/*revSeq*/21, /*bySeq*/0));
EXPECT_FALSE(manager->queueDirty(vbucket, qi2, GenerateBySeqno::Yes, GenerateCas::Yes));
EXPECT_EQ(1, manager->getNumCheckpoints());
EXPECT_EQ(1, manager->getNumItemsForCursor(CheckpointManager::pCursorName));
// Adding a different key should increase size.
- queued_item qi3(new Item("key2", vbucket->getId(), queue_op_set,
+ queued_item qi3(new Item("key2", vbucket->getId(), queue_op::set,
/*revSeq*/0, /*bySeq*/0));
EXPECT_TRUE(manager->queueDirty(vbucket, qi3, GenerateBySeqno::Yes, GenerateCas::Yes));
EXPECT_EQ(1, manager->getNumCheckpoints());
// Add some items to the initial (open) checkpoint.
for (auto i : {1,2}) {
queued_item qi(new Item("key" + std::to_string(i), vbucket->getId(),
- queue_op_set, /*revSeq*/0, /*bySeq*/0));
+ queue_op::set, /*revSeq*/0, /*bySeq*/0));
EXPECT_TRUE(manager->queueDirty(vbucket, qi, GenerateBySeqno::Yes, GenerateCas::Yes));
}
EXPECT_EQ(1, manager->getNumCheckpoints());
// ckpt).
for (auto ii : {1,2}) {
queued_item qi(new Item("key" + std::to_string(ii), vbucket->getId(),
- queue_op_set, /*revSeq*/1, /*bySeq*/0));
+ queue_op::set, /*revSeq*/1, /*bySeq*/0));
EXPECT_TRUE(manager->queueDirty(vbucket, qi, GenerateBySeqno::Yes, GenerateCas::Yes));
}
EXPECT_EQ(2, manager->getNumCheckpoints());
EXPECT_EQ(ii + 1, manager->getNumOpenChkItems()); /* +1 for op_ckpt_start */
qi.reset(new Item("key" + std::to_string(ii), vbucket->getId(),
- queue_op_set, /*revSeq*/0, /*bySeq*/0));
+ queue_op::set, /*revSeq*/0, /*bySeq*/0));
EXPECT_TRUE(manager->queueDirty(vbucket, qi, GenerateBySeqno::Yes, GenerateCas::Yes));
EXPECT_EQ(1, manager->getNumCheckpoints());
}
// Add one more - should create a new checkpoint.
- qi.reset(new Item("key_epoch", vbucket->getId(), queue_op_set, /*revSeq*/0,
+ qi.reset(new Item("key_epoch", vbucket->getId(), queue_op::set, /*revSeq*/0,
/*bySeq*/0));
EXPECT_TRUE(manager->queueDirty(vbucket, qi, GenerateBySeqno::Yes, GenerateCas::Yes));
EXPECT_EQ(2, manager->getNumCheckpoints());
EXPECT_EQ(ii + 2, manager->getNumOpenChkItems()); /* +2 op_ckpt_start, key_epoch */
qi.reset(new Item("key" + std::to_string(ii), vbucket->getId(),
- queue_op_set, /*revSeq*/1, /*bySeq*/0));
+ queue_op::set, /*revSeq*/1, /*bySeq*/0));
EXPECT_TRUE(manager->queueDirty(vbucket, qi, GenerateBySeqno::Yes, GenerateCas::Yes));
EXPECT_EQ(2, manager->getNumCheckpoints());
}
// Add one more - as we have hit maximum checkpoints should *not* create a
// new one.
- qi.reset(new Item("key_epoch2", vbucket->getId(), queue_op_set,
+ qi.reset(new Item("key_epoch2", vbucket->getId(), queue_op::set,
/*revSeq*/1, /*bySeq*/0));
EXPECT_TRUE(manager->queueDirty(vbucket, qi, GenerateBySeqno::Yes, GenerateCas::Yes));
EXPECT_EQ(2, manager->getNumCheckpoints());
EXPECT_EQ(12, manager->getNumOpenChkItems());
// But adding a new item will create a new one.
- qi.reset(new Item("key_epoch3", vbucket->getId(), queue_op_set,
+ qi.reset(new Item("key_epoch3", vbucket->getId(), queue_op::set,
/*revSeq*/1, /*bySeq*/0));
EXPECT_TRUE(manager->queueDirty(vbucket, qi, GenerateBySeqno::Yes, GenerateCas::Yes));
EXPECT_EQ(3, manager->getNumCheckpoints());
// Add two items to the initial (open) checkpoint.
for (auto i : {1,2}) {
queued_item qi(new Item("key" + std::to_string(i), vbucket->getId(),
- queue_op_set, /*revSeq*/0, /*bySeq*/0));
+ queue_op::set, /*revSeq*/0, /*bySeq*/0));
EXPECT_TRUE(manager->queueDirty(vbucket, qi, GenerateBySeqno::Yes, GenerateCas::Yes));
}
EXPECT_EQ(1, manager->getNumCheckpoints());
// Check de-dupe counting - after adding another item with the same key,
// should still see two items.
queued_item qi(new Item("key1", vbucket->getId(),
- queue_op_set, /*revSeq*/0, /*bySeq*/0));
+ queue_op::set, /*revSeq*/0, /*bySeq*/0));
EXPECT_FALSE(manager->queueDirty(vbucket, qi, GenerateBySeqno::Yes, GenerateCas::Yes))
<< "Adding a duplicate key to open checkpoint should not increase queue size";
// but cannot de-dupe across checkpoints.
for (auto ii : {1,2}) {
queued_item qi(new Item("key" + std::to_string(ii), vbucket->getId(),
- queue_op_set, /*revSeq*/1, /*bySeq*/0));
+ queue_op::set, /*revSeq*/1, /*bySeq*/0));
EXPECT_TRUE(manager->queueDirty(vbucket, qi, GenerateBySeqno::Yes, GenerateCas::Yes));
}
queued_item qi;
for (unsigned int ii = 0; ii < 2 * MIN_CHECKPOINT_ITEMS; ii++) {
qi.reset(new Item("key" + std::to_string(ii), vbucket->getId(),
- queue_op_set, /*revSeq*/0, /*bySeq*/0));
+ queue_op::set, /*revSeq*/0, /*bySeq*/0));
EXPECT_TRUE(manager->queueDirty(vbucket, qi, GenerateBySeqno::Yes, GenerateCas::Yes));
}
queued_item qi;
for (unsigned int ii = 0; ii < MIN_CHECKPOINT_ITEMS; ii++) {
qi.reset(new Item("key" + std::to_string(ii), vbucket->getId(),
- queue_op_set, /*revSeq*/0, /*bySeq*/0));
+ queue_op::set, /*revSeq*/0, /*bySeq*/0));
EXPECT_TRUE(manager->queueDirty(vbucket, qi, GenerateBySeqno::Yes, GenerateCas::Yes));
}
checkpoint. TAP unlike DCP cannot skip the op_ckpt_end message */
bool isLastItem = false;
qi = manager->nextItem(tap_cursor, isLastItem);
- EXPECT_EQ(queue_op_checkpoint_end, qi->getOperation());
+ EXPECT_EQ(queue_op::checkpoint_end, qi->getOperation());
EXPECT_EQ(true, isLastItem);
}
std::string key = "key" + std::to_string(ii);
for (int item = 0; item < n_items; item++) {
queued_item qi(new Item(key + std::to_string(item),
- vbucket->getId(), queue_op_set,
+ vbucket->getId(), queue_op::set,
/*revSeq*/0, /*bySeq*/0));
EXPECT_TRUE(manager->queueDirty(vbucket,
qi,