}
bool EventuallyPersistentStore::startFlusher() {
- for (uint16_t i = 0; i < vbMap.shards.size(); ++i) {
- Flusher *flusher = vbMap.shards[i]->getFlusher();
- flusher->start();
+ for (auto* shard : vbMap.shards) {
+ shard->getFlusher()->start();
}
return true;
}
vb->rejectQueue.pop();
}
- const std::string cursor(CheckpointManager::pCursorName);
+ // Append any 'backfill' items (mutations added by a TAP stream).
vb->getBackfillItems(items);
+ // Append all items outstanding for the persistence cursor.
snapshot_range_t range;
- range = vb->checkpointManager.getAllItemsForCursor(cursor, items);
+ range = vb->checkpointManager.getAllItemsForCursor(
+ CheckpointManager::pCursorName, items);
if (!items.empty()) {
while (!rwUnderlying->begin()) {
uint64_t maxCas = 0;
uint64_t maxDeletedRevSeqno = 0;
std::list<PersistenceCallback*>& pcbs = rwUnderlying->getPersistenceCbList();
- std::vector<queued_item>::iterator it = items.begin();
- for(; it != items.end(); ++it) {
+ for (const auto& item : items) {
- if (!(*it)->shouldPersist()) {
+ if (!item->shouldPersist()) {
continue;
- } else if (!prev || prev->getKey() != (*it)->getKey()) {
- prev = (*it).get();
+ } else if (!prev || prev->getKey() != item->getKey()) {
+ prev = item.get();
++items_flushed;
- PersistenceCallback *cb = flushOneDelOrSet(*it, vb);
+ PersistenceCallback *cb = flushOneDelOrSet(item, vb);
if (cb) {
pcbs.push_back(cb);
}
- maxSeqno = std::max(maxSeqno, (uint64_t)(*it)->getBySeqno());
- maxCas = std::max(maxCas, (uint64_t)(*it)->getCas());
- if ((*it)->isDeleted()) {
+ maxSeqno = std::max(maxSeqno, (uint64_t)item->getBySeqno());
+ maxCas = std::max(maxCas, item->getCas());
+ if (item->isDeleted()) {
maxDeletedRevSeqno = std::max(maxDeletedRevSeqno,
- (uint64_t)(*it)->getRevSeqno());
+ item->getRevSeqno());
}
++stats.flusher_todo;
} else {
+ // Item is the same key as the previous one - don't need
+ // to flush to disk.
stats.decrDiskQueueSize(1);
- vb->doStatsForFlushing(*(*it), (*it)->size());
+ vb->doStatsForFlushing(*item, item->size());
}
}