std::list<queued_item>::iterator currPos = it->second.position;
uint64_t currMutationId = it->second.mutation_id;
+ // Given the key already exists, need to check all cursors in this
+ // Checkpoint and see if the existing item for this key is to
+ // the "left" of the cursor (i.e. has already been processed) - in
+ // which case we need to adjust the cursor's offset to ensure that
+ // we correctly account for the updated item which will need to be
+ // iterated over.
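+ // (e.g. if a cursor has already consumed the old item for this key,
+ // its remaining-item count must effectively grow by one so that it
+ // also picks up the re-queued value - illustrative summary only.)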
for (auto& cursor : checkpointManager->connCursors) {
if (*(cursor.second.currentCheckpoint) == this) {
- queued_item &tqi = *(cursor.second.currentPos);
- const std::string &key = tqi->getKey();
- checkpoint_index::iterator ita = keyIndex.find(key);
- if (ita != keyIndex.end() && (!tqi->isCheckPointMetaItem()))
- {
- uint64_t mutationId = ita->second.mutation_id;
- if (currMutationId <= mutationId) {
- cursor.second.decrOffset(1);
- if (cursor.second.name.compare(CheckpointManager::pCursorName)
- == 0) {
- rv = PERSIST_AGAIN;
- }
+
+ queued_item& cursor_item = *(cursor.second.currentPos);
+
+ auto& index =
+ cursor_item->isCheckPointMetaItem() ? metaKeyIndex
+ : keyIndex;
+
+ auto cursor_item_idx = index.find(cursor_item->getKey());
+ if (cursor_item_idx == index.end()) {
+ throw std::logic_error("Checkpoint::queueDirty: Unable "
+ "to find key with"
+ " op:" + to_string(cursor_item->getOperation()) +
+ " seqno:" + std::to_string(cursor_item->getBySeqno()) +
+ "for cursor:" + cursor.first + " in current checkpoint.");
+ }
+
+ // If the cursor item is non-meta, then we need to decrement
+ // offset if existing item is either before or on the cursor
+ // - as the cursor points to the "last processed" item.
+ // However if the cursor item is meta, then we only
+ // decrement if the existing item is strictly less than
+ // the cursor, as meta-items can share a seqno with
+ // a non-meta item but are logically before them.
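+ // (Illustrative, hypothetical seqnos: cursor on a set_vbucket_state
+ // meta-item at seqno 2, with the existing SET for this key also at
+ // seqno 2 - the SET is logically *after* the cursor and unprocessed,
+ // so no offset adjustment is wanted; decrementing cursor_mutation_id
+ // to 1 lets the single '<=' comparison below give that result.)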
+ uint64_t cursor_mutation_id = cursor_item_idx->second.mutation_id;
+ if (cursor_item->isCheckPointMetaItem()) {
+ --cursor_mutation_id;
+ }
+ if (currMutationId <= cursor_mutation_id) {
+ // Cursor has already processed the previous value for
+ // this key - need to logically move the cursor
+ // backwards one so it will pick up the new value for
+ // this key.
+ cursor.second.decrOffset(1);
+ if (cursor.second.name == CheckpointManager::pCursorName) {
+ rv = PERSIST_AGAIN;
}
}
}
}
- // Notify flusher if in case queued item is a checkpoint meta item
+ // Notify flusher if the queued item is a checkpoint meta item or
+ // set_vbucket_state.
if (qi->getOperation() == queue_op::checkpoint_start ||
- qi->getOperation() == queue_op::checkpoint_end) {
+ qi->getOperation() == queue_op::checkpoint_end ||
+ qi->getOperation() == queue_op::set_vbucket_state) {
checkpointManager->notifyFlusher();
}
size_t Checkpoint::mergePrevCheckpoint(Checkpoint *pPrevCheckpoint) {
size_t numNewItems = 0;
size_t newEntryMemOverhead = 0;
- std::list<queued_item>::reverse_iterator rit = pPrevCheckpoint->rbegin();
LOG(EXTENSION_LOG_INFO,
"Collapse the checkpoint %" PRIu64 " into the checkpoint %" PRIu64
++itr;
(*itr)->setBySeqno(seqno);
- for (; rit != pPrevCheckpoint->rend(); ++rit) {
+ // Iterate in reverse over the previous checkpoint's items, inserting them
+ // into the current checkpoint as necessary.
+ for (auto rit = pPrevCheckpoint->rbegin(); rit != pPrevCheckpoint->rend();
+ ++rit) {
const std::string &key = (*rit)->getKey();
- if ((*rit)->isCheckPointMetaItem()) {
- continue;
- }
- checkpoint_index::iterator it = keyIndex.find(key);
- if (it == keyIndex.end()) {
- std::list<queued_item>::iterator pos = toWrite.begin();
- // Skip the first two meta items
- ++pos; ++pos;
- toWrite.insert(pos, *rit);
- index_entry entry = {--pos, static_cast<int64_t>(pPrevCheckpoint->
- getMutationIdForKey(key, false))};
- keyIndex[key] = entry;
- newEntryMemOverhead += key.size() + sizeof(index_entry);
- ++numItems;
- ++numNewItems;
+ switch ((*rit)->getOperation()) {
+ case queue_op::set:
+ case queue_op::del:
+ // For the two 'normal' operations, re-insert into the current
+ // checkpoint if the key isn't already present (if it is already
+ // present then it must be an older revision and hence we can
+ // safely discard it).
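+ // (Illustrative: if the previous checkpoint holds SET(k) at seqno 3
+ // and this checkpoint already contains SET(k) at seqno 5, the older
+ // seqno 3 revision is simply dropped - hypothetical seqnos.)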
+ if (keyIndex.find(key) == keyIndex.end()) {
+ // Skip the first two meta items (empty & checkpoint start).
+ auto pos = std::next(toWrite.begin(), 2);
+ toWrite.insert(pos, *rit);
+ index_entry entry = {--pos, static_cast<int64_t>(pPrevCheckpoint->
+ getMutationIdForKey(key, false))};
+ keyIndex[key] = entry;
+ newEntryMemOverhead += key.size() + sizeof(index_entry);
+ ++numItems;
+ ++numNewItems;
+
+ // Update new checkpoint's memory usage
+ incrementMemConsumption((*rit)->size());
+ }
+ break;
+
+ case queue_op::flush:
+ // Should never expect to see any `flush` items actually queued.
+ throw std::logic_error("Checkpoint::mergePrevCheckpoint: "
+ "Unexpected flush item in checkpoint");
+ break;
+
+ case queue_op::empty:
+ // Empty will be the first item in the checkpoint (and handled
+ // already above) - ignore.
+ break;
+
+ case queue_op::checkpoint_start:
+ // Similarly - handled in prologue of this method - ignore.
+ break;
+
+ case queue_op::checkpoint_end:
+ // Can also ignore checkpoint_end.
+ break;
- // Update new checkpoint's memory usage
- incrementMemConsumption((*rit)->size());
+ case queue_op::set_vbucket_state:
+ // Need to re-insert these into the correct place in the index.
+ if (metaKeyIndex.find(key) == metaKeyIndex.end()) {
+ // Skip the first two meta items (empty & checkpoint start).
+ auto pos = std::next(toWrite.begin(), 2);
+ toWrite.insert(pos, *rit);
+ auto mutationId = static_cast<int64_t>(
+ pPrevCheckpoint->getMutationIdForKey(key, true));
+ metaKeyIndex[key] = {--pos, mutationId};
+ newEntryMemOverhead += key.size() + sizeof(index_entry);
+ ++numMetaItems;
+ ++numNewItems;
+
+ // Update new checkpoint's memory usage
+ incrementMemConsumption((*rit)->size());
+ }
+ break;
}
}
void CheckpointManager::setOpenCheckpointId_UNLOCKED(uint64_t id) {
if (!checkpointList.empty()) {
// Update the checkpoint_start item with the new Id.
- std::list<queued_item>::iterator it =
- ++(checkpointList.back()->begin());
- (*it)->setRevSeqno(id);
+ const auto ckpt_start = ++(checkpointList.back()->begin());
+ (*ckpt_start)->setRevSeqno(id);
if (checkpointList.back()->getId() == 0) {
- (*it)->setBySeqno(lastBySeqno + 1);
+ (*ckpt_start)->setBySeqno(lastBySeqno + 1);
checkpointList.back()->setSnapshotStartSeqno(lastBySeqno);
checkpointList.back()->setSnapshotEndSeqno(lastBySeqno);
}
+ // Update any set_vbstate items to have the same seqno as the
+ // checkpoint_start.
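+ // (e.g. if the checkpoint_start has just been assigned seqno N, any
+ // set_vbucket_state item in this checkpoint is moved to seqno N as
+ // well - hypothetical value.)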
+ const auto ckpt_start_seqno = (*ckpt_start)->getBySeqno();
+ for (auto item = std::next(ckpt_start);
+ item != checkpointList.back()->end();
+ item++) {
+ if ((*item)->getOperation() == queue_op::set_vbucket_state) {
+ (*item)->setBySeqno(ckpt_start_seqno);
+ }
+ }
+
checkpointList.back()->setId(id);
LOG(EXTENSION_LOG_INFO, "Set the current open checkpoint id to %" PRIu64
" for vbucket %d, bySeqno is %" PRId64 ", max is %" PRId64,
- id, vbucketId, (*it)->getBySeqno(), lastBySeqno);
+ id, vbucketId, (*ckpt_start)->getBySeqno(), lastBySeqno);
}
}
return cursorsToDrop;
}
+void CheckpointManager::updateStatsForNewQueuedItem_UNLOCKED(const LockHolder&,
+ VBucket& vb,
+ const queued_item& qi) {
+ ++stats.totalEnqueued;
+ ++stats.diskQueueSize;
+ vb.doStatsForQueueing(*qi, qi->size());
+ // Update the checkpoint's memory usage
+ checkpointList.back()->incrementMemConsumption(qi->size());
+}
+
bool CheckpointManager::queueDirty(VBucket& vb, queued_item& qi,
const GenerateBySeqno generateBySeqno,
const GenerateCas generateCas) {
}
if (result != EXISTING_ITEM) {
- ++stats.totalEnqueued;
- ++stats.diskQueueSize;
- vb.doStatsForQueueing(*qi, qi->size());
-
- // Update the checkpoint's memory usage
- checkpointList.back()->incrementMemConsumption(qi->size());
+ updateStatsForNewQueuedItem_UNLOCKED(lh, vb, qi);
}
return result != EXISTING_ITEM;
}
+void CheckpointManager::queueSetVBState(VBucket& vb) {
+ // Take lock to serialize use of {lastBySeqno} and to queue op.
+ LockHolder lh(queueLock);
+
+ // Create the setVBState operation, and enqueue it.
+ queued_item item = createCheckpointItem(/*id*/0, vbucketId,
+ queue_op::set_vbucket_state);
+
+ auto result = checkpointList.back()->queueDirty(item, this);
+
+ if (result == NEW_ITEM) {
+ ++numItems;
+ updateStatsForNewQueuedItem_UNLOCKED(lh, vb, item);
+ } else {
+ throw std::logic_error("CheckpointManager::queueSetVBState: "
+ "expected: NEW_ITEM, got:" + std::to_string(result) +
+ "after queueDirty. vbid:" + std::to_string(vbucketId));
+ }
+}
+
snapshot_range_t CheckpointManager::getAllItemsForCursor(
const std::string& name,
std::vector<queued_item> &items) {
}
queued_item CheckpointManager::createCheckpointItem(uint64_t id, uint16_t vbid,
- queue_op checkpoint_op) {
+ queue_op checkpoint_op) {
uint64_t bySeqno;
std::string key;
const GenerateBySeqno generateBySeqno,
const GenerateCas generateCas);
+ /*
+ * Queue writing of the VBucket's state to the persistence layer.
+ * @param vb the vbucket whose state is to be persisted.
+ */
+ void queueSetVBState(VBucket& vb);
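+ // A typical caller (see EPStore::scheduleVBStatePersist) does:
+ //     RCPtr<VBucket> vb = getVBucket(vbid);
+ //     vb->checkpointManager.queueSetVBState(*vb);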
+
/**
* Return the next item to be sent to a given connection
* @param name the name of a given connection
static const std::string pCursorName;
+protected:
+
+ // Helper method for queueing methods - update the global and per-VBucket
+ // stats after queueing a new item to a checkpoint.
+ // Must be called with queueLock held (LockHolder passed in as argument to
+ // 'prove' this).
+ void updateStatsForNewQueuedItem_UNLOCKED(const LockHolder&,
+ VBucket& vb, const queued_item& qi);
+
private:
// Pair of {sequence number, cursor at checkpoint start} used when
return cachedVBStates[vbucketId];
}
-bool CouchKVStore::setVBucketState(uint16_t vbucketId, vbucket_state &vbstate,
+bool CouchKVStore::setVBucketState(uint16_t vbucketId,
+ const vbucket_state &vbstate,
Callback<kvstats_ctx> *kvcb, bool reset) {
Db *db = NULL;
uint64_t fileRev, newFileRev;
return couchErr2EngineErr(errCode);
}
-couchstore_error_t CouchKVStore::saveVBState(Db *db, vbucket_state &vbState) {
+couchstore_error_t CouchKVStore::saveVBState(Db *db,
+ const vbucket_state &vbState) {
std::stringstream jsonState;
jsonState << "{\"state\": \"" << VBucket::toString(vbState.state) << "\""
void destroyScanContext(ScanContext* ctx);
protected:
- bool setVBucketState(uint16_t vbucketId, vbucket_state &vbstate,
+ bool setVBucketState(uint16_t vbucketId, const vbucket_state &vbstate,
Callback<kvstats_ctx> *cb, bool reset=false);
template <typename T>
void commitCallback(std::vector<CouchRequest *> &committedReqs,
kvstats_ctx &kvctx,
couchstore_error_t errCode);
- couchstore_error_t saveVBState(Db *db, vbucket_state &vbState);
+ couchstore_error_t saveVBState(Db *db, const vbucket_state &vbState);
void setDocsCommitted(uint16_t docs);
void closeDatabaseHandle(Db *db);
RCPtr<VBucket> vb = engine_.getVBucket(vbucket);
vb->failovers->replaceFailoverLog(body, bodylen);
EventuallyPersistentStore* st = engine_.getEpStore();
- st->scheduleVBSnapshot(VBSnapshotTask::Priority::HIGH,
- st->getVBuckets().getShardByVbId(vbucket)->getId());
+ st->scheduleVBStatePersist(vbucket);
}
LOG(EXTENSION_LOG_INFO, "%s (vb %d) Add stream for opaque %" PRIu32
" %s with error code %d", logHeader(), vbucket, opaque,
checkpointRemoverInterval);
ExecutorPool::get()->schedule(chkTask, NONIO_TASK_IDX);
- ExTask vbSnapshotTask = new DaemonVBSnapshotTask(&engine);
- ExecutorPool::get()->schedule(vbSnapshotTask, WRITER_TASK_IDX);
-
ExTask workloadMonitorTask = new WorkLoadMonitor(&engine, false);
ExecutorPool::get()->schedule(workloadMonitorTask, NONIO_TASK_IDX);
}
if (!success) {
- scheduleVBSnapshot(prio, shard->getId());
+ scheduleVBSnapshot(prio);
} else {
stats.snapshotVbucketHisto.add((gethrtime() - start) / 1000);
}
ExTask notifyTask = new PendingOpsNotification(engine, vb);
ExecutorPool::get()->schedule(notifyTask, NONIO_TASK_IDX);
}
- scheduleVBStatePersist(VBStatePersistTask::Priority::LOW, vbid);
+ scheduleVBStatePersist(vbid);
} else if (vbid < vbMap.getSize()) {
FailoverTable* ft = new FailoverTable(engine.getMaxFailoverEntries());
KVShard* shard = vbMap.getShardByVbId(vbid);
vbMap.setPersistenceSeqno(vbid, 0);
vbMap.setBucketCreation(vbid, true);
lh.unlock();
- scheduleVBStatePersist(VBStatePersistTask::Priority::HIGH, vbid);
+ scheduleVBStatePersist(vbid);
} else {
return ENGINE_ERANGE;
}
return true;
}
-void EventuallyPersistentStore::scheduleVBSnapshot(VBSnapshotTask::Priority prio,
- uint16_t shardId,
- bool force) {
- KVShard *shard = vbMap.shards[shardId];
- if (prio == VBSnapshotTask::Priority::HIGH) {
- if (force || shard->setHighPriorityVbSnapshotFlag(true)) {
- ExTask task = new VBSnapshotTaskHigh(&engine, shardId, true);
- ExecutorPool::get()->schedule(task, WRITER_TASK_IDX);
- }
- } else {
- if (force || shard->setLowPriorityVbSnapshotFlag(true)) {
- ExTask task = new VBSnapshotTaskLow(&engine, shardId, true);
- ExecutorPool::get()->schedule(task, WRITER_TASK_IDX);
- }
+void EventuallyPersistentStore::scheduleVBStatePersist() {
+ for (auto vbid : vbMap.getBuckets()) {
+ scheduleVBStatePersist(vbid);
}
}
-void EventuallyPersistentStore::scheduleVBStatePersist(VBStatePersistTask::Priority priority,
- uint16_t vbid) {
-
-
- bool inverse = false;
- if (schedule_vbstate_persist[vbid].compare_exchange_strong(inverse, true)) {
+void EventuallyPersistentStore::scheduleVBStatePersist(VBucket::id_type vbid) {
+ RCPtr<VBucket> vb = getVBucket(vbid);
- if (priority == VBStatePersistTask::Priority::HIGH) {
- ExecutorPool::get()->schedule(new VBStatePersistTaskHigh(&engine, vbid, true), WRITER_TASK_IDX);
- } else {
- ExecutorPool::get()->schedule(new VBStatePersistTaskLow(&engine, vbid, true), WRITER_TASK_IDX);
- }
+ if (!vb) {
+ LOG(EXTENSION_LOG_WARNING,
+ "EPStore::scheduleVBStatePersist: vb:%" PRIu16
+ " does not not exist. Unable to schedule persistence.", vbid);
+ return;
}
+
+ vb->checkpointManager.queueSetVBState(*vb);
}
bool EventuallyPersistentStore::completeVBucketDeletion(uint16_t vbid,
void EventuallyPersistentStore::flushOneDeleteAll() {
for (VBucketMap::id_type i = 0; i < vbMap.getSize(); ++i) {
RCPtr<VBucket> vb = getVBucket(i);
- if (vb) {
+ // Reset the vBucket if it's non-null and not already in the middle of
+ // being created / destroyed.
+ if (vb &&
+ !(vbMap.isBucketCreation(i) || vbMap.isBucketDeletion(i))) {
LockHolder lh(vb_mutexes[vb->getId()]);
getRWUnderlying(vb->getId())->reset(i);
}
}
}
- if (vbMap.isBucketCreation(vbid)) {
- return RETRY_FLUSH_VBUCKET;
- }
-
int items_flushed = 0;
- rel_time_t flush_start = ep_current_time();
+ const rel_time_t flush_start = ep_current_time();
RCPtr<VBucket> vb = vbMap.getBucket(vbid);
if (vb) {
rwUnderlying->optimizeWrites(items);
Item *prev = NULL;
+ auto vbstate = vb->getVBucketState();
uint64_t maxSeqno = 0;
- uint64_t maxCas = 0;
- uint64_t maxDeletedRevSeqno = 0;
+ range.start = std::max(range.start, vbstate.lastSnapStart);
+
+ bool mustCheckpointVBState = false;
std::list<PersistenceCallback*>& pcbs = rwUnderlying->getPersistenceCbList();
for (const auto& item : items) {
if (!item->shouldPersist()) {
continue;
+ }
+
+ if (item->getOperation() == queue_op::set_vbucket_state) {
+ // No actual item is explicitly persisted for this op (it exists
+ // to ensure a commit occurs with the current vbstate);
+ // flag that we must trigger a snapshot even if there are
+ // no 'real' items in the checkpoint.
+ mustCheckpointVBState = true;
+
+ // Update maxSeqno to ensure the snap {start,end} range
+ // is correct if no other normal item is included in this
+ // checkpoint.
+ maxSeqno = std::max(maxSeqno, (uint64_t)item->getBySeqno());
+
+ // Update queuing stats, as this item has now logically been
+ // processed.
+ stats.decrDiskQueueSize(1);
+ vb->doStatsForFlushing(*item, item->size());
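+ // (Illustrative: if this flush batch contains *only* a
+ // set_vbucket_state item then items_flushed stays 0, and the
+ // 'persist' flag computed below forces snapshotVBucket() to
+ // still write the vbucket state.)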
+
} else if (!prev || prev->getKey() != item->getKey()) {
prev = item.get();
++items_flushed;
}
maxSeqno = std::max(maxSeqno, (uint64_t)item->getBySeqno());
- maxCas = std::max(maxCas, item->getCas());
+ vbstate.maxCas = std::max(vbstate.maxCas, item->getCas());
if (item->isDeleted()) {
- maxDeletedRevSeqno = std::max(maxDeletedRevSeqno,
- item->getRevSeqno());
+ vbstate.maxDeletedSeqno =
+ std::max(vbstate.maxDeletedSeqno,
+ item->getRevSeqno());
}
++stats.flusher_todo;
+
} else {
- // Item is the same key as the previous one - don't need
+ // Item is the same key as the previous[1] one - don't need
// to flush to disk.
+ // [1] Previous here really means 'next' - optimizeWrites()
+ // above has actually re-ordered items such that items
+ // with the same key are ordered from high->low seqno.
+ // This means we only write the highest (i.e. newest)
+ // item for a given key, and discard any duplicate,
+ // older items.
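+ // (Illustrative: after optimizeWrites() the batch may contain
+ // SET(k) at seqno 7 followed by SET(k) at seqno 3; only seqno 7 is
+ // written and the seqno 3 duplicate takes this branch - hypothetical
+ // seqnos.)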
stats.decrDiskQueueSize(1);
vb->doStatsForFlushing(*item, item->size());
}
}
}
- // Get current vbstate, then update based on the changes we
- // have just made.
- auto vbstate = vb->getVBucketState();
+ // Update VBstate based on the changes we have just made,
+ // then tell the rwUnderlying the 'new' state
+ // (which will be persisted as part of the commit() below).
vbstate.lastSnapStart = range.start;
vbstate.lastSnapEnd = range.end;
- vbstate.maxCas = maxCas;
- vbstate.maxDeletedSeqno = maxDeletedRevSeqno;
+ // Do we need to trigger a persist of the state?
+ // If there are no "real" items to flush, and we encountered
+ // a set_vbucket_state meta-item.
+ const bool persist = (items_flushed == 0) && mustCheckpointVBState;
+
+ KVStatsCallback kvcb(this);
if (rwUnderlying->snapshotVBucket(vb->getId(), vbstate,
- NULL, false) != true) {
+ &kvcb, persist) != true) {
return RETRY_FLUSH_VBUCKET;
}
+
+ if (vbMap.setBucketCreation(vbid, false)) {
+ LOG(EXTENSION_LOG_INFO, "VBucket %" PRIu16 " created", vbid);
+ }
}
- //commit all mutations to disk if the commit interval is zero
- if (decrCommitInterval(shard->getId()) == 0) {
+ // Commit all mutations to disk if there is a non-zero number
+ // of items to flush, and the commit interval is zero.
+ if ((items_flushed > 0) &&
+ (decrCommitInterval(shard->getId()) == 0)) {
+
commit(shard->getId());
+
+ // Now the commit is complete, vBucket file must exist.
+ if (vbMap.setBucketCreation(vbid, false)) {
+ LOG(EXTENSION_LOG_INFO, "VBucket %" PRIu16 " created", vbid);
+ }
}
hrtime_t end = gethrtime();
stats.cumulativeFlushTime.fetch_add(ep_current_time()
- flush_start);
stats.flusher_todo.store(0);
+ stats.totalPersistVBState++;
+
if (vb->rejectQueue.empty()) {
vb->setPersistedSnapshot(range.start, range.end);
uint64_t highSeqno = rwUnderlying->getLastPersistedSeqno(vbid);
stats.dirtyAgeHighWat.store(std::max(stats.dirtyAge.load(),
stats.dirtyAgeHighWat.load()));
- // Wait until the vbucket database is created by the vbucket state
- // snapshot task.
- if (vbMap.isBucketCreation(qi->getVBucketId()) ||
- vbMap.isBucketDeletion(qi->getVBucketId())) {
- vb->rejectQueue.push(qi);
- ++vb->opsReject;
- return NULL;
- }
-
KVStore *rwUnderlying = getRWUnderlying(qi->getVBucketId());
if (!deleted) {
// TODO: Need to separate disk_insert from disk_update because
}
void EventuallyPersistentStore::warmupCompleted() {
- // Run the vbucket state snapshot job once after the warmup
- scheduleVBSnapshot(VBSnapshotTask::Priority::HIGH);
+ // Snapshot VBucket state after warmup to ensure Failover table is
+ // persisted.
+ scheduleVBStatePersist();
if (engine.getConfiguration().getAlogPath().length() > 0) {
}
void EventuallyPersistentStore::runVbStatePersistTask(int vbid) {
- scheduleVBStatePersist(VBStatePersistTask::Priority::LOW, vbid);
+ scheduleVBStatePersist(vbid);
}
void EventuallyPersistentStore::setCursorDroppingLowerUpperThresholds(
bool scheduleVBSnapshot(VBSnapshotTask::Priority prio);
/**
- * schedule a vb_state snapshot task for a given shard.
+ * Schedule a vbstate persistence operation for all vbuckets.
*/
- void scheduleVBSnapshot(VBSnapshotTask::Priority prio, uint16_t shardId,
- bool force = false);
+ void scheduleVBStatePersist();
/**
- * Schedule a vbstate persistence task for a given vbucket.
+ * Schedule a vbstate persistence operation for a given vbucket.
*/
- void scheduleVBStatePersist(VBStatePersistTask::Priority prio,
- uint16_t vbid);
+ void scheduleVBStatePersist(VBucket::id_type vbid);
/**
* Persist a vbucket's state.
epstats.snapshotVbucketHisto.total(), add_stat, cookie);
add_casted_stat("ep_persist_vbstate_total",
- epstats.persistVBStateHisto.total(), add_stat, cookie);
+ epstats.totalPersistVBState, add_stat, cookie);
add_casted_stat("ep_vb_total",
activeCountVisitor.getVBucketNumber() +
Item(const Item& other, bool copyKeyOnly = false) :
metaData(other.metaData),
key(other.key),
- bySeqno(other.bySeqno),
+ bySeqno(other.bySeqno.load()),
queuedTime(other.queuedTime),
vbucketId(other.vbucketId),
op(other.op),
}
int64_t getBySeqno() const {
- return bySeqno;
+ return bySeqno.load();
}
void setBySeqno(int64_t to) {
- bySeqno = to;
+ bySeqno.store(to);
}
int getNKey() const {
ItemMetaData metaData;
value_t value;
std::string key;
- int64_t bySeqno;
+
+ // bySeqno is atomic because it (rarely) needs to be changed after
+ // the item has been added to a Checkpoint - for meta-items in
+ // checkpoints when updating the open checkpointID - see
+ // CheckpointManager::setOpenCheckpointId_UNLOCKED
+ std::atomic<int64_t> bySeqno;
uint32_t queuedTime;
uint16_t vbucketId;
queue_op op;
void setBucket(const RCPtr<VBucket> &b);
void resetBucket(VBucket::id_type id);
- KVShard::id_type getId() { return shardId; }
+ KVShard::id_type getId() const { return shardId; }
std::vector<VBucket::id_type> getVBucketsSortedByState();
std::vector<VBucket::id_type> getVBuckets();
size_t getMaxNumVbuckets() { return maxVbuckets; }
tooYoung(0),
tooOld(0),
totalPersisted(0),
+ totalPersistVBState(0),
totalEnqueued(0),
flushFailed(0),
flushExpired(0),
AtomicValue<size_t> tooOld;
//! Number of items persisted.
AtomicValue<size_t> totalPersisted;
+ //! Number of times VBucket state persisted.
+ AtomicValue<size_t> totalPersistVBState;
//! Cumulative number of items added to the queue.
AtomicValue<size_t> totalEnqueued;
//! Number of times an item flush failed.
void reset() {
tooYoung.store(0);
tooOld.store(0);
+ totalPersistVBState.store(0);
dirtyAge.store(0);
dirtyAgeHighWat.store(0);
commit_time.store(0);
+ cursorsDropped.store(0);
pagerRuns.store(0);
itemsRemovedFromCheckpoints.store(0);
numValueEjects.store(0);
-void VBucket::doStatsForQueueing(Item& qi, size_t itemBytes)
+void VBucket::doStatsForQueueing(const Item& qi, size_t itemBytes)
{
++dirtyQueueSize;
dirtyQueueMem.fetch_add(sizeof(Item));
return true;
}
- void doStatsForQueueing(Item& item, size_t itemBytes);
+ void doStatsForQueueing(const Item& item, size_t itemBytes);
void doStatsForFlushing(Item& item, size_t itemBytes);
void incrMetaDataDisk(Item& qi);
void decrMetaDataDisk(Item& qi);
void setHLCDriftAheadThreshold(uint64_t threshold);
void setHLCDriftBehindThreshold(uint64_t threshold);
+ // Returns the current state of the given vBucket.
+ vbucket_state getVBucketState(VBucket::id_type id) const;
+
private:
std::vector<KVShard*> shards;
// Give incr time to finish (it's doing another background fetch)
wait_for_stat_change(h, h1, "ep_bg_fetched", 1);
- wait_for_stat_change(h, h1, "ep_total_enqueued", 1);
+ wait_for_stat_to_be(h, h1, "ep_total_enqueued", initial_enqueued + 2);
// The incr mutated the value.
check_key_value(h, h1, "k1", "14", 2);
static enum test_result test_flush_stats(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1) {
item *i = NULL;
- int overhead = get_int_stat(h, h1, "ep_overhead");
int cacheSize = get_int_stat(h, h1, "ep_total_cache_size");
int nonResident = get_int_stat(h, h1, "ep_num_non_resident");
wait_for_flusher_to_settle(h, h1);
- int overhead2 = get_int_stat(h, h1, "ep_overhead");
int cacheSize2 = get_int_stat(h, h1, "ep_total_cache_size");
int nonResident2 = get_int_stat(h, h1, "ep_num_non_resident");
- cb_assert(overhead2 == overhead);
cb_assert(nonResident2 == nonResident);
cb_assert(cacheSize2 == cacheSize);
h1->release(h, NULL, itm);
}
}
- wait_for_stat_to_be(h, h1, "vb_0:num_checkpoint_items", 4501, "checkpoint");
+ // 4500 keys + 1x checkpoint_start + 1x set_vbucket_state.
+ wait_for_stat_to_be(h, h1, "vb_0:num_checkpoint_items", 4502, "checkpoint");
return SUCCESS;
}
int items = get_int_stat(h, h1, "curr_items_tot");
int seqno = get_int_stat(h, h1, "vb_0:high_seqno", "vbucket-seqno");
- int chk = get_int_stat(h, h1, "vb_0:num_checkpoint_items", "checkpoint");
checkeq(40, items, "Got invalid amount of items");
checkeq(40, seqno, "Seqno should be 40 after rollback");
- checkeq(1, chk, "There should only be one checkpoint item");
checkeq(num_items/2, get_int_stat(h, h1, "vb_replica_rollback_item_count"),
"Replica rollback count does not match");
checkeq(num_items/2, get_int_stat(h, h1, "rollback_item_count"),
{
char const *key = "k1";
- // wait until the vb snapshot has run
- wait_for_stat_change(h, h1, "ep_vb_snapshot_total", 0);
// check the stat
int temp = get_int_stat(h, h1, "ep_num_ops_get_meta");
check(temp == 0, "Expect zero getMeta ops");
checkeq(0, get_int_stat(h, h1, "ep_num_ops_del_meta"),
"Expect zero setMeta ops");
- // wait until the vb snapshot has run
- wait_for_stat_change(h, h1, "ep_vb_snapshot_total", 0);
-
// get metadata of nonexistent key
check(!get_meta(h, h1, key), "Expected get meta to return false");
checkeq(PROTOCOL_BINARY_RESPONSE_KEY_ENOENT, last_status.load(),
// check the stat
checkeq(0, get_int_stat(h, h1, "ep_num_ops_set_meta"), "Expect zero ops");
- // wait until the vb snapshot has run
- wait_for_stat_change(h, h1, "ep_vb_snapshot_total", 0);
// get metadata for the key
check(!get_meta(h, h1, key), "Expected get meta to return false");
checkeq(PROTOCOL_BINARY_RESPONSE_KEY_ENOENT, last_status.load(), "Expected enoent");
previousCas = items[ii]->getCas();
}
}
+
+// Test cursor is correctly updated when enqueuing a key which already exists
+// in the checkpoint (and needs de-duping), where the cursor points at a
+// meta-item at the head of the checkpoint:
+//
+// Before:
+// Checkpoint [ 0:EMPTY(), 1:CKPT_START(), 1:SET(key), 2:SET_VBSTATE() ]
+//                                                      ^
+//                                                   Cursor
+//
+// After:
+// Checkpoint [ 0:EMPTY(), 1:CKPT_START(), 2:SET_VBSTATE(), 2:SET(key) ]
+//                                          ^
+//                                       Cursor
+//
+TEST_F(CheckpointTest, CursorUpdateForExistingItemWithMetaItemAtHead) {
+ // Setup the checkpoint and cursor.
+ ASSERT_EQ(1, manager->getNumItems());
+ ASSERT_TRUE(queueNewItem("key"));
+ ASSERT_EQ(2, manager->getNumItems());
+ manager->queueSetVBState(*vbucket.get());
+
+ ASSERT_EQ(3, manager->getNumItems());
+
+ // Advance persistence cursor so all items have been consumed.
+ std::vector<queued_item> items;
+ manager->getAllItemsForCursor(CheckpointManager::pCursorName, items);
+ ASSERT_EQ(3, items.size());
+ ASSERT_EQ(0, manager->getNumItemsForCursor(CheckpointManager::pCursorName));
+
+ // Queue an item with a duplicate key.
+ queueNewItem("key");
+
+ // Test: Should have one item for cursor (the one we just added).
+ EXPECT_EQ(1, manager->getNumItemsForCursor(CheckpointManager::pCursorName));
+
+ // Should have another item to read (new version of 'key')
+ items.clear();
+ manager->getAllItemsForCursor(CheckpointManager::pCursorName, items);
+ EXPECT_EQ(1, items.size());
+}
+
+// Test cursor is correctly updated when enqueuing a key which already exists
+// in the checkpoint (and needs de-duping), where the cursor points at a
+// meta-item *not* at the head of the checkpoint:
+//
+// Before:
+// Checkpoint [ 0:EMPTY(), 1:CKPT_START(), 1:SET_VBSTATE(), 1:SET(key) ]
+//                                          ^
+//                                       Cursor
+//
+// After:
+// Checkpoint [ 0:EMPTY(), 1:CKPT_START(), 1:SET_VBSTATE(), 2:SET(key) ]
+//                                          ^
+//                                       Cursor
+//
+TEST_F(CheckpointTest, CursorUpdateForExistingItemWithNonMetaItemAtHead) {
+ // Setup the checkpoint and cursor.
+ ASSERT_EQ(1, manager->getNumItems());
+ manager->queueSetVBState(*vbucket.get());
+ ASSERT_EQ(2, manager->getNumItems());
+
+ // Advance persistence cursor so all items have been consumed.
+ std::vector<queued_item> items;
+ manager->getAllItemsForCursor(CheckpointManager::pCursorName, items);
+ ASSERT_EQ(2, items.size());
+ ASSERT_EQ(0, manager->getNumItemsForCursor(CheckpointManager::pCursorName));
+
+ // Queue a set (cursor will now be one behind).
+ ASSERT_TRUE(queueNewItem("key"));
+ ASSERT_EQ(1, manager->getNumItemsForCursor(CheckpointManager::pCursorName));
+
+ // Test: queue an item with a duplicate key.
+ queueNewItem("key");
+
+ // Test: Should have one item for cursor (the one we just added).
+ EXPECT_EQ(1, manager->getNumItemsForCursor(CheckpointManager::pCursorName));
+
+ // Should have an item to read (new version of 'key')
+ items.clear();
+ manager->getAllItemsForCursor(CheckpointManager::pCursorName, items);
+ EXPECT_EQ(1, items.size());
+ EXPECT_EQ(1002, items.at(0)->getBySeqno());
+ EXPECT_EQ("key", items.at(0)->getKey());
+}
EXPECT_EQ(ENGINE_SUCCESS, engine->initialize(config.c_str()))
<< "Failed to initialize engine.";
- engine->setVBucketState(vbid, vbucket_state_active, false);
-
// Wait for warmup to complete.
while (engine->getEpStore()->isWarmingUp()) {
usleep(10);
}
+
+ // Once warmup is complete, set VB to active.
+ engine->setVBucketState(vbid, vbucket_state_active, false);
}
void EventuallyPersistentEngineTest::TearDown() {
* On return the state will be changed and the task completed.
*/
void setVBucketStateAndRunPersistTask(uint16_t vbid, vbucket_state_t newState) {
- auto& lpWriterQ = *task_executor->getLpTaskQ()[WRITER_TASK_IDX];
-
- // Change state - this should add 1 VBStatePersistTask to the WRITER queue.
+ // Change state - this should add 1 set_vbucket_state op to the
+ // VBucket's persistence queue.
EXPECT_EQ(ENGINE_SUCCESS,
store->setVBucketState(vbid, newState, /*transfer*/false));
- runNextTask(lpWriterQ, "Persisting a vbucket state for vbucket: "
- + std::to_string(vbid));
+ // Trigger the flusher to flush state to disk.
+ EXPECT_EQ(0, store->flushVBucket(vbid));
}
/*
auto& lpNonioQ = *task_executor->getLpTaskQ()[NONIO_TASK_IDX];
// [[2]] Perform a vbucket reset. This will perform some work synchronously,
- // but also schedules 3 tasks:
+ // but also creates two tasks and notifies the flusher:
// 1. vbucket memory deletion (NONIO)
// 2. vbucket disk deletion (WRITER)
- // 3. VBStatePersistTask (WRITER)
+ // 3. FlusherTask notified (WRITER)
// MB-19695: If we try to get the number of persisted deletes between
- // tasks (2) and (3) running then an exception is thrown (and client
+ // steps (2) and (3) running then an exception is thrown (and client
// disconnected).
EXPECT_TRUE(store->resetVBucket(vbid));
EXPECT_NO_THROW(engine->public_doDcpVbTakeoverStats
(nullptr, dummy_cb, key, vbid));
- // Cleanup - run the 3rd task - VBStatePersistTask.
- runNextTask(lpWriterQ, "Persisting a vbucket state for vbucket: 0");
+ // Cleanup - run flusher.
+ EXPECT_EQ(0, store->flushVBucket(vbid));
}
/*