#include "atomic.h"
#include "bgfetcher.h"
+#include "conflict_resolution.h"
#include "ep_engine.h"
+#include "ep_types.h"
+#include "failover-table.h"
+#include "flusher.h"
+#include "pre_link_document_context.h"
+#include "vbucketdeletiontask.h"
#define STATWRITER_NAMESPACE vbucket
#include "statwriter.h"
#undef STATWRITER_NAMESPACE
+#include "stored_value_factories.h"
#include "vbucket.h"
+#include <xattr/blob.h>
+#include <xattr/utils.h>
+
+/* Macros */
+const size_t MIN_CHK_FLUSH_TIMEOUT = 10; // 10 sec.
+const size_t MAX_CHK_FLUSH_TIMEOUT = 30; // 30 sec.
+
+/* Statics definitions */
+std::atomic<size_t> VBucket::chkFlushTimeout(MIN_CHK_FLUSH_TIMEOUT);
+
VBucketFilter VBucketFilter::filter_diff(const VBucketFilter &other) const {
std::vector<uint16_t> tmp(acceptable.size() + other.size());
std::vector<uint16_t>::iterator end;
return out;
}
-std::atomic<size_t> VBucket::chkFlushTimeout(MIN_CHK_FLUSH_TIMEOUT);
-
const vbucket_state_t VBucket::ACTIVE =
static_cast<vbucket_state_t>(htonl(vbucket_state_active));
const vbucket_state_t VBucket::REPLICA =
const vbucket_state_t VBucket::DEAD =
static_cast<vbucket_state_t>(htonl(vbucket_state_dead));
+VBucket::VBucket(id_type i,
+ vbucket_state_t newState,
+ EPStats& st,
+ CheckpointConfig& chkConfig,
+ int64_t lastSeqno,
+ uint64_t lastSnapStart,
+ uint64_t lastSnapEnd,
+ std::unique_ptr<FailoverTable> table,
+ std::shared_ptr<Callback<id_type> > flusherCb,
+ std::unique_ptr<AbstractStoredValueFactory> valFact,
+ NewSeqnoCallback newSeqnoCb,
+ Configuration& config,
+ item_eviction_policy_t evictionPolicy,
+ vbucket_state_t initState,
+ uint64_t purgeSeqno,
+ uint64_t maxCas,
+ const std::string& collectionsManifest)
+ : ht(st, std::move(valFact)),
+ checkpointManager(st,
+ i,
+ chkConfig,
+ lastSeqno,
+ lastSnapStart,
+ lastSnapEnd,
+ flusherCb),
+ failovers(std::move(table)),
+ opsCreate(0),
+ opsUpdate(0),
+ opsDelete(0),
+ opsReject(0),
+ dirtyQueueSize(0),
+ dirtyQueueMem(0),
+ dirtyQueueFill(0),
+ dirtyQueueDrain(0),
+ dirtyQueueAge(0),
+ dirtyQueuePendingWrites(0),
+ metaDataDisk(0),
+ numExpiredItems(0),
+ eviction(evictionPolicy),
+ stats(st),
+ persistenceSeqno(0),
+ numHpVBReqs(0),
+ id(i),
+ state(newState),
+ initialState(initState),
+ purge_seqno(purgeSeqno),
+ takeover_backed_up(false),
+ persisted_snapshot_start(lastSnapStart),
+ persisted_snapshot_end(lastSnapEnd),
+ rollbackItemCount(0),
+ hlc(maxCas,
+ std::chrono::microseconds(config.getHlcDriftAheadThresholdUs()),
+ std::chrono::microseconds(config.getHlcDriftBehindThresholdUs())),
+ statPrefix("vb_" + std::to_string(i)),
+ persistenceCheckpointId(0),
+ bucketCreation(false),
+ deferredDeletion(false),
+ deferredDeletionCookie(nullptr),
+ newSeqnoCb(std::move(newSeqnoCb)),
+ manifest(collectionsManifest) {
+ if (config.getConflictResolutionType().compare("lww") == 0) {
+ conflictResolver.reset(new LastWriteWinsResolution());
+ } else {
+ conflictResolver.reset(new RevisionSeqnoResolution());
+ }
+
+ backfill.isBackfillPhase = false;
+ pendingOpsStart = 0;
+ stats.memOverhead->fetch_add(sizeof(VBucket)
+ + ht.memorySize() + sizeof(CheckpointManager));
+ LOG(EXTENSION_LOG_NOTICE,
+ "VBucket: created vbucket:%" PRIu16 " with state:%s "
+ "initialState:%s "
+ "lastSeqno:%" PRIu64 " "
+ "lastSnapshot:{%" PRIu64 ",%" PRIu64 "} "
+ "persisted_snapshot:{%" PRIu64 ",%" PRIu64 "} "
+ "max_cas:%" PRIu64,
+ id, VBucket::toString(state), VBucket::toString(initialState),
+ lastSeqno, lastSnapStart, lastSnapEnd,
+ persisted_snapshot_start, persisted_snapshot_end,
+ getMaxCas());
+}
+
VBucket::~VBucket() {
- if (!pendingOps.empty() || !pendingBGFetches.empty()) {
+ if (!pendingOps.empty()) {
LOG(EXTENSION_LOG_WARNING,
- "Have %ld pending ops and %ld pending reads "
- "while destroying vbucket\n",
- pendingOps.size(), pendingBGFetches.size());
+ "~Vbucket(): vbucket:%" PRIu16 " has %ld pending ops",
+ id,
+ pendingOps.size());
}
- stats.decrDiskQueueSize(dirtyQueueSize.load());
-
- delete failovers;
+ stats.diskQueueSize.fetch_sub(dirtyQueueSize.load());
+ stats.vbBackfillQueueSize.fetch_sub(getBackfillSize());
// Clear out the bloomfilter(s)
clearFilter();
void VBucket::fireAllOps(EventuallyPersistentEngine &engine,
ENGINE_ERROR_CODE code) {
- LockHolder lh(pendingOpLock);
+ std::unique_lock<std::mutex> lh(pendingOpLock);
if (pendingOpsStart > 0) {
hrtime_t now = gethrtime();
dirtyQueuePendingWrites.fetch_add(itemBytes);
}
-void VBucket::doStatsForFlushing(Item& qi, size_t itemBytes)
-{
- decrDirtyQueueSize(1);
+void VBucket::doStatsForFlushing(const Item& qi, size_t itemBytes) {
+ --dirtyQueueSize;
decrDirtyQueueMem(sizeof(Item));
++dirtyQueueDrain;
decrDirtyQueueAge(qi.getQueuedTime());
decrDirtyQueuePendingWrites(itemBytes);
}
-void VBucket::incrMetaDataDisk(Item& qi)
-{
- metaDataDisk.fetch_add(qi.getNKey() + sizeof(ItemMetaData));
+void VBucket::incrMetaDataDisk(const Item& qi) {
+ metaDataDisk.fetch_add(qi.getKey().size() + sizeof(ItemMetaData));
}
-void VBucket::decrMetaDataDisk(Item& qi)
-{
+void VBucket::decrMetaDataDisk(const Item& qi) {
// assume couchstore remove approx this much data from disk
- metaDataDisk.fetch_sub((qi.getNKey() + sizeof(ItemMetaData)));
+ metaDataDisk.fetch_sub((qi.getKey().size() + sizeof(ItemMetaData)));
}
void VBucket::resetStats() {
opsDelete.store(0);
opsReject.store(0);
- stats.decrDiskQueueSize(dirtyQueueSize.load());
- dirtyQueueSize.store(0);
+ stats.diskQueueSize.fetch_sub(dirtyQueueSize.exchange(0));
dirtyQueueMem.store(0);
dirtyQueueFill.store(0);
dirtyQueueAge.store(0);
dirtyQueuePendingWrites.store(0);
dirtyQueueDrain.store(0);
- fileSpaceUsed = 0;
- fileSize = 0;
hlc.resetStats();
}
}
}
-size_t VBucket::queueBGFetchItem(const const_sized_buffer key,
- VBucketBGFetchItem *fetch,
- BgFetcher *bgFetcher) {
- LockHolder lh(pendingBGFetchesLock);
- vb_bgfetch_item_ctx_t& bgfetch_itm_ctx =
- pendingBGFetches[std::string(key.data(), key.size())];
-
- if (bgfetch_itm_ctx.bgfetched_list.empty()) {
- bgfetch_itm_ctx.isMetaOnly = true;
- }
-
- bgfetch_itm_ctx.bgfetched_list.push_back(fetch);
-
- if (!fetch->metaDataOnly) {
- bgfetch_itm_ctx.isMetaOnly = false;
- }
- bgFetcher->addPendingVB(id);
- return pendingBGFetches.size();
-}
-
-bool VBucket::getBGFetchItems(vb_bgfetch_queue_t &fetches) {
- LockHolder lh(pendingBGFetchesLock);
- fetches.insert(pendingBGFetches.begin(), pendingBGFetches.end());
- pendingBGFetches.clear();
- return fetches.size() > 0;
-}
-
-void VBucket::addHighPriorityVBEntry(uint64_t id, const void *cookie,
- bool isBySeqno) {
- LockHolder lh(hpChksMutex);
- if (shard) {
- ++shard->highPriorityCount;
- }
- hpChks.push_back(HighPriorityVBEntry(cookie, id, isBySeqno));
- numHpChks.store(hpChks.size());
-}
-
-void VBucket::notifyOnPersistence(EventuallyPersistentEngine &e,
- uint64_t idNum,
- bool isBySeqno) {
- LockHolder lh(hpChksMutex);
- std::map<const void*, ENGINE_ERROR_CODE> toNotify;
- std::list<HighPriorityVBEntry>::iterator entry = hpChks.begin();
-
- std::string logStr(isBySeqno
- ? "seqno persistence"
- : "checkpoint persistence");
-
- while (entry != hpChks.end()) {
- if (isBySeqno != entry->isBySeqno_) {
- ++entry;
- continue;
+void VBucket::handlePreExpiry(StoredValue& v) {
+ value_t value = v.getValue();
+ if (value) {
+ std::unique_ptr<Item> itm(v.toItem(false, id));
+ item_info itm_info;
+ EventuallyPersistentEngine* engine = ObjectRegistry::getCurrentEngine();
+ itm_info = itm->toItemInfo(failovers->getLatestUUID());
+ value_t new_val(Blob::Copy(*value));
+ itm->setValue(new_val);
+
+ SERVER_HANDLE_V1* sapi = engine->getServerApi();
+ /* TODO: In order to minimize allocations, the callback needs to
+ * allocate an item whose value size will be exactly the size of the
+ * value after pre-expiry is performed.
+ */
+ if (sapi->document->pre_expiry(itm_info)) {
+ char* extMeta = const_cast<char *>(v.getValue()->getExtMeta());
+ Item new_item(v.getKey(), v.getFlags(), v.getExptime(),
+ itm_info.value[0].iov_base, itm_info.value[0].iov_len,
+ reinterpret_cast<uint8_t*>(extMeta),
+ v.getValue()->getExtLen(), v.getCas(),
+ v.getBySeqno(), id, v.getRevSeqno(),
+ v.getNRUValue());
+
+ new_item.setDeleted();
+ v.setValue(new_item, ht);
}
-
- std::string logStr(isBySeqno ?
- "seqno persistence" :
- "checkpoint persistence");
-
- hrtime_t wall_time(gethrtime() - entry->start);
- size_t spent = wall_time / 1000000000;
- if (entry->id <= idNum) {
- toNotify[entry->cookie] = ENGINE_SUCCESS;
- stats.chkPersistenceHisto.add(wall_time / 1000);
- adjustCheckpointFlushTimeout(wall_time / 1000000000);
- LOG(EXTENSION_LOG_NOTICE, "Notified the completion of %s "
- "for vbucket %" PRIu16 ", Check for: %" PRIu64 ", "
- "Persisted upto: %" PRIu64 ", cookie %p",
- logStr.c_str(), id, entry->id, idNum, entry->cookie);
- entry = hpChks.erase(entry);
- if (shard) {
- --shard->highPriorityCount;
- }
- } else if (spent > getCheckpointFlushTimeout()) {
- adjustCheckpointFlushTimeout(spent);
- e.storeEngineSpecific(entry->cookie, NULL);
- toNotify[entry->cookie] = ENGINE_TMPFAIL;
- LOG(EXTENSION_LOG_WARNING, "Notified the timeout on %s "
- "for vbucket %" PRIu16 ", Check for: %" PRIu64 ", "
- "Persisted upto: %" PRIu64 ", cookie %p",
- logStr.c_str(), id, entry->id, idNum, entry->cookie);
- entry = hpChks.erase(entry);
- if (shard) {
- --shard->highPriorityCount;
- }
- } else {
- ++entry;
- }
- }
- numHpChks.store(hpChks.size());
- lh.unlock();
-
- std::map<const void*, ENGINE_ERROR_CODE>::iterator itr = toNotify.begin();
- for (; itr != toNotify.end(); ++itr) {
- e.notifyIOComplete(itr->first, itr->second);
}
-
}
-void VBucket::notifyAllPendingConnsFailed(EventuallyPersistentEngine &e) {
- LockHolder lh(hpChksMutex);
- std::map<const void*, ENGINE_ERROR_CODE> toNotify;
- std::list<HighPriorityVBEntry>::iterator entry = hpChks.begin();
- while (entry != hpChks.end()) {
- toNotify[entry->cookie] = ENGINE_TMPFAIL;
- e.storeEngineSpecific(entry->cookie, NULL);
- entry = hpChks.erase(entry);
- if (shard) {
- --shard->highPriorityCount;
- }
- }
- lh.unlock();
-
- // Add all the pendingBGFetches to the toNotify map
- {
- LockHolder lh(pendingBGFetchesLock);
- size_t num_of_deleted_pending_fetches = 0;
- for (auto& bgf : pendingBGFetches) {
- vb_bgfetch_item_ctx_t& bg_itm_ctx = bgf.second;
- for (auto& bgitem : bg_itm_ctx.bgfetched_list) {
- toNotify[bgitem->cookie] = ENGINE_NOT_MY_VBUCKET;
- e.storeEngineSpecific(bgitem->cookie, nullptr);
- delete bgitem;
- ++num_of_deleted_pending_fetches;
- }
- }
- stats.numRemainingBgJobs.fetch_sub(num_of_deleted_pending_fetches);
- pendingBGFetches.clear();
- }
-
- std::map<const void*, ENGINE_ERROR_CODE>::iterator itr = toNotify.begin();
- for (; itr != toNotify.end(); ++itr) {
- e.notifyIOComplete(itr->first, itr->second);
- }
-
- fireAllOps(e);
-}
-
-void VBucket::adjustCheckpointFlushTimeout(size_t wall_time) {
- size_t middle = (MIN_CHK_FLUSH_TIMEOUT + MAX_CHK_FLUSH_TIMEOUT) / 2;
-
- if (wall_time <= MIN_CHK_FLUSH_TIMEOUT) {
- chkFlushTimeout = MIN_CHK_FLUSH_TIMEOUT;
- } else if (wall_time <= middle) {
- chkFlushTimeout = middle;
- } else {
- chkFlushTimeout = MAX_CHK_FLUSH_TIMEOUT;
- }
-}
-
-size_t VBucket::getHighPriorityChkSize() {
- return numHpChks.load();
-}
-
-size_t VBucket::getCheckpointFlushTimeout() {
- return chkFlushTimeout;
-}
-
-size_t VBucket::getNumItems(item_eviction_policy_t policy) {
- if (policy == VALUE_ONLY) {
- return ht.getNumInMemoryItems();
- } else {
- return ht.getNumItems();
- }
-}
-
-size_t VBucket::getNumNonResidentItems(item_eviction_policy_t policy) {
- if (policy == VALUE_ONLY) {
+size_t VBucket::getNumNonResidentItems() const {
+ if (eviction == VALUE_ONLY) {
return ht.getNumInMemoryNonResItems();
} else {
size_t num_items = ht.getNumItems();
persistenceCheckpointId.store(checkpointId);
}
-bool VBucket::isResidentRatioUnderThreshold(float threshold,
- item_eviction_policy_t policy) {
- if (policy != FULL_EVICTION) {
+void VBucket::markDirty(const DocKey& key) {
+ auto hbl = ht.getLockedBucket(key);
+ StoredValue* v = ht.unlocked_find(
+ key, hbl.getBucketNum(), WantsDeleted::Yes, TrackReference::Yes);
+ if (v) {
+ v->markDirty();
+ } else {
+ LOG(EXTENSION_LOG_WARNING, "markDirty: Error marking dirty, a key "
+ "is missing from vb:%" PRIu16, id);
+ }
+}
+
+bool VBucket::isResidentRatioUnderThreshold(float threshold) {
+ if (eviction != FULL_EVICTION) {
throw std::invalid_argument("VBucket::isResidentRatioUnderThreshold: "
- "policy (which is " + std::to_string(policy) +
+ "policy (which is " + std::to_string(eviction) +
") must be FULL_EVICTION");
}
- size_t num_items = getNumItems(policy);
- size_t num_non_resident_items = getNumNonResidentItems(policy);
+ size_t num_items = getNumItems();
+ size_t num_non_resident_items = getNumNonResidentItems();
if (threshold >= ((float)(num_items - num_non_resident_items) /
num_items)) {
return true;
// - Rebalance
LockHolder lh(bfMutex);
if (bFilter == nullptr && tempFilter == nullptr) {
- bFilter = new BloomFilter(key_count, probability, BFILTER_ENABLED);
+ bFilter = std::make_unique<BloomFilter>(key_count, probability,
+ BFILTER_ENABLED);
} else {
LOG(EXTENSION_LOG_WARNING, "(vb %" PRIu16 ") Bloom filter / Temp filter"
" already exist!", id);
// if the main filter is found to exist, set its state to
// COMPACTING as well.
LockHolder lh(bfMutex);
- if (tempFilter) {
- delete tempFilter;
- }
- tempFilter = new BloomFilter(key_count, probability, BFILTER_COMPACTING);
+ tempFilter = std::make_unique<BloomFilter>(key_count, probability,
+ BFILTER_COMPACTING);
if (bFilter) {
bFilter->setStatus(BFILTER_COMPACTING);
}
}
-void VBucket::addToFilter(const std::string &key) {
+void VBucket::addToFilter(const DocKey& key) {
LockHolder lh(bfMutex);
if (bFilter) {
- bFilter->addKey(key.c_str(), key.length());
+ bFilter->addKey(key);
}
// If the temp bloom filter is not found to be NULL,
// well, as once compaction completes the temp filter
// will replace the main bloom filter.
if (tempFilter) {
- tempFilter->addKey(key.c_str(), key.length());
+ tempFilter->addKey(key);
}
}
-bool VBucket::maybeKeyExistsInFilter(const const_sized_buffer key) {
+bool VBucket::maybeKeyExistsInFilter(const DocKey& key) {
LockHolder lh(bfMutex);
if (bFilter) {
- return bFilter->maybeKeyExists(key.data(), key.size());
+ return bFilter->maybeKeyExists(key);
} else {
// If filter doesn't exist, allow the BgFetch to go through.
return true;
}
}
-void VBucket::addToTempFilter(const std::string &key) {
+void VBucket::addToTempFilter(const DocKey& key) {
// Keys will be added to only the temp filter during
// compaction.
LockHolder lh(bfMutex);
if (tempFilter) {
- tempFilter->addKey(key.c_str(), key.length());
+ tempFilter->addKey(key);
}
}
// compaction.
LockHolder lh(bfMutex);
- if (bFilter && tempFilter) {
- delete bFilter;
- bFilter = NULL;
- }
- if (tempFilter &&
- (tempFilter->getStatus() == BFILTER_COMPACTING ||
- tempFilter->getStatus() == BFILTER_ENABLED)) {
- bFilter = tempFilter;
- tempFilter = NULL;
- bFilter->setStatus(BFILTER_ENABLED);
- } else if (tempFilter) {
- delete tempFilter;
- tempFilter = NULL;
+ if (tempFilter) {
+ bFilter.reset();
+
+ if (tempFilter->getStatus() == BFILTER_COMPACTING ||
+ tempFilter->getStatus() == BFILTER_ENABLED) {
+ bFilter = std::move(tempFilter);
+ bFilter->setStatus(BFILTER_ENABLED);
+ }
+ tempFilter.reset();
}
}
void VBucket::clearFilter() {
LockHolder lh(bfMutex);
- if (bFilter) {
- delete bFilter;
- bFilter = NULL;
- }
- if (tempFilter) {
- delete tempFilter;
- tempFilter = NULL;
- }
+ bFilter.reset();
+ tempFilter.reset();
}
void VBucket::setFilterStatus(bfilter_status_t to) {
}
}
-void VBucket::addStats(bool details, ADD_STAT add_stat, const void *c,
- item_eviction_policy_t policy) {
+VBNotifyCtx VBucket::queueDirty(
+ StoredValue& v,
+ const GenerateBySeqno generateBySeqno,
+ const GenerateCas generateCas,
+ const bool isBackfillItem,
+ PreLinkDocumentContext* preLinkDocumentContext) {
+ VBNotifyCtx notifyCtx;
+
+ queued_item qi(v.toItem(false, getId()));
+
+ if (isBackfillItem) {
+ queueBackfillItem(qi, generateBySeqno);
+ notifyCtx.notifyFlusher = true;
+ /* During backfill on a TAP receiver we need to update the snapshot
+ range in the checkpoint. Has to be done here because in case of TAP
+ backfill, above, we use vb.queueBackfillItem() instead of
+ vb.checkpointManager.queueDirty() */
+ if (generateBySeqno == GenerateBySeqno::Yes) {
+ checkpointManager.resetSnapshotRange();
+ }
+ } else {
+ notifyCtx.notifyFlusher =
+ checkpointManager.queueDirty(*this,
+ qi,
+ generateBySeqno,
+ generateCas,
+ preLinkDocumentContext);
+ notifyCtx.notifyReplication = true;
+ if (GenerateCas::Yes == generateCas) {
+ v.setCas(qi->getCas());
+ }
+ }
+
+ v.setBySeqno(qi->getBySeqno());
+ notifyCtx.bySeqno = qi->getBySeqno();
+
+ return notifyCtx;
+}
+
+StoredValue* VBucket::fetchValidValue(HashTable::HashBucketLock& hbl,
+ const DocKey& key,
+ WantsDeleted wantsDeleted,
+ TrackReference trackReference,
+ QueueExpired queueExpired) {
+ if (!hbl.getHTLock()) {
+ throw std::logic_error(
+ "Hash bucket lock not held in "
+ "VBucket::fetchValidValue() for hash bucket: " +
+ std::to_string(hbl.getBucketNum()) + "for key: " +
+ std::string(reinterpret_cast<const char*>(key.data()),
+ key.size()));
+ }
+ StoredValue* v = ht.unlocked_find(
+ key, hbl.getBucketNum(), wantsDeleted, trackReference);
+ if (v && !v->isDeleted() && !v->isTempItem()) {
+ // In the deleted case, we ignore expiration time.
+ if (v->isExpired(ep_real_time())) {
+ if (getState() != vbucket_state_active) {
+ return wantsDeleted == WantsDeleted::Yes ? v : NULL;
+ }
+
+ // queueDirty only allowed on active VB
+ if (queueExpired == QueueExpired::Yes &&
+ getState() == vbucket_state_active) {
+ incExpirationStat(ExpireBy::Access);
+ handlePreExpiry(*v);
+ VBNotifyCtx notifyCtx;
+ std::tie(std::ignore, v, notifyCtx) =
+ processExpiredItem(hbl, *v);
+ notifyNewSeqno(notifyCtx);
+ }
+ return wantsDeleted == WantsDeleted::Yes ? v : NULL;
+ }
+ }
+ return v;
+}
+
+void VBucket::incExpirationStat(const ExpireBy source) {
+ switch (source) {
+ case ExpireBy::Pager:
+ ++stats.expired_pager;
+ break;
+ case ExpireBy::Compactor:
+ ++stats.expired_compactor;
+ break;
+ case ExpireBy::Access:
+ ++stats.expired_access;
+ break;
+ }
+ ++numExpiredItems;
+}
+
+MutationStatus VBucket::setFromInternal(Item& itm) {
+ return ht.set(itm);
+}
+
+ENGINE_ERROR_CODE VBucket::set(Item& itm,
+ const void* cookie,
+ EventuallyPersistentEngine& engine,
+ const int bgFetchDelay) {
+ bool cas_op = (itm.getCas() != 0);
+ auto hbl = ht.getLockedBucket(itm.getKey());
+ StoredValue* v = ht.unlocked_find(itm.getKey(),
+ hbl.getBucketNum(),
+ WantsDeleted::Yes,
+ TrackReference::No);
+ if (v && v->isLocked(ep_current_time()) &&
+ (getState() == vbucket_state_replica ||
+ getState() == vbucket_state_pending)) {
+ v->unlock();
+ }
+
+ bool maybeKeyExists = true;
+ // If we didn't find a valid item, check Bloomfilter's prediction if in
+ // full eviction policy and for a CAS operation.
+ if ((v == nullptr || v->isTempInitialItem()) &&
+ (eviction == FULL_EVICTION) && (itm.getCas() != 0)) {
+ // Check Bloomfilter's prediction
+ if (!maybeKeyExistsInFilter(itm.getKey())) {
+ maybeKeyExists = false;
+ }
+ }
+
+ PreLinkDocumentContext preLinkDocumentContext(engine, cookie, &itm);
+ VBQueueItemCtx queueItmCtx(GenerateBySeqno::Yes,
+ GenerateCas::Yes,
+ TrackCasDrift::No,
+ /*isBackfillItem*/ false,
+ &preLinkDocumentContext);
+
+ MutationStatus status;
+ VBNotifyCtx notifyCtx;
+ std::tie(status, notifyCtx) = processSet(hbl,
+ v,
+ itm,
+ itm.getCas(),
+ /*allowExisting*/ true,
+ /*hashMetaData*/ false,
+ &queueItmCtx,
+ maybeKeyExists);
+
+ ENGINE_ERROR_CODE ret = ENGINE_SUCCESS;
+ switch (status) {
+ case MutationStatus::NoMem:
+ ret = ENGINE_ENOMEM;
+ break;
+ case MutationStatus::InvalidCas:
+ ret = ENGINE_KEY_EEXISTS;
+ break;
+ case MutationStatus::IsLocked:
+ ret = ENGINE_LOCKED;
+ break;
+ case MutationStatus::NotFound:
+ if (cas_op) {
+ ret = ENGINE_KEY_ENOENT;
+ break;
+ }
+ // FALLTHROUGH
+ case MutationStatus::WasDirty:
+ // Even if the item was dirty, push it into the vbucket's open
+ // checkpoint.
+ case MutationStatus::WasClean:
+ notifyNewSeqno(notifyCtx);
+
+ itm.setBySeqno(v->getBySeqno());
+ itm.setCas(v->getCas());
+ break;
+ case MutationStatus::NeedBgFetch: { // CAS operation with non-resident item
+ // +
+ // full eviction.
+ if (v) {
+ // temp item is already created. Simply schedule a bg fetch job
+ hbl.getHTLock().unlock();
+ bgFetch(itm.getKey(), cookie, engine, bgFetchDelay, true);
+ return ENGINE_EWOULDBLOCK;
+ }
+ ret = addTempItemAndBGFetch(
+ hbl, itm.getKey(), cookie, engine, bgFetchDelay, true);
+ break;
+ }
+ }
+
+ return ret;
+}
+
+ENGINE_ERROR_CODE VBucket::replace(Item& itm,
+ const void* cookie,
+ EventuallyPersistentEngine& engine,
+ const int bgFetchDelay) {
+ auto hbl = ht.getLockedBucket(itm.getKey());
+ StoredValue* v = ht.unlocked_find(itm.getKey(),
+ hbl.getBucketNum(),
+ WantsDeleted::Yes,
+ TrackReference::No);
+ if (v) {
+ if (v->isDeleted() || v->isTempDeletedItem() ||
+ v->isTempNonExistentItem()) {
+ return ENGINE_KEY_ENOENT;
+ }
+
+ MutationStatus mtype;
+ VBNotifyCtx notifyCtx;
+ if (eviction == FULL_EVICTION && v->isTempInitialItem()) {
+ mtype = MutationStatus::NeedBgFetch;
+ } else {
+ PreLinkDocumentContext preLinkDocumentContext(engine, cookie, &itm);
+ VBQueueItemCtx queueItmCtx(GenerateBySeqno::Yes,
+ GenerateCas::Yes,
+ TrackCasDrift::No,
+ /*isBackfillItem*/ false,
+ &preLinkDocumentContext);
+ std::tie(mtype, notifyCtx) = processSet(hbl,
+ v,
+ itm,
+ 0,
+ /*allowExisting*/ true,
+ /*hasMetaData*/ false,
+ &queueItmCtx);
+ }
+
+ ENGINE_ERROR_CODE ret = ENGINE_SUCCESS;
+ switch (mtype) {
+ case MutationStatus::NoMem:
+ ret = ENGINE_ENOMEM;
+ break;
+ case MutationStatus::IsLocked:
+ ret = ENGINE_LOCKED;
+ break;
+ case MutationStatus::InvalidCas:
+ case MutationStatus::NotFound:
+ ret = ENGINE_NOT_STORED;
+ break;
+ // FALLTHROUGH
+ case MutationStatus::WasDirty:
+ // Even if the item was dirty, push it into the vbucket's open
+ // checkpoint.
+ case MutationStatus::WasClean:
+ notifyNewSeqno(notifyCtx);
+
+ itm.setBySeqno(v->getBySeqno());
+ itm.setCas(v->getCas());
+ break;
+ case MutationStatus::NeedBgFetch: {
+ // temp item is already created. Simply schedule a bg fetch job
+ hbl.getHTLock().unlock();
+ bgFetch(itm.getKey(), cookie, engine, bgFetchDelay, true);
+ ret = ENGINE_EWOULDBLOCK;
+ break;
+ }
+ }
+
+ return ret;
+ } else {
+ if (eviction == VALUE_ONLY) {
+ return ENGINE_KEY_ENOENT;
+ }
+
+ if (maybeKeyExistsInFilter(itm.getKey())) {
+ return addTempItemAndBGFetch(
+ hbl, itm.getKey(), cookie, engine, bgFetchDelay, false);
+ } else {
+ // As bloomfilter predicted that item surely doesn't exist
+ // on disk, return ENOENT for replace().
+ return ENGINE_KEY_ENOENT;
+ }
+ }
+}
+
+ENGINE_ERROR_CODE VBucket::addBackfillItem(Item& itm,
+ const GenerateBySeqno genBySeqno) {
+ auto hbl = ht.getLockedBucket(itm.getKey());
+ StoredValue* v = ht.unlocked_find(itm.getKey(),
+ hbl.getBucketNum(),
+ WantsDeleted::Yes,
+ TrackReference::No);
+
+ // Note that this function is only called on replica or pending vbuckets.
+ if (v && v->isLocked(ep_current_time())) {
+ v->unlock();
+ }
+
+ VBQueueItemCtx queueItmCtx(genBySeqno,
+ GenerateCas::No,
+ TrackCasDrift::No,
+ /*isBackfillItem*/ true,
+ nullptr /* No pre link should happen */);
+ MutationStatus status;
+ VBNotifyCtx notifyCtx;
+ std::tie(status, notifyCtx) = processSet(hbl,
+ v,
+ itm,
+ 0,
+ /*allowExisting*/ true,
+ /*hasMetaData*/ true,
+ &queueItmCtx);
+
+ ENGINE_ERROR_CODE ret = ENGINE_SUCCESS;
+ switch (status) {
+ case MutationStatus::NoMem:
+ ret = ENGINE_ENOMEM;
+ break;
+ case MutationStatus::InvalidCas:
+ case MutationStatus::IsLocked:
+ ret = ENGINE_KEY_EEXISTS;
+ break;
+ case MutationStatus::WasDirty:
+ // FALLTHROUGH, to ensure the bySeqno for the hashTable item is
+ // set correctly, and also the sequence numbers are ordered correctly.
+ // (MB-14003)
+ case MutationStatus::NotFound:
+ // FALLTHROUGH
+ case MutationStatus::WasClean: {
+ setMaxCas(v->getCas());
+ // we unlock ht lock here because we want to avoid potential lock
+ // inversions arising from notifyNewSeqno() call
+ hbl.getHTLock().unlock();
+ notifyNewSeqno(notifyCtx);
+ } break;
+ case MutationStatus::NeedBgFetch:
+ throw std::logic_error(
+ "VBucket::addBackfillItem: "
+ "SET on a non-active vbucket should not require a "
+ "bg_metadata_fetch.");
+ }
+
+ return ret;
+}
+
+ENGINE_ERROR_CODE VBucket::setWithMeta(Item& itm,
+ const uint64_t cas,
+ uint64_t* seqno,
+ const void* cookie,
+ EventuallyPersistentEngine& engine,
+ const int bgFetchDelay,
+ const bool force,
+ const bool allowExisting,
+ const GenerateBySeqno genBySeqno,
+ const GenerateCas genCas,
+ const bool isReplication) {
+ auto hbl = ht.getLockedBucket(itm.getKey());
+ StoredValue* v = ht.unlocked_find(itm.getKey(),
+ hbl.getBucketNum(),
+ WantsDeleted::Yes,
+ TrackReference::No);
+
+ bool maybeKeyExists = true;
+ if (!force) {
+ if (v) {
+ if (v->isTempInitialItem()) {
+ bgFetch(itm.getKey(), cookie, engine, bgFetchDelay, true);
+ return ENGINE_EWOULDBLOCK;
+ }
+
+ if (!(conflictResolver->resolve(*v,
+ itm.getMetaData(),
+ itm.getDataType(),
+ itm.isDeleted()))) {
+ ++stats.numOpsSetMetaResolutionFailed;
+ return ENGINE_KEY_EEXISTS;
+ }
+ } else {
+ if (maybeKeyExistsInFilter(itm.getKey())) {
+ return addTempItemAndBGFetch(hbl,
+ itm.getKey(),
+ cookie,
+ engine,
+ bgFetchDelay,
+ true,
+ isReplication);
+ } else {
+ maybeKeyExists = false;
+ }
+ }
+ } else {
+ if (eviction == FULL_EVICTION) {
+ // Check Bloomfilter's prediction
+ if (!maybeKeyExistsInFilter(itm.getKey())) {
+ maybeKeyExists = false;
+ }
+ }
+ }
+
+ if (v && v->isLocked(ep_current_time()) &&
+ (getState() == vbucket_state_replica ||
+ getState() == vbucket_state_pending)) {
+ v->unlock();
+ }
+
+ VBQueueItemCtx queueItmCtx(genBySeqno,
+ genCas,
+ TrackCasDrift::Yes,
+ /*isBackfillItem*/ false,
+ nullptr /* No pre link step needed */);
+ MutationStatus status;
+ VBNotifyCtx notifyCtx;
+ std::tie(status, notifyCtx) = processSet(hbl,
+ v,
+ itm,
+ cas,
+ allowExisting,
+ true,
+ &queueItmCtx,
+ maybeKeyExists,
+ isReplication);
+
+ ENGINE_ERROR_CODE ret = ENGINE_SUCCESS;
+ switch (status) {
+ case MutationStatus::NoMem:
+ ret = ENGINE_ENOMEM;
+ break;
+ case MutationStatus::InvalidCas:
+ ret = ENGINE_KEY_EEXISTS;
+ break;
+ case MutationStatus::IsLocked:
+ ret = ENGINE_LOCKED;
+ break;
+ case MutationStatus::WasDirty:
+ case MutationStatus::WasClean: {
+ if (seqno) {
+ *seqno = static_cast<uint64_t>(v->getBySeqno());
+ }
+ // we unlock ht lock here because we want to avoid potential lock
+ // inversions arising from notifyNewSeqno() call
+ hbl.getHTLock().unlock();
+ notifyNewSeqno(notifyCtx);
+ } break;
+ case MutationStatus::NotFound:
+ ret = ENGINE_KEY_ENOENT;
+ break;
+ case MutationStatus::NeedBgFetch: { // CAS operation with non-resident item
+ // + full eviction.
+ if (v) { // temp item is already created. Simply schedule a
+ hbl.getHTLock().unlock(); // bg fetch job.
+ bgFetch(itm.getKey(), cookie, engine, bgFetchDelay, true);
+ return ENGINE_EWOULDBLOCK;
+ }
+ ret = addTempItemAndBGFetch(hbl,
+ itm.getKey(),
+ cookie,
+ engine,
+ bgFetchDelay,
+ true,
+ isReplication);
+ }
+ }
+
+ return ret;
+}
+
+ENGINE_ERROR_CODE VBucket::deleteItem(const DocKey& key,
+ uint64_t& cas,
+ const void* cookie,
+ EventuallyPersistentEngine& engine,
+ const int bgFetchDelay,
+ ItemMetaData* itemMeta,
+ mutation_descr_t* mutInfo) {
+ auto hbl = ht.getLockedBucket(key);
+ StoredValue* v = ht.unlocked_find(
+ key, hbl.getBucketNum(), WantsDeleted::Yes, TrackReference::No);
+
+ if (!v || v->isDeleted() || v->isTempItem()) {
+ if (eviction == VALUE_ONLY) {
+ return ENGINE_KEY_ENOENT;
+ } else { // Full eviction.
+ if (!v) { // Item might be evicted from cache.
+ if (maybeKeyExistsInFilter(key)) {
+ return addTempItemAndBGFetch(
+ hbl, key, cookie, engine, bgFetchDelay, true);
+ } else {
+ // As bloomfilter predicted that item surely doesn't
+ // exist on disk, return ENOENT for deleteItem().
+ return ENGINE_KEY_ENOENT;
+ }
+ } else if (v->isTempInitialItem()) {
+ hbl.getHTLock().unlock();
+ bgFetch(key, cookie, engine, bgFetchDelay, true);
+ return ENGINE_EWOULDBLOCK;
+ } else { // Non-existent or deleted key.
+ if (v->isTempNonExistentItem() || v->isTempDeletedItem()) {
+ // Delete a temp non-existent item to ensure that
+ // if a delete were issued over an item that doesn't
+ // exist, then we don't preserve a temp item.
+ deleteStoredValue(hbl, *v);
+ }
+ return ENGINE_KEY_ENOENT;
+ }
+ }
+ }
+
+ if (v->isLocked(ep_current_time()) &&
+ (getState() == vbucket_state_replica ||
+ getState() == vbucket_state_pending)) {
+ v->unlock();
+ }
+
+ if (itemMeta != nullptr) {
+ itemMeta->cas = v->getCas();
+ }
+
+ MutationStatus delrv;
+ VBNotifyCtx notifyCtx;
+ if (v->isExpired(ep_real_time())) {
+ std::tie(delrv, v, notifyCtx) = processExpiredItem(hbl, *v);
+ } else {
+ ItemMetaData metadata;
+ metadata.revSeqno = v->getRevSeqno() + 1;
+ std::tie(delrv, v, notifyCtx) =
+ processSoftDelete(hbl,
+ *v,
+ cas,
+ metadata,
+ VBQueueItemCtx(GenerateBySeqno::Yes,
+ GenerateCas::Yes,
+ TrackCasDrift::No,
+ /*isBackfillItem*/ false,
+ nullptr /* no pre link */),
+ /*use_meta*/ false,
+ /*bySeqno*/ v->getBySeqno());
+ }
+
+ uint64_t seqno = 0;
+ ENGINE_ERROR_CODE ret = ENGINE_SUCCESS;
+ switch (delrv) {
+ case MutationStatus::NoMem:
+ ret = ENGINE_ENOMEM;
+ break;
+ case MutationStatus::InvalidCas:
+ ret = ENGINE_KEY_EEXISTS;
+ break;
+ case MutationStatus::IsLocked:
+ ret = ENGINE_LOCKED_TMPFAIL;
+ break;
+ case MutationStatus::NotFound:
+ ret = ENGINE_KEY_ENOENT;
+ /* Fallthrough:
+ * A NotFound return value at this point indicates that the
+ * item has expired. But, a deletion still needs to be queued
+ * for the item in order to persist it.
+ */
+ case MutationStatus::WasClean:
+ case MutationStatus::WasDirty:
+ if (itemMeta != nullptr) {
+ itemMeta->revSeqno = v->getRevSeqno();
+ itemMeta->flags = v->getFlags();
+ itemMeta->exptime = v->getExptime();
+ }
+
+ notifyNewSeqno(notifyCtx);
+ seqno = static_cast<uint64_t>(v->getBySeqno());
+ cas = v->getCas();
+
+ if (delrv != MutationStatus::NotFound) {
+ if (mutInfo) {
+ mutInfo->seqno = seqno;
+ mutInfo->vbucket_uuid = failovers->getLatestUUID();
+ }
+ if (itemMeta != nullptr) {
+ itemMeta->cas = v->getCas();
+ }
+ }
+ break;
+ case MutationStatus::NeedBgFetch:
+ // We already figured out if a bg fetch is requred for a full-evicted
+ // item above.
+ throw std::logic_error(
+ "VBucket::deleteItem: "
+ "Unexpected NEEDS_BG_FETCH from processSoftDelete");
+ }
+ return ret;
+}
+
+ENGINE_ERROR_CODE VBucket::deleteWithMeta(const DocKey& key,
+ uint64_t& cas,
+ uint64_t* seqno,
+ const void* cookie,
+ EventuallyPersistentEngine& engine,
+ const int bgFetchDelay,
+ const bool force,
+ const ItemMetaData& itemMeta,
+ const bool backfill,
+ const GenerateBySeqno genBySeqno,
+ const GenerateCas generateCas,
+ const uint64_t bySeqno,
+ const bool isReplication) {
+ auto hbl = ht.getLockedBucket(key);
+ StoredValue* v = ht.unlocked_find(
+ key, hbl.getBucketNum(), WantsDeleted::Yes, TrackReference::No);
+ if (!force) { // Need conflict resolution.
+ if (v) {
+ if (v->isTempInitialItem()) {
+ bgFetch(key, cookie, engine, bgFetchDelay, true);
+ return ENGINE_EWOULDBLOCK;
+ }
+
+ if (!(conflictResolver->resolve(*v,
+ itemMeta,
+ PROTOCOL_BINARY_RAW_BYTES,
+ true))) {
+ ++stats.numOpsDelMetaResolutionFailed;
+ return ENGINE_KEY_EEXISTS;
+ }
+ } else {
+ // Item is 1) deleted or not existent in the value eviction case OR
+ // 2) deleted or evicted in the full eviction.
+ if (maybeKeyExistsInFilter(key)) {
+ return addTempItemAndBGFetch(hbl,
+ key,
+ cookie,
+ engine,
+ bgFetchDelay,
+ true,
+ isReplication);
+ } else {
+ // Even though bloomfilter predicted that item doesn't exist
+ // on disk, we must put this delete on disk if the cas is valid.
+ AddStatus rv = addTempStoredValue(hbl, key, isReplication);
+ if (rv == AddStatus::NoMem) {
+ return ENGINE_ENOMEM;
+ }
+ v = ht.unlocked_find(key,
+ hbl.getBucketNum(),
+ WantsDeleted::Yes,
+ TrackReference::No);
+ v->setDeleted();
+ }
+ }
+ } else {
+ if (!v) {
+ // We should always try to persist a delete here.
+ AddStatus rv = addTempStoredValue(hbl, key, isReplication);
+ if (rv == AddStatus::NoMem) {
+ return ENGINE_ENOMEM;
+ }
+ v = ht.unlocked_find(key,
+ hbl.getBucketNum(),
+ WantsDeleted::Yes,
+ TrackReference::No);
+ v->setDeleted();
+ v->setCas(cas);
+ } else if (v->isTempInitialItem()) {
+ v->setDeleted();
+ v->setCas(cas);
+ }
+ }
+
+ if (v && v->isLocked(ep_current_time()) &&
+ (getState() == vbucket_state_replica ||
+ getState() == vbucket_state_pending)) {
+ v->unlock();
+ }
+
+ MutationStatus delrv;
+ VBNotifyCtx notifyCtx;
+ if (!v) {
+ if (eviction == FULL_EVICTION) {
+ delrv = MutationStatus::NeedBgFetch;
+ } else {
+ delrv = MutationStatus::NotFound;
+ }
+ } else {
+ VBQueueItemCtx queueItmCtx(genBySeqno,
+ generateCas,
+ TrackCasDrift::Yes,
+ backfill,
+ nullptr /* No pre link step needed */);
+
+ // system xattrs must remain
+ std::unique_ptr<Item> itm;
+ if (mcbp::datatype::is_xattr(v->getDatatype()) &&
+ (itm = pruneXattrDocument(*v, itemMeta))) {
+ std::tie(v, delrv, notifyCtx) =
+ updateStoredValue(hbl, *v, *itm, &queueItmCtx);
+ } else {
+ std::tie(delrv, v, notifyCtx) = processSoftDelete(hbl,
+ *v,
+ cas,
+ itemMeta,
+ queueItmCtx,
+ /*use_meta*/ true,
+ bySeqno);
+ }
+ }
+ cas = v ? v->getCas() : 0;
+
+ switch (delrv) {
+ case MutationStatus::NoMem:
+ return ENGINE_ENOMEM;
+ case MutationStatus::InvalidCas:
+ return ENGINE_KEY_EEXISTS;
+ case MutationStatus::IsLocked:
+ return ENGINE_LOCKED_TMPFAIL;
+ case MutationStatus::NotFound:
+ return ENGINE_KEY_ENOENT;
+ case MutationStatus::WasDirty:
+ case MutationStatus::WasClean: {
+ if (seqno) {
+ *seqno = static_cast<uint64_t>(v->getBySeqno());
+ }
+ // we unlock ht lock here because we want to avoid potential lock
+ // inversions arising from notifyNewSeqno() call
+ hbl.getHTLock().unlock();
+ notifyNewSeqno(notifyCtx);
+ break;
+ }
+ case MutationStatus::NeedBgFetch:
+ hbl.getHTLock().unlock();
+ bgFetch(key, cookie, engine, bgFetchDelay, true);
+ return ENGINE_EWOULDBLOCK;
+ }
+ return ENGINE_SUCCESS;
+}
+
+void VBucket::deleteExpiredItem(const Item& it,
+ time_t startTime,
+ ExpireBy source) {
+
+ // The item is correctly trimmed (by the caller). Fetch the one in the
+ // hashtable and replace it if the CAS match (same item; no race).
+ // If not found in the hashtable we should add it as a deleted item
+ const DocKey& key = it.getKey();
+ auto hbl = ht.getLockedBucket(key);
+ StoredValue* v = ht.unlocked_find(
+ key, hbl.getBucketNum(), WantsDeleted::Yes, TrackReference::No);
+ if (v) {
+ if (v->getCas() != it.getCas()) {
+ return;
+ }
+
+ if (v->isTempNonExistentItem() || v->isTempDeletedItem()) {
+ bool deleted = deleteStoredValue(hbl, *v);
+ if (!deleted) {
+ throw std::logic_error(
+ "VBucket::deleteExpiredItem: "
+ "Failed to delete seqno:" +
+ std::to_string(v->getBySeqno()) + " from bucket " +
+ std::to_string(hbl.getBucketNum()));
+ }
+ } else if (v->isExpired(startTime) && !v->isDeleted()) {
+ VBNotifyCtx notifyCtx;
+ std::tie(std::ignore, std::ignore, notifyCtx) =
+ processExpiredItem(hbl, *v);
+ // we unlock ht lock here because we want to avoid potential lock
+ // inversions arising from notifyNewSeqno() call
+ hbl.getHTLock().unlock();
+ notifyNewSeqno(notifyCtx);
+ }
+ } else {
+ if (eviction == FULL_EVICTION) {
+ // Create a temp item and delete and push it
+ // into the checkpoint queue, only if the bloomfilter
+ // predicts that the item may exist on disk.
+ if (maybeKeyExistsInFilter(key)) {
+ AddStatus rv = addTempStoredValue(hbl, key);
+ if (rv == AddStatus::NoMem) {
+ return;
+ }
+ v = ht.unlocked_find(key,
+ hbl.getBucketNum(),
+ WantsDeleted::Yes,
+ TrackReference::No);
+ v->setDeleted();
+ v->setRevSeqno(it.getRevSeqno());
+ v->setValue(it, ht);
+ VBNotifyCtx notifyCtx;
+ std::tie(std::ignore, std::ignore, notifyCtx) =
+ processExpiredItem(hbl, *v);
+ // we unlock ht lock here because we want to avoid potential
+ // lock inversions arising from notifyNewSeqno() call
+ hbl.getHTLock().unlock();
+ notifyNewSeqno(notifyCtx);
+ }
+ }
+ }
+ incExpirationStat(source);
+}
+
+ENGINE_ERROR_CODE VBucket::add(Item& itm,
+ const void* cookie,
+ EventuallyPersistentEngine& engine,
+ const int bgFetchDelay) {
+ auto hbl = ht.getLockedBucket(itm.getKey());
+ StoredValue* v = ht.unlocked_find(itm.getKey(),
+ hbl.getBucketNum(),
+ WantsDeleted::Yes,
+ TrackReference::No);
+
+ bool maybeKeyExists = true;
+ if ((v == nullptr || v->isTempInitialItem()) &&
+ (eviction == FULL_EVICTION)) {
+ // Check bloomfilter's prediction
+ if (!maybeKeyExistsInFilter(itm.getKey())) {
+ maybeKeyExists = false;
+ }
+ }
+
+ PreLinkDocumentContext preLinkDocumentContext(engine, cookie, &itm);
+ VBQueueItemCtx queueItmCtx(GenerateBySeqno::Yes,
+ GenerateCas::Yes,
+ TrackCasDrift::No,
+ /*isBackfillItem*/ false,
+ &preLinkDocumentContext);
+ AddStatus status;
+ VBNotifyCtx notifyCtx;
+ std::tie(status, notifyCtx) =
+ processAdd(hbl, v, itm, maybeKeyExists, false, &queueItmCtx);
+
+ switch (status) {
+ case AddStatus::NoMem:
+ return ENGINE_ENOMEM;
+ case AddStatus::Exists:
+ return ENGINE_NOT_STORED;
+ case AddStatus::AddTmpAndBgFetch:
+ return addTempItemAndBGFetch(
+ hbl, itm.getKey(), cookie, engine, bgFetchDelay, true);
+ case AddStatus::BgFetch:
+ hbl.getHTLock().unlock();
+ bgFetch(itm.getKey(), cookie, engine, bgFetchDelay, true);
+ return ENGINE_EWOULDBLOCK;
+ case AddStatus::Success:
+ case AddStatus::UnDel:
+ notifyNewSeqno(notifyCtx);
+ itm.setBySeqno(v->getBySeqno());
+ itm.setCas(v->getCas());
+ break;
+ }
+ return ENGINE_SUCCESS;
+}
+
+GetValue VBucket::getAndUpdateTtl(const DocKey& key,
+ const void* cookie,
+ EventuallyPersistentEngine& engine,
+ int bgFetchDelay,
+ time_t exptime) {
+ auto hbl = ht.getLockedBucket(key);
+ StoredValue* v = fetchValidValue(hbl,
+ key,
+ WantsDeleted::Yes,
+ TrackReference::Yes,
+ QueueExpired::Yes);
+
+ if (v) {
+ if (v->isDeleted() || v->isTempDeletedItem() ||
+ v->isTempNonExistentItem()) {
+ return {};
+ }
+
+ if (!v->isResident()) {
+ bgFetch(key, cookie, engine, bgFetchDelay);
+ return GetValue(nullptr, ENGINE_EWOULDBLOCK, v->getBySeqno());
+ }
+ if (v->isLocked(ep_current_time())) {
+ return GetValue(nullptr, ENGINE_KEY_EEXISTS, 0);
+ }
+
+ const bool exptime_mutated = exptime != v->getExptime();
+ if (exptime_mutated) {
+ v->markDirty();
+ v->setExptime(exptime);
+ v->setRevSeqno(v->getRevSeqno() + 1);
+ }
+
+ GetValue rv(
+ v->toItem(v->isLocked(ep_current_time()), getId()).release(),
+ ENGINE_SUCCESS,
+ v->getBySeqno());
+
+ if (exptime_mutated) {
+ VBNotifyCtx notifyCtx = queueDirty(*v);
+ rv.getValue()->setCas(v->getCas());
+ // we unlock ht lock here because we want to avoid potential lock
+ // inversions arising from notifyNewSeqno() call
+ hbl.getHTLock().unlock();
+ notifyNewSeqno(notifyCtx);
+ }
+
+ return rv;
+ } else {
+ if (eviction == VALUE_ONLY) {
+ return {};
+ } else {
+ if (maybeKeyExistsInFilter(key)) {
+ ENGINE_ERROR_CODE ec = addTempItemAndBGFetch(
+ hbl, key, cookie, engine, bgFetchDelay, false);
+ return GetValue(NULL, ec, -1, true);
+ } else {
+ // As bloomfilter predicted that item surely doesn't exist
+ // on disk, return ENOENT for getAndUpdateTtl().
+ return {};
+ }
+ }
+ }
+}
+
+MutationStatus VBucket::insertFromWarmup(Item& itm,
+ bool eject,
+ bool keyMetaDataOnly) {
+ if (!StoredValue::hasAvailableSpace(stats, itm)) {
+ return MutationStatus::NoMem;
+ }
+
+ auto hbl = ht.getLockedBucket(itm.getKey());
+ StoredValue* v = ht.unlocked_find(itm.getKey(),
+ hbl.getBucketNum(),
+ WantsDeleted::Yes,
+ TrackReference::No);
+
+ if (v == NULL) {
+ v = addNewStoredValue(hbl, itm, /*queueItmCtx*/ nullptr).first;
+ if (keyMetaDataOnly) {
+ v->markNotResident();
+ /* For now ht stats are updated from outside ht. This seems to be
+ a better option for now than passing a flag to
+ addNewStoredValue() just for this func */
+ ++(ht.numNonResidentItems);
+ }
+ /* For now ht stats are updated from outside ht. This seems to be
+ a better option for now than passing a flag to
+ addNewStoredValue() just for this func.
+ We need to decrNumTotalItems because ht.numTotalItems is already
+ set by warmup when it estimated the item count from disk */
+ ht.decrNumTotalItems();
+ v->setNewCacheItem(false);
+ } else {
+ if (keyMetaDataOnly) {
+ // We don't have a better error code ;)
+ return MutationStatus::InvalidCas;
+ }
+
+ // Verify that the CAS isn't changed
+ if (v->getCas() != itm.getCas()) {
+ if (v->getCas() == 0) {
+ v->setCas(itm.getCas());
+ v->setFlags(itm.getFlags());
+ v->setExptime(itm.getExptime());
+ v->setRevSeqno(itm.getRevSeqno());
+ } else {
+ return MutationStatus::InvalidCas;
+ }
+ }
+ updateStoredValue(hbl, *v, itm, /*queueItmCtx*/ nullptr);
+ }
+
+ v->markClean();
+
+ if (eject && !keyMetaDataOnly) {
+ ht.unlocked_ejectItem(v, eviction);
+ }
+
+ return MutationStatus::NotFound;
+}
+
+GetValue VBucket::getInternal(const DocKey& key,
+ const void* cookie,
+ EventuallyPersistentEngine& engine,
+ int bgFetchDelay,
+ get_options_t options,
+ bool diskFlushAll) {
+ const TrackReference trackReference = (options & TRACK_REFERENCE)
+ ? TrackReference::Yes
+ : TrackReference::No;
+ const bool getDeletedValue = (options & GET_DELETED_VALUE);
+ auto hbl = ht.getLockedBucket(key);
+ StoredValue* v = fetchValidValue(
+ hbl, key, WantsDeleted::Yes, trackReference, QueueExpired::Yes);
+ if (v) {
+ if (v->isDeleted() && !getDeletedValue) {
+ return GetValue();
+ }
+ if (v->isTempDeletedItem() || v->isTempNonExistentItem()) {
+ // Delete a temp non-existent item to ensure that
+ // if the get were issued over an item that doesn't
+ // exist, then we dont preserve a temp item.
+ if (options & DELETE_TEMP) {
+ deleteStoredValue(hbl, *v);
+ }
+ return GetValue();
+ }
+
+ // If the value is not resident, wait for it...
+ if (!v->isResident()) {
+ return getInternalNonResident(
+ key, cookie, engine, bgFetchDelay, options, *v);
+ }
+
+ // Should we hide (return -1) for the items' CAS?
+ const bool hide_cas =
+ (options & HIDE_LOCKED_CAS) && v->isLocked(ep_current_time());
+ return GetValue(v->toItem(hide_cas, getId()).release(),
+ ENGINE_SUCCESS,
+ v->getBySeqno(),
+ false,
+ v->getNRUValue());
+ } else {
+ if (!getDeletedValue && (eviction == VALUE_ONLY || diskFlushAll)) {
+ return GetValue();
+ }
+
+ if (maybeKeyExistsInFilter(key)) {
+ ENGINE_ERROR_CODE ec = ENGINE_EWOULDBLOCK;
+ if (options &
+ QUEUE_BG_FETCH) { // Full eviction and need a bg fetch.
+ ec = addTempItemAndBGFetch(
+ hbl, key, cookie, engine, bgFetchDelay, false);
+ }
+ return GetValue(NULL, ec, -1, true);
+ } else {
+ // As bloomfilter predicted that item surely doesn't exist
+ // on disk, return ENOENT, for getInternal().
+ return GetValue();
+ }
+ }
+}
+
+ENGINE_ERROR_CODE VBucket::getMetaData(const DocKey& key,
+ const void* cookie,
+ EventuallyPersistentEngine& engine,
+ int bgFetchDelay,
+ ItemMetaData& metadata,
+ uint32_t& deleted,
+ uint8_t& datatype) {
+ deleted = 0;
+ auto hbl = ht.getLockedBucket(key);
+ StoredValue* v = ht.unlocked_find(
+ key, hbl.getBucketNum(), WantsDeleted::Yes, TrackReference::No);
+
+ if (v) {
+ stats.numOpsGetMeta++;
+ if (v->isTempInitialItem()) {
+ // Need bg meta fetch.
+ bgFetch(key, cookie, engine, bgFetchDelay, true);
+ return ENGINE_EWOULDBLOCK;
+ } else if (v->isTempNonExistentItem()) {
+ metadata.cas = v->getCas();
+ return ENGINE_KEY_ENOENT;
+ } else {
+ if (v->isTempDeletedItem() || v->isDeleted() ||
+ v->isExpired(ep_real_time())) {
+ deleted |= GET_META_ITEM_DELETED_FLAG;
+ }
+
+ if (v->isLocked(ep_current_time())) {
+ metadata.cas = static_cast<uint64_t>(-1);
+ } else {
+ metadata.cas = v->getCas();
+ }
+ metadata.flags = v->getFlags();
+ metadata.exptime = v->getExptime();
+ metadata.revSeqno = v->getRevSeqno();
+ datatype = v->getDatatype();
+
+ return ENGINE_SUCCESS;
+ }
+ } else {
+ // The key wasn't found. However, this may be because it was previously
+ // deleted or evicted with the full eviction strategy.
+ // So, add a temporary item corresponding to the key to the hash table
+ // and schedule a background fetch for its metadata from the persistent
+ // store. The item's state will be updated after the fetch completes.
+ //
+ // Schedule this bgFetch only if the key is predicted to be may-be
+ // existent on disk by the bloomfilter.
+
+ if (maybeKeyExistsInFilter(key)) {
+ return addTempItemAndBGFetch(
+ hbl, key, cookie, engine, bgFetchDelay, true);
+ } else {
+ stats.numOpsGetMeta++;
+ return ENGINE_KEY_ENOENT;
+ }
+ }
+}
+
+ENGINE_ERROR_CODE VBucket::getKeyStats(const DocKey& key,
+ const void* cookie,
+ EventuallyPersistentEngine& engine,
+ int bgFetchDelay,
+ struct key_stats& kstats,
+ WantsDeleted wantsDeleted) {
+ auto hbl = ht.getLockedBucket(key);
+ StoredValue* v = fetchValidValue(hbl,
+ key,
+ WantsDeleted::Yes,
+ TrackReference::Yes,
+ QueueExpired::Yes);
+
+ if (v) {
+ if ((v->isDeleted() && wantsDeleted == WantsDeleted::No) ||
+ v->isTempNonExistentItem() || v->isTempDeletedItem()) {
+ return ENGINE_KEY_ENOENT;
+ }
+ if (eviction == FULL_EVICTION && v->isTempInitialItem()) {
+ hbl.getHTLock().unlock();
+ bgFetch(key, cookie, engine, bgFetchDelay, true);
+ return ENGINE_EWOULDBLOCK;
+ }
+ kstats.logically_deleted = v->isDeleted();
+ kstats.dirty = v->isDirty();
+ kstats.exptime = v->getExptime();
+ kstats.flags = v->getFlags();
+ kstats.cas = v->getCas();
+ kstats.vb_state = getState();
+ return ENGINE_SUCCESS;
+ } else {
+ if (eviction == VALUE_ONLY) {
+ return ENGINE_KEY_ENOENT;
+ } else {
+ if (maybeKeyExistsInFilter(key)) {
+ return addTempItemAndBGFetch(
+ hbl, key, cookie, engine, bgFetchDelay, true);
+ } else {
+ // If bgFetch were false, or bloomfilter predicted that
+ // item surely doesn't exist on disk, return ENOENT for
+ // getKeyStats().
+ return ENGINE_KEY_ENOENT;
+ }
+ }
+ }
+}
+
+GetValue VBucket::getLocked(const DocKey& key,
+ rel_time_t currentTime,
+ uint32_t lockTimeout,
+ const void* cookie,
+ EventuallyPersistentEngine& engine,
+ int bgFetchDelay) {
+ auto hbl = ht.getLockedBucket(key);
+ StoredValue* v = fetchValidValue(hbl,
+ key,
+ WantsDeleted::Yes,
+ TrackReference::Yes,
+ QueueExpired::Yes);
+
+ if (v) {
+ if (v->isDeleted() || v->isTempNonExistentItem() ||
+ v->isTempDeletedItem()) {
+ return GetValue(NULL, ENGINE_KEY_ENOENT);
+ }
+
+ // if v is locked return error
+ if (v->isLocked(currentTime)) {
+ return GetValue(NULL, ENGINE_TMPFAIL);
+ }
+
+ // If the value is not resident, wait for it...
+ if (!v->isResident()) {
+ if (cookie) {
+ bgFetch(key, cookie, engine, bgFetchDelay);
+ }
+ return GetValue(NULL, ENGINE_EWOULDBLOCK, -1, true);
+ }
+
+ // acquire lock and increment cas value
+ v->lock(currentTime + lockTimeout);
+
+ auto it = v->toItem(false, getId());
+ it->setCas(nextHLCCas());
+ v->setCas(it->getCas());
+
+ return GetValue(it.release());
+
+ } else {
+ // No value found in the hashtable.
+ switch (eviction) {
+ case VALUE_ONLY:
+ return GetValue(NULL, ENGINE_KEY_ENOENT);
+
+ case FULL_EVICTION:
+ if (maybeKeyExistsInFilter(key)) {
+ ENGINE_ERROR_CODE ec = addTempItemAndBGFetch(
+ hbl, key, cookie, engine, bgFetchDelay, false);
+ return GetValue(NULL, ec, -1, true);
+ } else {
+ // As bloomfilter predicted that item surely doesn't exist
+ // on disk, return ENOENT for getLocked().
+ return GetValue(NULL, ENGINE_KEY_ENOENT);
+ }
+ }
+ return GetValue(); // just to prevent compiler warning
+ }
+}
+
+void VBucket::deletedOnDiskCbk(const Item& queuedItem, bool deleted) {
+ auto hbl = ht.getLockedBucket(queuedItem.getKey());
+ StoredValue* v = fetchValidValue(hbl,
+ queuedItem.getKey(),
+ WantsDeleted::Yes,
+ TrackReference::No,
+ QueueExpired::Yes);
+ // Delete the item in the hash table iff:
+ // 1. Item is existent in hashtable, and deleted flag is true
+ // 2. rev seqno of queued item matches rev seqno of hash table item
+ if (v && v->isDeleted() && (queuedItem.getRevSeqno() == v->getRevSeqno())) {
+ bool isDeleted = deleteStoredValue(hbl, *v);
+ if (!isDeleted) {
+ throw std::logic_error(
+ "deletedOnDiskCbk:callback: "
+ "Failed to delete key with seqno:" +
+ std::to_string(v->getBySeqno()) + "' from bucket " +
+ std::to_string(hbl.getBucketNum()));
+ }
+
+ /**
+ * Deleted items are to be added to the bloomfilter,
+ * in either eviction policy.
+ */
+ addToFilter(queuedItem.getKey());
+ }
+
+ if (deleted) {
+ ++stats.totalPersisted;
+ ++opsDelete;
+ }
+ doStatsForFlushing(queuedItem, queuedItem.size());
+ --stats.diskQueueSize;
+ decrMetaDataDisk(queuedItem);
+}
+
+bool VBucket::deleteKey(const DocKey& key) {
+ auto hbl = ht.getLockedBucket(key);
+ StoredValue* v = ht.unlocked_find(
+ key, hbl.getBucketNum(), WantsDeleted::Yes, TrackReference::No);
+ if (!v) {
+ return false;
+ }
+ return deleteStoredValue(hbl, *v);
+}
+
+void VBucket::postProcessRollback(const RollbackResult& rollbackResult,
+ uint64_t prevHighSeqno) {
+ failovers->pruneEntries(rollbackResult.highSeqno);
+ checkpointManager.clear(*this, rollbackResult.highSeqno);
+ setPersistedSnapshot(rollbackResult.snapStartSeqno,
+ rollbackResult.snapEndSeqno);
+ incrRollbackItemCount(prevHighSeqno - rollbackResult.highSeqno);
+ checkpointManager.setOpenCheckpointId(1);
+}
+
+void VBucket::dump() const {
+ std::cerr << "VBucket[" << this << "] with state: " << toString(getState())
+ << " numItems:" << getNumItems()
+ << " numNonResident:" << getNumNonResidentItems()
+ << " ht: " << std::endl << " " << ht << std::endl
+ << "]" << std::endl;
+}
+
+void VBucket::_addStats(bool details, ADD_STAT add_stat, const void* c) {
addStat(NULL, toString(state), add_stat, c);
if (details) {
- size_t numItems = getNumItems(policy);
+ size_t numItems = getNumItems();
size_t tempItems = getNumTempItems();
addStat("num_items", numItems, add_stat, c);
addStat("num_temp_items", tempItems, add_stat, c);
- addStat("num_non_resident", getNumNonResidentItems(policy),
- add_stat, c);
+ addStat("num_non_resident", getNumNonResidentItems(), add_stat, c);
addStat("ht_memory", ht.memorySize(), add_stat, c);
addStat("ht_item_memory", ht.getItemMemory(), add_stat, c);
addStat("ht_cache_size", ht.cacheSize.load(), add_stat, c);
+ addStat("ht_size", ht.getSize(), add_stat, c);
addStat("num_ejects", ht.getNumEjects(), add_stat, c);
addStat("ops_create", opsCreate.load(), add_stat, c);
addStat("ops_update", opsUpdate.load(), add_stat, c);
addStat("ops_delete", opsDelete.load(), add_stat, c);
addStat("ops_reject", opsReject.load(), add_stat, c);
addStat("queue_size", dirtyQueueSize.load(), add_stat, c);
+ addStat("backfill_queue_size", getBackfillSize(), add_stat, c);
addStat("queue_memory", dirtyQueueMem.load(), add_stat, c);
addStat("queue_fill", dirtyQueueFill.load(), add_stat, c);
addStat("queue_drain", dirtyQueueDrain.load(), add_stat, c);
addStat("queue_age", getQueueAge(), add_stat, c);
addStat("pending_writes", dirtyQueuePendingWrites.load(), add_stat, c);
- addStat("db_data_size", fileSpaceUsed.load(), add_stat, c);
- addStat("db_file_size", fileSize.load(), add_stat, c);
+
addStat("high_seqno", getHighSeqno(), add_stat, c);
addStat("uuid", failovers->getLatestUUID(), add_stat, c);
addStat("purge_seqno", getPurgeSeqno(), add_stat, c);
addStat("bloom_filter_size", getFilterSize(), add_stat, c);
addStat("bloom_filter_key_count", getNumOfKeysInFilter(), add_stat, c);
addStat("rollback_item_count", getRollbackItemCount(), add_stat, c);
+ addStat("hp_vb_req_size", getHighPriorityChkSize(), add_stat, c);
hlc.addStats(statPrefix, add_stat, c);
}
}
}
} while (!dirtyQueuePendingWrites.compare_exchange_strong(oldVal, newVal));
}
+
+std::pair<MutationStatus, VBNotifyCtx> VBucket::processSet(
+ const HashTable::HashBucketLock& hbl,
+ StoredValue*& v,
+ Item& itm,
+ uint64_t cas,
+ bool allowExisting,
+ bool hasMetaData,
+ const VBQueueItemCtx* queueItmCtx,
+ bool maybeKeyExists,
+ bool isReplication) {
+ if (!hbl.getHTLock()) {
+ throw std::invalid_argument(
+ "VBucket::processSet: htLock not held for "
+ "VBucket " +
+ std::to_string(getId()));
+ }
+
+ if (!StoredValue::hasAvailableSpace(stats, itm, isReplication)) {
+ return {MutationStatus::NoMem, VBNotifyCtx()};
+ }
+
+ if (cas && eviction == FULL_EVICTION && maybeKeyExists) {
+ if (!v || v->isTempInitialItem()) {
+ return {MutationStatus::NeedBgFetch, VBNotifyCtx()};
+ }
+ }
+
+ /*
+ * prior to checking for the lock, we should check if this object
+ * has expired. If so, then check if CAS value has been provided
+ * for this set op. In this case the operation should be denied since
+ * a cas operation for a key that doesn't exist is not a very cool
+ * thing to do. See MB 3252
+ */
+ if (v && v->isExpired(ep_real_time()) && !hasMetaData && !itm.isDeleted()) {
+ if (v->isLocked(ep_current_time())) {
+ v->unlock();
+ }
+ if (cas) {
+ /* item has expired and cas value provided. Deny ! */
+ return {MutationStatus::NotFound, VBNotifyCtx()};
+ }
+ }
+
+ if (v) {
+ if (!allowExisting && !v->isTempItem()) {
+ return {MutationStatus::InvalidCas, VBNotifyCtx()};
+ }
+ if (v->isLocked(ep_current_time())) {
+ /*
+ * item is locked, deny if there is cas value mismatch
+ * or no cas value is provided by the user
+ */
+ if (cas != v->getCas()) {
+ return {MutationStatus::IsLocked, VBNotifyCtx()};
+ }
+ /* allow operation*/
+ v->unlock();
+ } else if (cas && cas != v->getCas()) {
+ if (v->isTempNonExistentItem()) {
+ // This is a temporary item which marks a key as non-existent;
+ // therefore specifying a non-matching CAS should be exposed
+ // as item not existing.
+ return {MutationStatus::NotFound, VBNotifyCtx()};
+ }
+ if ((v->isTempDeletedItem() || v->isDeleted()) && !itm.isDeleted()) {
+ // Existing item is deleted, and we are not replacing it with
+ // a (different) deleted value - return not existing.
+ return {MutationStatus::NotFound, VBNotifyCtx()};
+ }
+ // None of the above special cases; the existing item cannot be
+ // modified with the specified CAS.
+ return {MutationStatus::InvalidCas, VBNotifyCtx()};
+ }
+ if (!hasMetaData) {
+ itm.setRevSeqno(v->getRevSeqno() + 1);
+ /* MB-23530: We must ensure that a replace operation (i.e.
+ * set with a CAS) /fails/ if the old document is deleted; it
+ * logically "doesn't exist". However, if the new value is deleted
+ * this op is a /delete/ with a CAS and we must permit a
+ * deleted -> deleted transition for Deleted Bodies.
+ */
+ if (cas && (v->isDeleted() || v->isTempDeletedItem()) &&
+ !itm.isDeleted()) {
+ return {MutationStatus::NotFound, VBNotifyCtx()};
+ }
+ }
+
+ MutationStatus status;
+ VBNotifyCtx notifyCtx;
+ std::tie(v, status, notifyCtx) =
+ updateStoredValue(hbl, *v, itm, queueItmCtx);
+ return {status, notifyCtx};
+ } else if (cas != 0) {
+ return {MutationStatus::NotFound, VBNotifyCtx()};
+ } else {
+ VBNotifyCtx notifyCtx;
+ std::tie(v, notifyCtx) = addNewStoredValue(hbl, itm, queueItmCtx);
+ if (!hasMetaData) {
+ updateRevSeqNoOfNewStoredValue(*v);
+ itm.setRevSeqno(v->getRevSeqno());
+ }
+ return {MutationStatus::WasClean, notifyCtx};
+ }
+}
+
+std::pair<AddStatus, VBNotifyCtx> VBucket::processAdd(
+ const HashTable::HashBucketLock& hbl,
+ StoredValue*& v,
+ Item& itm,
+ bool maybeKeyExists,
+ bool isReplication,
+ const VBQueueItemCtx* queueItmCtx) {
+ if (!hbl.getHTLock()) {
+ throw std::invalid_argument(
+ "VBucket::processAdd: htLock not held for "
+ "VBucket " +
+ std::to_string(getId()));
+ }
+
+ if (v && !v->isDeleted() && !v->isExpired(ep_real_time()) &&
+ !v->isTempItem()) {
+ return {AddStatus::Exists, VBNotifyCtx()};
+ }
+ if (!StoredValue::hasAvailableSpace(stats, itm, isReplication)) {
+ return {AddStatus::NoMem, VBNotifyCtx()};
+ }
+
+ std::pair<AddStatus, VBNotifyCtx> rv = {AddStatus::Success, VBNotifyCtx()};
+
+ if (v) {
+ if (v->isTempInitialItem() && eviction == FULL_EVICTION &&
+ maybeKeyExists) {
+ // Need to figure out if an item exists on disk
+ return {AddStatus::BgFetch, VBNotifyCtx()};
+ }
+
+ rv.first = (v->isDeleted() || v->isExpired(ep_real_time()))
+ ? AddStatus::UnDel
+ : AddStatus::Success;
+
+ if (v->isTempDeletedItem()) {
+ itm.setRevSeqno(v->getRevSeqno() + 1);
+ } else {
+ itm.setRevSeqno(ht.getMaxDeletedRevSeqno() + 1);
+ }
+
+ if (!v->isTempItem()) {
+ itm.setRevSeqno(v->getRevSeqno() + 1);
+ }
+
+ std::tie(v, std::ignore, rv.second) =
+ updateStoredValue(hbl, *v, itm, queueItmCtx);
+ } else {
+ if (itm.getBySeqno() != StoredValue::state_temp_init) {
+ if (eviction == FULL_EVICTION && maybeKeyExists) {
+ return {AddStatus::AddTmpAndBgFetch, VBNotifyCtx()};
+ }
+ }
+ std::tie(v, rv.second) = addNewStoredValue(hbl, itm, queueItmCtx);
+ updateRevSeqNoOfNewStoredValue(*v);
+ itm.setRevSeqno(v->getRevSeqno());
+ if (v->isTempItem()) {
+ rv.first = AddStatus::BgFetch;
+ }
+ }
+
+ if (v->isTempItem()) {
+ v->setNRUValue(MAX_NRU_VALUE);
+ }
+ return rv;
+}
+
+std::tuple<MutationStatus, StoredValue*, VBNotifyCtx>
+VBucket::processSoftDelete(const HashTable::HashBucketLock& hbl,
+ StoredValue& v,
+ uint64_t cas,
+ const ItemMetaData& metadata,
+ const VBQueueItemCtx& queueItmCtx,
+ bool use_meta,
+ uint64_t bySeqno) {
+ if (v.isTempInitialItem() && eviction == FULL_EVICTION) {
+ return std::make_tuple(MutationStatus::NeedBgFetch, &v, VBNotifyCtx());
+ }
+
+ if (v.isLocked(ep_current_time())) {
+ if (cas != v.getCas()) {
+ return std::make_tuple(MutationStatus::IsLocked, &v, VBNotifyCtx());
+ }
+ v.unlock();
+ }
+
+ if (cas != 0 && cas != v.getCas()) {
+ return std::make_tuple(MutationStatus::InvalidCas, &v, VBNotifyCtx());
+ }
+
+ /* allow operation */
+ v.unlock();
+
+ MutationStatus rv =
+ v.isDirty() ? MutationStatus::WasDirty : MutationStatus::WasClean;
+
+ if (use_meta) {
+ v.setCas(metadata.cas);
+ v.setFlags(metadata.flags);
+ v.setExptime(metadata.exptime);
+ }
+
+ v.setRevSeqno(metadata.revSeqno);
+ VBNotifyCtx notifyCtx;
+ StoredValue* newSv;
+ std::tie(newSv, notifyCtx) =
+ softDeleteStoredValue(hbl,
+ v,
+ /*onlyMarkDeleted*/ false,
+ queueItmCtx,
+ bySeqno);
+ ht.updateMaxDeletedRevSeqno(metadata.revSeqno);
+ return std::make_tuple(rv, newSv, notifyCtx);
+}
+
+std::tuple<MutationStatus, StoredValue*, VBNotifyCtx>
+VBucket::processExpiredItem(const HashTable::HashBucketLock& hbl,
+ StoredValue& v) {
+ if (!hbl.getHTLock()) {
+ throw std::invalid_argument(
+ "VBucket::processExpiredItem: htLock not held for VBucket " +
+ std::to_string(getId()));
+ }
+
+ if (v.isTempInitialItem() && eviction == FULL_EVICTION) {
+ return std::make_tuple(MutationStatus::NeedBgFetch,
+ &v,
+ queueDirty(v,
+ GenerateBySeqno::Yes,
+ GenerateCas::Yes,
+ /*isBackfillItem*/ false));
+ }
+
+ /* If the datatype is XATTR, mark the item as deleted
+ * but don't delete the value as system xattrs can
+ * still be queried by mobile clients even after
+ * deletion.
+ * TODO: The current implementation is inefficient
+ * but functionally correct and for performance reasons
+ * only the system xattrs need to be stored.
+ */
+ value_t value = v.getValue();
+ bool onlyMarkDeleted =
+ value && mcbp::datatype::is_xattr(value->getDataType());
+ v.setRevSeqno(v.getRevSeqno() + 1);
+ VBNotifyCtx notifyCtx;
+ StoredValue* newSv;
+ std::tie(newSv, notifyCtx) =
+ softDeleteStoredValue(hbl,
+ v,
+ onlyMarkDeleted,
+ VBQueueItemCtx(GenerateBySeqno::Yes,
+ GenerateCas::Yes,
+ TrackCasDrift::No,
+ /*isBackfillItem*/ false,
+ nullptr /* no pre link */),
+ v.getBySeqno());
+ ht.updateMaxDeletedRevSeqno(newSv->getRevSeqno() + 1);
+ return std::make_tuple(MutationStatus::NotFound, newSv, notifyCtx);
+}
+
+bool VBucket::deleteStoredValue(const HashTable::HashBucketLock& hbl,
+ StoredValue& v) {
+ if (!v.isDeleted() && v.isLocked(ep_current_time())) {
+ return false;
+ }
+
+ /* StoredValue deleted here. If any other in-memory data structures are
+ using the StoredValue intrusively then they must have handled the delete
+ by this point */
+ ht.unlocked_del(hbl, v.getKey());
+ return true;
+}
+
+AddStatus VBucket::addTempStoredValue(const HashTable::HashBucketLock& hbl,
+ const DocKey& key,
+ bool isReplication) {
+ uint8_t ext_meta[EXT_META_LEN] = {PROTOCOL_BINARY_RAW_BYTES};
+ static_assert(sizeof(ext_meta) == 1,
+ "VBucket::addTempStoredValue(): expected "
+ "EXT_META_LEN to be 1");
+ Item itm(key,
+ /*flags*/ 0,
+ /*exp*/ 0,
+ /*data*/ NULL,
+ /*size*/ 0,
+ ext_meta,
+ sizeof(ext_meta),
+ 0,
+ StoredValue::state_temp_init);
+
+ /* if a temp item for a possibly deleted, set it non-resident by resetting
+ the value cuz normally a new item added is considered resident which
+ does not apply for temp item. */
+ StoredValue* v = nullptr;
+ return processAdd(hbl, v, itm, true, isReplication, nullptr).first;
+}
+
+void VBucket::notifyNewSeqno(const VBNotifyCtx& notifyCtx) {
+ if (newSeqnoCb) {
+ newSeqnoCb->callback(getId(), notifyCtx);
+ }
+}
+
+/*
+ * Queue the item to the checkpoint and return the seqno the item was
+ * allocated.
+ */
+int64_t VBucket::queueItem(Item* item, OptionalSeqno seqno) {
+ item->setVBucketId(id);
+ queued_item qi(item);
+ checkpointManager.queueDirty(
+ *this,
+ qi,
+ seqno ? GenerateBySeqno::No : GenerateBySeqno::Yes,
+ GenerateCas::Yes,
+ nullptr /* No pre link step as this is for system events */);
+ VBNotifyCtx notifyCtx;
+ // If the seqno is initialized, skip replication notification
+ notifyCtx.notifyReplication = !seqno.is_initialized();
+ notifyCtx.notifyFlusher = true;
+ notifyCtx.bySeqno = qi->getBySeqno();
+ notifyNewSeqno(notifyCtx);
+ return qi->getBySeqno();
+}
+
+VBNotifyCtx VBucket::queueDirty(StoredValue& v,
+ const VBQueueItemCtx& queueItmCtx) {
+ if (queueItmCtx.trackCasDrift == TrackCasDrift::Yes) {
+ setMaxCasAndTrackDrift(v.getCas());
+ }
+ return queueDirty(v,
+ queueItmCtx.genBySeqno,
+ queueItmCtx.genCas,
+ queueItmCtx.isBackfillItem,
+ queueItmCtx.preLinkDocumentContext);
+}
+
+void VBucket::updateRevSeqNoOfNewStoredValue(StoredValue& v) {
+ /**
+ * Possibly, this item is being recreated. Conservatively assign it
+ * a seqno that is greater than the greatest seqno of all deleted
+ * items seen so far.
+ */
+ uint64_t seqno = ht.getMaxDeletedRevSeqno();
+ if (!v.isTempItem()) {
+ ++seqno;
+ }
+ v.setRevSeqno(seqno);
+}
+
+void VBucket::addHighPriorityVBEntry(uint64_t seqnoOrChkId,
+ const void* cookie,
+ HighPriorityVBNotify reqType) {
+ std::unique_lock<std::mutex> lh(hpVBReqsMutex);
+ hpVBReqs.push_back(HighPriorityVBEntry(cookie, seqnoOrChkId, reqType));
+ numHpVBReqs.store(hpVBReqs.size());
+
+ LOG(EXTENSION_LOG_NOTICE,
+ "Added high priority async request %s "
+ "for vb:%" PRIu16 ", Check for:%" PRIu64 ", "
+ "Persisted upto:%" PRIu64 ", cookie:%p",
+ to_string(reqType).c_str(),
+ getId(),
+ seqnoOrChkId,
+ getPersistenceSeqno(),
+ cookie);
+}
+
+std::map<const void*, ENGINE_ERROR_CODE> VBucket::getHighPriorityNotifications(
+ EventuallyPersistentEngine& engine,
+ uint64_t idNum,
+ HighPriorityVBNotify notifyType) {
+ std::unique_lock<std::mutex> lh(hpVBReqsMutex);
+ std::map<const void*, ENGINE_ERROR_CODE> toNotify;
+
+ auto entry = hpVBReqs.begin();
+
+ while (entry != hpVBReqs.end()) {
+ if (notifyType != entry->reqType) {
+ ++entry;
+ continue;
+ }
+
+ std::string logStr(to_string(notifyType));
+
+ hrtime_t wall_time(gethrtime() - entry->start);
+ size_t spent = wall_time / 1000000000;
+ if (entry->id <= idNum) {
+ toNotify[entry->cookie] = ENGINE_SUCCESS;
+ stats.chkPersistenceHisto.add(wall_time / 1000);
+ adjustCheckpointFlushTimeout(wall_time / 1000000000);
+ LOG(EXTENSION_LOG_NOTICE,
+ "Notified the completion of %s "
+ "for vbucket %" PRIu16 ", Check for: %" PRIu64
+ ", "
+ "Persisted upto: %" PRIu64 ", cookie %p",
+ logStr.c_str(),
+ getId(),
+ entry->id,
+ idNum,
+ entry->cookie);
+ entry = hpVBReqs.erase(entry);
+ } else if (spent > getCheckpointFlushTimeout()) {
+ adjustCheckpointFlushTimeout(spent);
+ engine.storeEngineSpecific(entry->cookie, NULL);
+ toNotify[entry->cookie] = ENGINE_TMPFAIL;
+ LOG(EXTENSION_LOG_WARNING,
+ "Notified the timeout on %s "
+ "for vbucket %" PRIu16 ", Check for: %" PRIu64
+ ", "
+ "Persisted upto: %" PRIu64 ", cookie %p",
+ logStr.c_str(),
+ getId(),
+ entry->id,
+ idNum,
+ entry->cookie);
+ entry = hpVBReqs.erase(entry);
+ } else {
+ ++entry;
+ }
+ }
+ numHpVBReqs.store(hpVBReqs.size());
+ return toNotify;
+}
+
+std::map<const void*, ENGINE_ERROR_CODE> VBucket::tmpFailAndGetAllHpNotifies(
+ EventuallyPersistentEngine& engine) {
+ std::map<const void*, ENGINE_ERROR_CODE> toNotify;
+
+ LockHolder lh(hpVBReqsMutex);
+
+ for (auto& entry : hpVBReqs) {
+ toNotify[entry.cookie] = ENGINE_TMPFAIL;
+ engine.storeEngineSpecific(entry.cookie, NULL);
+ }
+ hpVBReqs.clear();
+
+ return toNotify;
+}
+
+void VBucket::adjustCheckpointFlushTimeout(size_t wall_time) {
+ size_t middle = (MIN_CHK_FLUSH_TIMEOUT + MAX_CHK_FLUSH_TIMEOUT) / 2;
+
+ if (wall_time <= MIN_CHK_FLUSH_TIMEOUT) {
+ chkFlushTimeout = MIN_CHK_FLUSH_TIMEOUT;
+ } else if (wall_time <= middle) {
+ chkFlushTimeout = middle;
+ } else {
+ chkFlushTimeout = MAX_CHK_FLUSH_TIMEOUT;
+ }
+}
+
+size_t VBucket::getCheckpointFlushTimeout() {
+ return chkFlushTimeout;
+}
+
+std::unique_ptr<Item> VBucket::pruneXattrDocument(
+ StoredValue& v, const ItemMetaData& itemMeta) {
+ // Need to take a copy of the value, prune it, and add it back
+
+ // Create work-space document
+ std::vector<uint8_t> workspace(v.getValue()->vlength());
+ std::copy_n(v.getValue()->getData(),
+ v.getValue()->vlength(),
+ workspace.begin());
+
+ // Now attach to the XATTRs in the document
+ auto sz = cb::xattr::get_body_offset(
+ {reinterpret_cast<char*>(workspace.data()), workspace.size()});
+
+ cb::xattr::Blob xattr({workspace.data(), sz});
+ xattr.prune_user_keys();
+
+ auto prunedXattrs = xattr.finalize();
+
+ if (prunedXattrs.size()) {
+ // Something remains - Create a Blob and copy-in just the XATTRs
+ auto newValue =
+ Blob::New(reinterpret_cast<const char*>(prunedXattrs.data()),
+ prunedXattrs.size(),
+ const_cast<uint8_t*>(reinterpret_cast<const uint8_t*>(
+ v.getValue()->getExtMeta())),
+ v.getValue()->getExtLen());
+
+ return std::make_unique<Item>(v.getKey(),
+ itemMeta.flags,
+ itemMeta.exptime,
+ newValue,
+ itemMeta.cas,
+ v.getBySeqno(),
+ getId(),
+ itemMeta.revSeqno);
+ } else {
+ return {};
+ }
+}
+
+void VBucket::DeferredDeleter::operator()(VBucket* vb) const {
+ // If the vbucket is marked as deleting then we must schedule task to
+ // perform the resource destruction (memory/disk).
+ if (vb->isDeletionDeferred()) {
+ vb->scheduleDeferredDeletion(engine);
+ return;
+ }
+ delete vb;
+}