}
}
-void VBucket::addStats(bool details, ADD_STAT add_stat, const void *c,
- item_eviction_policy_t policy) {
+VBNotifyCtx VBucket::queueDirty(
+ StoredValue& v,
+ const GenerateBySeqno generateBySeqno,
+ const GenerateCas generateCas,
+ const bool isBackfillItem,
+ PreLinkDocumentContext* preLinkDocumentContext) {
+ VBNotifyCtx notifyCtx;
+
+ queued_item qi(v.toItem(false, getId()));
+
+ if (isBackfillItem) {
+ queueBackfillItem(qi, generateBySeqno);
+ notifyCtx.notifyFlusher = true;
+ /* During backfill on a TAP receiver we need to update the snapshot
+ range in the checkpoint. Has to be done here because in case of TAP
+ backfill, above, we use vb.queueBackfillItem() instead of
+ vb.checkpointManager.queueDirty() */
+ if (generateBySeqno == GenerateBySeqno::Yes) {
+ checkpointManager.resetSnapshotRange();
+ }
+ } else {
+ notifyCtx.notifyFlusher =
+ checkpointManager.queueDirty(*this,
+ qi,
+ generateBySeqno,
+ generateCas,
+ preLinkDocumentContext);
+ notifyCtx.notifyReplication = true;
+ if (GenerateCas::Yes == generateCas) {
+ v.setCas(qi->getCas());
+ }
+ }
+
+ v.setBySeqno(qi->getBySeqno());
+ notifyCtx.bySeqno = qi->getBySeqno();
+
+ return notifyCtx;
+}
+
+StoredValue* VBucket::fetchValidValue(HashTable::HashBucketLock& hbl,
+ const DocKey& key,
+ WantsDeleted wantsDeleted,
+ TrackReference trackReference,
+ QueueExpired queueExpired) {
+ if (!hbl.getHTLock()) {
+ throw std::logic_error(
+ "Hash bucket lock not held in "
+ "VBucket::fetchValidValue() for hash bucket: " +
+ std::to_string(hbl.getBucketNum()) + "for key: " +
+ std::string(reinterpret_cast<const char*>(key.data()),
+ key.size()));
+ }
+ StoredValue* v = ht.unlocked_find(
+ key, hbl.getBucketNum(), wantsDeleted, trackReference);
+ if (v && !v->isDeleted() && !v->isTempItem()) {
+ // In the deleted case, we ignore expiration time.
+ if (v->isExpired(ep_real_time())) {
+ if (getState() != vbucket_state_active) {
+ return wantsDeleted == WantsDeleted::Yes ? v : NULL;
+ }
+
+ // queueDirty only allowed on active VB
+ if (queueExpired == QueueExpired::Yes &&
+ getState() == vbucket_state_active) {
+ incExpirationStat(ExpireBy::Access);
+ handlePreExpiry(*v);
+ VBNotifyCtx notifyCtx;
+ std::tie(std::ignore, v, notifyCtx) =
+ processExpiredItem(hbl, *v);
+ notifyNewSeqno(notifyCtx);
+ }
+ return wantsDeleted == WantsDeleted::Yes ? v : NULL;
+ }
+ }
+ return v;
+}
+
+void VBucket::incExpirationStat(const ExpireBy source) {
+ switch (source) {
+ case ExpireBy::Pager:
+ ++stats.expired_pager;
+ break;
+ case ExpireBy::Compactor:
+ ++stats.expired_compactor;
+ break;
+ case ExpireBy::Access:
+ ++stats.expired_access;
+ break;
+ }
+ ++numExpiredItems;
+}
+
+MutationStatus VBucket::setFromInternal(Item& itm) {
+ return ht.set(itm);
+}
+
+ENGINE_ERROR_CODE VBucket::set(Item& itm,
+ const void* cookie,
+ EventuallyPersistentEngine& engine,
+ const int bgFetchDelay) {
+ bool cas_op = (itm.getCas() != 0);
+ auto hbl = ht.getLockedBucket(itm.getKey());
+ StoredValue* v = ht.unlocked_find(itm.getKey(),
+ hbl.getBucketNum(),
+ WantsDeleted::Yes,
+ TrackReference::No);
+ if (v && v->isLocked(ep_current_time()) &&
+ (getState() == vbucket_state_replica ||
+ getState() == vbucket_state_pending)) {
+ v->unlock();
+ }
+
+ bool maybeKeyExists = true;
+ // If we didn't find a valid item, check Bloomfilter's prediction if in
+ // full eviction policy and for a CAS operation.
+ if ((v == nullptr || v->isTempInitialItem()) &&
+ (eviction == FULL_EVICTION) && (itm.getCas() != 0)) {
+ // Check Bloomfilter's prediction
+ if (!maybeKeyExistsInFilter(itm.getKey())) {
+ maybeKeyExists = false;
+ }
+ }
+
+ PreLinkDocumentContext preLinkDocumentContext(engine, cookie, &itm);
+ VBQueueItemCtx queueItmCtx(GenerateBySeqno::Yes,
+ GenerateCas::Yes,
+ TrackCasDrift::No,
+ /*isBackfillItem*/ false,
+ &preLinkDocumentContext);
+
+ MutationStatus status;
+ VBNotifyCtx notifyCtx;
+ std::tie(status, notifyCtx) = processSet(hbl,
+ v,
+ itm,
+ itm.getCas(),
+ /*allowExisting*/ true,
+ /*hashMetaData*/ false,
+ &queueItmCtx,
+ maybeKeyExists);
+
+ ENGINE_ERROR_CODE ret = ENGINE_SUCCESS;
+ switch (status) {
+ case MutationStatus::NoMem:
+ ret = ENGINE_ENOMEM;
+ break;
+ case MutationStatus::InvalidCas:
+ ret = ENGINE_KEY_EEXISTS;
+ break;
+ case MutationStatus::IsLocked:
+ ret = ENGINE_LOCKED;
+ break;
+ case MutationStatus::NotFound:
+ if (cas_op) {
+ ret = ENGINE_KEY_ENOENT;
+ break;
+ }
+ // FALLTHROUGH
+ case MutationStatus::WasDirty:
+ // Even if the item was dirty, push it into the vbucket's open
+ // checkpoint.
+ case MutationStatus::WasClean:
+ notifyNewSeqno(notifyCtx);
+
+ itm.setBySeqno(v->getBySeqno());
+ itm.setCas(v->getCas());
+ break;
+ case MutationStatus::NeedBgFetch: { // CAS operation with non-resident item
+ // +
+ // full eviction.
+ if (v) {
+ // temp item is already created. Simply schedule a bg fetch job
+ hbl.getHTLock().unlock();
+ bgFetch(itm.getKey(), cookie, engine, bgFetchDelay, true);
+ return ENGINE_EWOULDBLOCK;
+ }
+ ret = addTempItemAndBGFetch(
+ hbl, itm.getKey(), cookie, engine, bgFetchDelay, true);
+ break;
+ }
+ }
+
+ return ret;
+}
+
+ENGINE_ERROR_CODE VBucket::replace(Item& itm,
+ const void* cookie,
+ EventuallyPersistentEngine& engine,
+ const int bgFetchDelay) {
+ auto hbl = ht.getLockedBucket(itm.getKey());
+ StoredValue* v = ht.unlocked_find(itm.getKey(),
+ hbl.getBucketNum(),
+ WantsDeleted::Yes,
+ TrackReference::No);
+ if (v) {
+ if (v->isDeleted() || v->isTempDeletedItem() ||
+ v->isTempNonExistentItem()) {
+ return ENGINE_KEY_ENOENT;
+ }
+
+ MutationStatus mtype;
+ VBNotifyCtx notifyCtx;
+ if (eviction == FULL_EVICTION && v->isTempInitialItem()) {
+ mtype = MutationStatus::NeedBgFetch;
+ } else {
+ PreLinkDocumentContext preLinkDocumentContext(engine, cookie, &itm);
+ VBQueueItemCtx queueItmCtx(GenerateBySeqno::Yes,
+ GenerateCas::Yes,
+ TrackCasDrift::No,
+ /*isBackfillItem*/ false,
+ &preLinkDocumentContext);
+ std::tie(mtype, notifyCtx) = processSet(hbl,
+ v,
+ itm,
+ 0,
+ /*allowExisting*/ true,
+ /*hasMetaData*/ false,
+ &queueItmCtx);
+ }
+
+ ENGINE_ERROR_CODE ret = ENGINE_SUCCESS;
+ switch (mtype) {
+ case MutationStatus::NoMem:
+ ret = ENGINE_ENOMEM;
+ break;
+ case MutationStatus::IsLocked:
+ ret = ENGINE_LOCKED;
+ break;
+ case MutationStatus::InvalidCas:
+ case MutationStatus::NotFound:
+ ret = ENGINE_NOT_STORED;
+ break;
+ // FALLTHROUGH
+ case MutationStatus::WasDirty:
+ // Even if the item was dirty, push it into the vbucket's open
+ // checkpoint.
+ case MutationStatus::WasClean:
+ notifyNewSeqno(notifyCtx);
+
+ itm.setBySeqno(v->getBySeqno());
+ itm.setCas(v->getCas());
+ break;
+ case MutationStatus::NeedBgFetch: {
+ // temp item is already created. Simply schedule a bg fetch job
+ hbl.getHTLock().unlock();
+ bgFetch(itm.getKey(), cookie, engine, bgFetchDelay, true);
+ ret = ENGINE_EWOULDBLOCK;
+ break;
+ }
+ }
+
+ return ret;
+ } else {
+ if (eviction == VALUE_ONLY) {
+ return ENGINE_KEY_ENOENT;
+ }
+
+ if (maybeKeyExistsInFilter(itm.getKey())) {
+ return addTempItemAndBGFetch(
+ hbl, itm.getKey(), cookie, engine, bgFetchDelay, false);
+ } else {
+ // As bloomfilter predicted that item surely doesn't exist
+ // on disk, return ENOENT for replace().
+ return ENGINE_KEY_ENOENT;
+ }
+ }
+}
+
+ENGINE_ERROR_CODE VBucket::addBackfillItem(Item& itm,
+ const GenerateBySeqno genBySeqno) {
+ auto hbl = ht.getLockedBucket(itm.getKey());
+ StoredValue* v = ht.unlocked_find(itm.getKey(),
+ hbl.getBucketNum(),
+ WantsDeleted::Yes,
+ TrackReference::No);
+
+ // Note that this function is only called on replica or pending vbuckets.
+ if (v && v->isLocked(ep_current_time())) {
+ v->unlock();
+ }
+
+ VBQueueItemCtx queueItmCtx(genBySeqno,
+ GenerateCas::No,
+ TrackCasDrift::No,
+ /*isBackfillItem*/ true,
+ nullptr /* No pre link should happen */);
+ MutationStatus status;
+ VBNotifyCtx notifyCtx;
+ std::tie(status, notifyCtx) = processSet(hbl,
+ v,
+ itm,
+ 0,
+ /*allowExisting*/ true,
+ /*hasMetaData*/ true,
+ &queueItmCtx);
+
+ ENGINE_ERROR_CODE ret = ENGINE_SUCCESS;
+ switch (status) {
+ case MutationStatus::NoMem:
+ ret = ENGINE_ENOMEM;
+ break;
+ case MutationStatus::InvalidCas:
+ case MutationStatus::IsLocked:
+ ret = ENGINE_KEY_EEXISTS;
+ break;
+ case MutationStatus::WasDirty:
+ // FALLTHROUGH, to ensure the bySeqno for the hashTable item is
+ // set correctly, and also the sequence numbers are ordered correctly.
+ // (MB-14003)
+ case MutationStatus::NotFound:
+ // FALLTHROUGH
+ case MutationStatus::WasClean: {
+ setMaxCas(v->getCas());
+ // we unlock ht lock here because we want to avoid potential lock
+ // inversions arising from notifyNewSeqno() call
+ hbl.getHTLock().unlock();
+ notifyNewSeqno(notifyCtx);
+ } break;
+ case MutationStatus::NeedBgFetch:
+ throw std::logic_error(
+ "VBucket::addBackfillItem: "
+ "SET on a non-active vbucket should not require a "
+ "bg_metadata_fetch.");
+ }
+
+ return ret;
+}
+
+ENGINE_ERROR_CODE VBucket::setWithMeta(Item& itm,
+ const uint64_t cas,
+ uint64_t* seqno,
+ const void* cookie,
+ EventuallyPersistentEngine& engine,
+ const int bgFetchDelay,
+ const bool force,
+ const bool allowExisting,
+ const GenerateBySeqno genBySeqno,
+ const GenerateCas genCas,
+ const bool isReplication) {
+ auto hbl = ht.getLockedBucket(itm.getKey());
+ StoredValue* v = ht.unlocked_find(itm.getKey(),
+ hbl.getBucketNum(),
+ WantsDeleted::Yes,
+ TrackReference::No);
+
+ bool maybeKeyExists = true;
+ if (!force) {
+ if (v) {
+ if (v->isTempInitialItem()) {
+ bgFetch(itm.getKey(), cookie, engine, bgFetchDelay, true);
+ return ENGINE_EWOULDBLOCK;
+ }
+
+ if (!(conflictResolver->resolve(*v,
+ itm.getMetaData(),
+ itm.getDataType(),
+ itm.isDeleted()))) {
+ ++stats.numOpsSetMetaResolutionFailed;
+ return ENGINE_KEY_EEXISTS;
+ }
+ } else {
+ if (maybeKeyExistsInFilter(itm.getKey())) {
+ return addTempItemAndBGFetch(hbl,
+ itm.getKey(),
+ cookie,
+ engine,
+ bgFetchDelay,
+ true,
+ isReplication);
+ } else {
+ maybeKeyExists = false;
+ }
+ }
+ } else {
+ if (eviction == FULL_EVICTION) {
+ // Check Bloomfilter's prediction
+ if (!maybeKeyExistsInFilter(itm.getKey())) {
+ maybeKeyExists = false;
+ }
+ }
+ }
+
+ if (v && v->isLocked(ep_current_time()) &&
+ (getState() == vbucket_state_replica ||
+ getState() == vbucket_state_pending)) {
+ v->unlock();
+ }
+
+ VBQueueItemCtx queueItmCtx(genBySeqno,
+ genCas,
+ TrackCasDrift::Yes,
+ /*isBackfillItem*/ false,
+ nullptr /* No pre link step needed */);
+ MutationStatus status;
+ VBNotifyCtx notifyCtx;
+ std::tie(status, notifyCtx) = processSet(hbl,
+ v,
+ itm,
+ cas,
+ allowExisting,
+ true,
+ &queueItmCtx,
+ maybeKeyExists,
+ isReplication);
+
+ ENGINE_ERROR_CODE ret = ENGINE_SUCCESS;
+ switch (status) {
+ case MutationStatus::NoMem:
+ ret = ENGINE_ENOMEM;
+ break;
+ case MutationStatus::InvalidCas:
+ ret = ENGINE_KEY_EEXISTS;
+ break;
+ case MutationStatus::IsLocked:
+ ret = ENGINE_LOCKED;
+ break;
+ case MutationStatus::WasDirty:
+ case MutationStatus::WasClean: {
+ if (seqno) {
+ *seqno = static_cast<uint64_t>(v->getBySeqno());
+ }
+ // we unlock ht lock here because we want to avoid potential lock
+ // inversions arising from notifyNewSeqno() call
+ hbl.getHTLock().unlock();
+ notifyNewSeqno(notifyCtx);
+ } break;
+ case MutationStatus::NotFound:
+ ret = ENGINE_KEY_ENOENT;
+ break;
+ case MutationStatus::NeedBgFetch: { // CAS operation with non-resident item
+ // + full eviction.
+ if (v) { // temp item is already created. Simply schedule a
+ hbl.getHTLock().unlock(); // bg fetch job.
+ bgFetch(itm.getKey(), cookie, engine, bgFetchDelay, true);
+ return ENGINE_EWOULDBLOCK;
+ }
+ ret = addTempItemAndBGFetch(hbl,
+ itm.getKey(),
+ cookie,
+ engine,
+ bgFetchDelay,
+ true,
+ isReplication);
+ }
+ }
+
+ return ret;
+}
+
+ENGINE_ERROR_CODE VBucket::deleteItem(const DocKey& key,
+ uint64_t& cas,
+ const void* cookie,
+ EventuallyPersistentEngine& engine,
+ const int bgFetchDelay,
+ ItemMetaData* itemMeta,
+ mutation_descr_t* mutInfo) {
+ auto hbl = ht.getLockedBucket(key);
+ StoredValue* v = ht.unlocked_find(
+ key, hbl.getBucketNum(), WantsDeleted::Yes, TrackReference::No);
+
+ if (!v || v->isDeleted() || v->isTempItem()) {
+ if (eviction == VALUE_ONLY) {
+ return ENGINE_KEY_ENOENT;
+ } else { // Full eviction.
+ if (!v) { // Item might be evicted from cache.
+ if (maybeKeyExistsInFilter(key)) {
+ return addTempItemAndBGFetch(
+ hbl, key, cookie, engine, bgFetchDelay, true);
+ } else {
+ // As bloomfilter predicted that item surely doesn't
+ // exist on disk, return ENOENT for deleteItem().
+ return ENGINE_KEY_ENOENT;
+ }
+ } else if (v->isTempInitialItem()) {
+ hbl.getHTLock().unlock();
+ bgFetch(key, cookie, engine, bgFetchDelay, true);
+ return ENGINE_EWOULDBLOCK;
+ } else { // Non-existent or deleted key.
+ if (v->isTempNonExistentItem() || v->isTempDeletedItem()) {
+ // Delete a temp non-existent item to ensure that
+ // if a delete were issued over an item that doesn't
+ // exist, then we don't preserve a temp item.
+ deleteStoredValue(hbl, *v);
+ }
+ return ENGINE_KEY_ENOENT;
+ }
+ }
+ }
+
+ if (v->isLocked(ep_current_time()) &&
+ (getState() == vbucket_state_replica ||
+ getState() == vbucket_state_pending)) {
+ v->unlock();
+ }
+
+ if (itemMeta != nullptr) {
+ itemMeta->cas = v->getCas();
+ }
+
+ MutationStatus delrv;
+ VBNotifyCtx notifyCtx;
+ if (v->isExpired(ep_real_time())) {
+ std::tie(delrv, v, notifyCtx) = processExpiredItem(hbl, *v);
+ } else {
+ ItemMetaData metadata;
+ metadata.revSeqno = v->getRevSeqno() + 1;
+ std::tie(delrv, v, notifyCtx) =
+ processSoftDelete(hbl,
+ *v,
+ cas,
+ metadata,
+ VBQueueItemCtx(GenerateBySeqno::Yes,
+ GenerateCas::Yes,
+ TrackCasDrift::No,
+ /*isBackfillItem*/ false,
+ nullptr /* no pre link */),
+ /*use_meta*/ false,
+ /*bySeqno*/ v->getBySeqno());
+ }
+
+ uint64_t seqno = 0;
+ ENGINE_ERROR_CODE ret = ENGINE_SUCCESS;
+ switch (delrv) {
+ case MutationStatus::NoMem:
+ ret = ENGINE_ENOMEM;
+ break;
+ case MutationStatus::InvalidCas:
+ ret = ENGINE_KEY_EEXISTS;
+ break;
+ case MutationStatus::IsLocked:
+ ret = ENGINE_LOCKED_TMPFAIL;
+ break;
+ case MutationStatus::NotFound:
+ ret = ENGINE_KEY_ENOENT;
+ /* Fallthrough:
+ * A NotFound return value at this point indicates that the
+ * item has expired. But, a deletion still needs to be queued
+ * for the item in order to persist it.
+ */
+ case MutationStatus::WasClean:
+ case MutationStatus::WasDirty:
+ if (itemMeta != nullptr) {
+ itemMeta->revSeqno = v->getRevSeqno();
+ itemMeta->flags = v->getFlags();
+ itemMeta->exptime = v->getExptime();
+ }
+
+ notifyNewSeqno(notifyCtx);
+ seqno = static_cast<uint64_t>(v->getBySeqno());
+ cas = v->getCas();
+
+ if (delrv != MutationStatus::NotFound) {
+ if (mutInfo) {
+ mutInfo->seqno = seqno;
+ mutInfo->vbucket_uuid = failovers->getLatestUUID();
+ }
+ if (itemMeta != nullptr) {
+ itemMeta->cas = v->getCas();
+ }
+ }
+ break;
+ case MutationStatus::NeedBgFetch:
+ // We already figured out if a bg fetch is requred for a full-evicted
+ // item above.
+ throw std::logic_error(
+ "VBucket::deleteItem: "
+ "Unexpected NEEDS_BG_FETCH from processSoftDelete");
+ }
+ return ret;
+}
+
+ENGINE_ERROR_CODE VBucket::deleteWithMeta(const DocKey& key,
+ uint64_t& cas,
+ uint64_t* seqno,
+ const void* cookie,
+ EventuallyPersistentEngine& engine,
+ const int bgFetchDelay,
+ const bool force,
+ const ItemMetaData& itemMeta,
+ const bool backfill,
+ const GenerateBySeqno genBySeqno,
+ const GenerateCas generateCas,
+ const uint64_t bySeqno,
+ const bool isReplication) {
+ auto hbl = ht.getLockedBucket(key);
+ StoredValue* v = ht.unlocked_find(
+ key, hbl.getBucketNum(), WantsDeleted::Yes, TrackReference::No);
+ if (!force) { // Need conflict resolution.
+ if (v) {
+ if (v->isTempInitialItem()) {
+ bgFetch(key, cookie, engine, bgFetchDelay, true);
+ return ENGINE_EWOULDBLOCK;
+ }
+
+ if (!(conflictResolver->resolve(*v,
+ itemMeta,
+ PROTOCOL_BINARY_RAW_BYTES,
+ true))) {
+ ++stats.numOpsDelMetaResolutionFailed;
+ return ENGINE_KEY_EEXISTS;
+ }
+ } else {
+ // Item is 1) deleted or not existent in the value eviction case OR
+ // 2) deleted or evicted in the full eviction.
+ if (maybeKeyExistsInFilter(key)) {
+ return addTempItemAndBGFetch(hbl,
+ key,
+ cookie,
+ engine,
+ bgFetchDelay,
+ true,
+ isReplication);
+ } else {
+ // Even though bloomfilter predicted that item doesn't exist
+ // on disk, we must put this delete on disk if the cas is valid.
+ AddStatus rv = addTempStoredValue(hbl, key, isReplication);
+ if (rv == AddStatus::NoMem) {
+ return ENGINE_ENOMEM;
+ }
+ v = ht.unlocked_find(key,
+ hbl.getBucketNum(),
+ WantsDeleted::Yes,
+ TrackReference::No);
+ v->setDeleted();
+ }
+ }
+ } else {
+ if (!v) {
+ // We should always try to persist a delete here.
+ AddStatus rv = addTempStoredValue(hbl, key, isReplication);
+ if (rv == AddStatus::NoMem) {
+ return ENGINE_ENOMEM;
+ }
+ v = ht.unlocked_find(key,
+ hbl.getBucketNum(),
+ WantsDeleted::Yes,
+ TrackReference::No);
+ v->setDeleted();
+ v->setCas(cas);
+ } else if (v->isTempInitialItem()) {
+ v->setDeleted();
+ v->setCas(cas);
+ }
+ }
+
+ if (v && v->isLocked(ep_current_time()) &&
+ (getState() == vbucket_state_replica ||
+ getState() == vbucket_state_pending)) {
+ v->unlock();
+ }
+
+ MutationStatus delrv;
+ VBNotifyCtx notifyCtx;
+ if (!v) {
+ if (eviction == FULL_EVICTION) {
+ delrv = MutationStatus::NeedBgFetch;
+ } else {
+ delrv = MutationStatus::NotFound;
+ }
+ } else {
+ VBQueueItemCtx queueItmCtx(genBySeqno,
+ generateCas,
+ TrackCasDrift::Yes,
+ backfill,
+ nullptr /* No pre link step needed */);
+
+ // system xattrs must remain
+ std::unique_ptr<Item> itm;
+ if (mcbp::datatype::is_xattr(v->getDatatype()) &&
+ (itm = pruneXattrDocument(*v, itemMeta))) {
+ std::tie(v, delrv, notifyCtx) =
+ updateStoredValue(hbl, *v, *itm, &queueItmCtx);
+ } else {
+ std::tie(delrv, v, notifyCtx) = processSoftDelete(hbl,
+ *v,
+ cas,
+ itemMeta,
+ queueItmCtx,
+ /*use_meta*/ true,
+ bySeqno);
+ }
+ }
+ cas = v ? v->getCas() : 0;
+
+ switch (delrv) {
+ case MutationStatus::NoMem:
+ return ENGINE_ENOMEM;
+ case MutationStatus::InvalidCas:
+ return ENGINE_KEY_EEXISTS;
+ case MutationStatus::IsLocked:
+ return ENGINE_LOCKED_TMPFAIL;
+ case MutationStatus::NotFound:
+ return ENGINE_KEY_ENOENT;
+ case MutationStatus::WasDirty:
+ case MutationStatus::WasClean: {
+ if (seqno) {
+ *seqno = static_cast<uint64_t>(v->getBySeqno());
+ }
+ // we unlock ht lock here because we want to avoid potential lock
+ // inversions arising from notifyNewSeqno() call
+ hbl.getHTLock().unlock();
+ notifyNewSeqno(notifyCtx);
+ break;
+ }
+ case MutationStatus::NeedBgFetch:
+ hbl.getHTLock().unlock();
+ bgFetch(key, cookie, engine, bgFetchDelay, true);
+ return ENGINE_EWOULDBLOCK;
+ }
+ return ENGINE_SUCCESS;
+}
+
+void VBucket::deleteExpiredItem(const Item& it,
+ time_t startTime,
+ ExpireBy source) {
+
+ // The item is correctly trimmed (by the caller). Fetch the one in the
+ // hashtable and replace it if the CAS match (same item; no race).
+ // If not found in the hashtable we should add it as a deleted item
+ const DocKey& key = it.getKey();
+ auto hbl = ht.getLockedBucket(key);
+ StoredValue* v = ht.unlocked_find(
+ key, hbl.getBucketNum(), WantsDeleted::Yes, TrackReference::No);
+ if (v) {
+ if (v->getCas() != it.getCas()) {
+ return;
+ }
+
+ if (v->isTempNonExistentItem() || v->isTempDeletedItem()) {
+ bool deleted = deleteStoredValue(hbl, *v);
+ if (!deleted) {
+ throw std::logic_error(
+ "VBucket::deleteExpiredItem: "
+ "Failed to delete seqno:" +
+ std::to_string(v->getBySeqno()) + " from bucket " +
+ std::to_string(hbl.getBucketNum()));
+ }
+ } else if (v->isExpired(startTime) && !v->isDeleted()) {
+ VBNotifyCtx notifyCtx;
+ std::tie(std::ignore, std::ignore, notifyCtx) =
+ processExpiredItem(hbl, *v);
+ // we unlock ht lock here because we want to avoid potential lock
+ // inversions arising from notifyNewSeqno() call
+ hbl.getHTLock().unlock();
+ notifyNewSeqno(notifyCtx);
+ }
+ } else {
+ if (eviction == FULL_EVICTION) {
+ // Create a temp item and delete and push it
+ // into the checkpoint queue, only if the bloomfilter
+ // predicts that the item may exist on disk.
+ if (maybeKeyExistsInFilter(key)) {
+ AddStatus rv = addTempStoredValue(hbl, key);
+ if (rv == AddStatus::NoMem) {
+ return;
+ }
+ v = ht.unlocked_find(key,
+ hbl.getBucketNum(),
+ WantsDeleted::Yes,
+ TrackReference::No);
+ v->setDeleted();
+ v->setRevSeqno(it.getRevSeqno());
+ v->setValue(it, ht);
+ VBNotifyCtx notifyCtx;
+ std::tie(std::ignore, std::ignore, notifyCtx) =
+ processExpiredItem(hbl, *v);
+ // we unlock ht lock here because we want to avoid potential
+ // lock inversions arising from notifyNewSeqno() call
+ hbl.getHTLock().unlock();
+ notifyNewSeqno(notifyCtx);
+ }
+ }
+ }
+ incExpirationStat(source);
+}
+
+ENGINE_ERROR_CODE VBucket::add(Item& itm,
+ const void* cookie,
+ EventuallyPersistentEngine& engine,
+ const int bgFetchDelay) {
+ auto hbl = ht.getLockedBucket(itm.getKey());
+ StoredValue* v = ht.unlocked_find(itm.getKey(),
+ hbl.getBucketNum(),
+ WantsDeleted::Yes,
+ TrackReference::No);
+
+ bool maybeKeyExists = true;
+ if ((v == nullptr || v->isTempInitialItem()) &&
+ (eviction == FULL_EVICTION)) {
+ // Check bloomfilter's prediction
+ if (!maybeKeyExistsInFilter(itm.getKey())) {
+ maybeKeyExists = false;
+ }
+ }
+
+ PreLinkDocumentContext preLinkDocumentContext(engine, cookie, &itm);
+ VBQueueItemCtx queueItmCtx(GenerateBySeqno::Yes,
+ GenerateCas::Yes,
+ TrackCasDrift::No,
+ /*isBackfillItem*/ false,
+ &preLinkDocumentContext);
+ AddStatus status;
+ VBNotifyCtx notifyCtx;
+ std::tie(status, notifyCtx) =
+ processAdd(hbl, v, itm, maybeKeyExists, false, &queueItmCtx);
+
+ switch (status) {
+ case AddStatus::NoMem:
+ return ENGINE_ENOMEM;
+ case AddStatus::Exists:
+ return ENGINE_NOT_STORED;
+ case AddStatus::AddTmpAndBgFetch:
+ return addTempItemAndBGFetch(
+ hbl, itm.getKey(), cookie, engine, bgFetchDelay, true);
+ case AddStatus::BgFetch:
+ hbl.getHTLock().unlock();
+ bgFetch(itm.getKey(), cookie, engine, bgFetchDelay, true);
+ return ENGINE_EWOULDBLOCK;
+ case AddStatus::Success:
+ case AddStatus::UnDel:
+ notifyNewSeqno(notifyCtx);
+ itm.setBySeqno(v->getBySeqno());
+ itm.setCas(v->getCas());
+ break;
+ }
+ return ENGINE_SUCCESS;
+}
+
+GetValue VBucket::getAndUpdateTtl(const DocKey& key,
+ const void* cookie,
+ EventuallyPersistentEngine& engine,
+ int bgFetchDelay,
+ time_t exptime) {
+ auto hbl = ht.getLockedBucket(key);
+ StoredValue* v = fetchValidValue(hbl,
+ key,
+ WantsDeleted::Yes,
+ TrackReference::Yes,
+ QueueExpired::Yes);
+
+ if (v) {
+ if (v->isDeleted() || v->isTempDeletedItem() ||
+ v->isTempNonExistentItem()) {
+ return {};
+ }
+
+ if (!v->isResident()) {
+ bgFetch(key, cookie, engine, bgFetchDelay);
+ return GetValue(nullptr, ENGINE_EWOULDBLOCK, v->getBySeqno());
+ }
+ if (v->isLocked(ep_current_time())) {
+ return GetValue(nullptr, ENGINE_KEY_EEXISTS, 0);
+ }
+
+ const bool exptime_mutated = exptime != v->getExptime();
+ if (exptime_mutated) {
+ v->markDirty();
+ v->setExptime(exptime);
+ v->setRevSeqno(v->getRevSeqno() + 1);
+ }
+
+ GetValue rv(
+ v->toItem(v->isLocked(ep_current_time()), getId()).release(),
+ ENGINE_SUCCESS,
+ v->getBySeqno());
+
+ if (exptime_mutated) {
+ VBNotifyCtx notifyCtx = queueDirty(*v);
+ rv.getValue()->setCas(v->getCas());
+ // we unlock ht lock here because we want to avoid potential lock
+ // inversions arising from notifyNewSeqno() call
+ hbl.getHTLock().unlock();
+ notifyNewSeqno(notifyCtx);
+ }
+
+ return rv;
+ } else {
+ if (eviction == VALUE_ONLY) {
+ return {};
+ } else {
+ if (maybeKeyExistsInFilter(key)) {
+ ENGINE_ERROR_CODE ec = addTempItemAndBGFetch(
+ hbl, key, cookie, engine, bgFetchDelay, false);
+ return GetValue(NULL, ec, -1, true);
+ } else {
+ // As bloomfilter predicted that item surely doesn't exist
+ // on disk, return ENOENT for getAndUpdateTtl().
+ return {};
+ }
+ }
+ }
+}
+
+MutationStatus VBucket::insertFromWarmup(Item& itm,
+ bool eject,
+ bool keyMetaDataOnly) {
+ if (!StoredValue::hasAvailableSpace(stats, itm)) {
+ return MutationStatus::NoMem;
+ }
+
+ auto hbl = ht.getLockedBucket(itm.getKey());
+ StoredValue* v = ht.unlocked_find(itm.getKey(),
+ hbl.getBucketNum(),
+ WantsDeleted::Yes,
+ TrackReference::No);
+
+ if (v == NULL) {
+ v = addNewStoredValue(hbl, itm, /*queueItmCtx*/ nullptr).first;
+ if (keyMetaDataOnly) {
+ v->markNotResident();
+ /* For now ht stats are updated from outside ht. This seems to be
+ a better option for now than passing a flag to
+ addNewStoredValue() just for this func */
+ ++(ht.numNonResidentItems);
+ }
+ /* For now ht stats are updated from outside ht. This seems to be
+ a better option for now than passing a flag to
+ addNewStoredValue() just for this func.
+ We need to decrNumTotalItems because ht.numTotalItems is already
+ set by warmup when it estimated the item count from disk */
+ ht.decrNumTotalItems();
+ v->setNewCacheItem(false);
+ } else {
+ if (keyMetaDataOnly) {
+ // We don't have a better error code ;)
+ return MutationStatus::InvalidCas;
+ }
+
+ // Verify that the CAS isn't changed
+ if (v->getCas() != itm.getCas()) {
+ if (v->getCas() == 0) {
+ v->setCas(itm.getCas());
+ v->setFlags(itm.getFlags());
+ v->setExptime(itm.getExptime());
+ v->setRevSeqno(itm.getRevSeqno());
+ } else {
+ return MutationStatus::InvalidCas;
+ }
+ }
+ updateStoredValue(hbl, *v, itm, /*queueItmCtx*/ nullptr);
+ }
+
+ v->markClean();
+
+ if (eject && !keyMetaDataOnly) {
+ ht.unlocked_ejectItem(v, eviction);
+ }
+
+ return MutationStatus::NotFound;
+}
+
+GetValue VBucket::getInternal(const DocKey& key,
+ const void* cookie,
+ EventuallyPersistentEngine& engine,
+ int bgFetchDelay,
+ get_options_t options,
+ bool diskFlushAll) {
+ const TrackReference trackReference = (options & TRACK_REFERENCE)
+ ? TrackReference::Yes
+ : TrackReference::No;
+ const bool getDeletedValue = (options & GET_DELETED_VALUE);
+ auto hbl = ht.getLockedBucket(key);
+ StoredValue* v = fetchValidValue(
+ hbl, key, WantsDeleted::Yes, trackReference, QueueExpired::Yes);
+ if (v) {
+ if (v->isDeleted() && !getDeletedValue) {
+ return GetValue();
+ }
+ if (v->isTempDeletedItem() || v->isTempNonExistentItem()) {
+ // Delete a temp non-existent item to ensure that
+ // if the get were issued over an item that doesn't
+ // exist, then we dont preserve a temp item.
+ if (options & DELETE_TEMP) {
+ deleteStoredValue(hbl, *v);
+ }
+ return GetValue();
+ }
+
+ // If the value is not resident, wait for it...
+ if (!v->isResident()) {
+ return getInternalNonResident(
+ key, cookie, engine, bgFetchDelay, options, *v);
+ }
+
+ // Should we hide (return -1) for the items' CAS?
+ const bool hide_cas =
+ (options & HIDE_LOCKED_CAS) && v->isLocked(ep_current_time());
+ return GetValue(v->toItem(hide_cas, getId()).release(),
+ ENGINE_SUCCESS,
+ v->getBySeqno(),
+ false,
+ v->getNRUValue());
+ } else {
+ if (!getDeletedValue && (eviction == VALUE_ONLY || diskFlushAll)) {
+ return GetValue();
+ }
+
+ if (maybeKeyExistsInFilter(key)) {
+ ENGINE_ERROR_CODE ec = ENGINE_EWOULDBLOCK;
+ if (options &
+ QUEUE_BG_FETCH) { // Full eviction and need a bg fetch.
+ ec = addTempItemAndBGFetch(
+ hbl, key, cookie, engine, bgFetchDelay, false);
+ }
+ return GetValue(NULL, ec, -1, true);
+ } else {
+ // As bloomfilter predicted that item surely doesn't exist
+ // on disk, return ENOENT, for getInternal().
+ return GetValue();
+ }
+ }
+}
+
+ENGINE_ERROR_CODE VBucket::getMetaData(const DocKey& key,
+ const void* cookie,
+ EventuallyPersistentEngine& engine,
+ int bgFetchDelay,
+ ItemMetaData& metadata,
+ uint32_t& deleted,
+ uint8_t& datatype) {
+ deleted = 0;
+ auto hbl = ht.getLockedBucket(key);
+ StoredValue* v = ht.unlocked_find(
+ key, hbl.getBucketNum(), WantsDeleted::Yes, TrackReference::No);
+
+ if (v) {
+ stats.numOpsGetMeta++;
+ if (v->isTempInitialItem()) {
+ // Need bg meta fetch.
+ bgFetch(key, cookie, engine, bgFetchDelay, true);
+ return ENGINE_EWOULDBLOCK;
+ } else if (v->isTempNonExistentItem()) {
+ metadata.cas = v->getCas();
+ return ENGINE_KEY_ENOENT;
+ } else {
+ if (v->isTempDeletedItem() || v->isDeleted() ||
+ v->isExpired(ep_real_time())) {
+ deleted |= GET_META_ITEM_DELETED_FLAG;
+ }
+
+ if (v->isLocked(ep_current_time())) {
+ metadata.cas = static_cast<uint64_t>(-1);
+ } else {
+ metadata.cas = v->getCas();
+ }
+ metadata.flags = v->getFlags();
+ metadata.exptime = v->getExptime();
+ metadata.revSeqno = v->getRevSeqno();
+ datatype = v->getDatatype();
+
+ return ENGINE_SUCCESS;
+ }
+ } else {
+ // The key wasn't found. However, this may be because it was previously
+ // deleted or evicted with the full eviction strategy.
+ // So, add a temporary item corresponding to the key to the hash table
+ // and schedule a background fetch for its metadata from the persistent
+ // store. The item's state will be updated after the fetch completes.
+ //
+ // Schedule this bgFetch only if the key is predicted to be may-be
+ // existent on disk by the bloomfilter.
+
+ if (maybeKeyExistsInFilter(key)) {
+ return addTempItemAndBGFetch(
+ hbl, key, cookie, engine, bgFetchDelay, true);
+ } else {
+ stats.numOpsGetMeta++;
+ return ENGINE_KEY_ENOENT;
+ }
+ }
+}
+
+ENGINE_ERROR_CODE VBucket::getKeyStats(const DocKey& key,
+ const void* cookie,
+ EventuallyPersistentEngine& engine,
+ int bgFetchDelay,
+ struct key_stats& kstats,
+ WantsDeleted wantsDeleted) {
+ auto hbl = ht.getLockedBucket(key);
+ StoredValue* v = fetchValidValue(hbl,
+ key,
+ WantsDeleted::Yes,
+ TrackReference::Yes,
+ QueueExpired::Yes);
+
+ if (v) {
+ if ((v->isDeleted() && wantsDeleted == WantsDeleted::No) ||
+ v->isTempNonExistentItem() || v->isTempDeletedItem()) {
+ return ENGINE_KEY_ENOENT;
+ }
+ if (eviction == FULL_EVICTION && v->isTempInitialItem()) {
+ hbl.getHTLock().unlock();
+ bgFetch(key, cookie, engine, bgFetchDelay, true);
+ return ENGINE_EWOULDBLOCK;
+ }
+ kstats.logically_deleted = v->isDeleted();
+ kstats.dirty = v->isDirty();
+ kstats.exptime = v->getExptime();
+ kstats.flags = v->getFlags();
+ kstats.cas = v->getCas();
+ kstats.vb_state = getState();
+ return ENGINE_SUCCESS;
+ } else {
+ if (eviction == VALUE_ONLY) {
+ return ENGINE_KEY_ENOENT;
+ } else {
+ if (maybeKeyExistsInFilter(key)) {
+ return addTempItemAndBGFetch(
+ hbl, key, cookie, engine, bgFetchDelay, true);
+ } else {
+ // If bgFetch were false, or bloomfilter predicted that
+ // item surely doesn't exist on disk, return ENOENT for
+ // getKeyStats().
+ return ENGINE_KEY_ENOENT;
+ }
+ }
+ }
+}
+
+GetValue VBucket::getLocked(const DocKey& key,
+ rel_time_t currentTime,
+ uint32_t lockTimeout,
+ const void* cookie,
+ EventuallyPersistentEngine& engine,
+ int bgFetchDelay) {
+ auto hbl = ht.getLockedBucket(key);
+ StoredValue* v = fetchValidValue(hbl,
+ key,
+ WantsDeleted::Yes,
+ TrackReference::Yes,
+ QueueExpired::Yes);
+
+ if (v) {
+ if (v->isDeleted() || v->isTempNonExistentItem() ||
+ v->isTempDeletedItem()) {
+ return GetValue(NULL, ENGINE_KEY_ENOENT);
+ }
+
+ // if v is locked return error
+ if (v->isLocked(currentTime)) {
+ return GetValue(NULL, ENGINE_TMPFAIL);
+ }
+
+ // If the value is not resident, wait for it...
+ if (!v->isResident()) {
+ if (cookie) {
+ bgFetch(key, cookie, engine, bgFetchDelay);
+ }
+ return GetValue(NULL, ENGINE_EWOULDBLOCK, -1, true);
+ }
+
+ // acquire lock and increment cas value
+ v->lock(currentTime + lockTimeout);
+
+ auto it = v->toItem(false, getId());
+ it->setCas(nextHLCCas());
+ v->setCas(it->getCas());
+
+ return GetValue(it.release());
+
+ } else {
+ // No value found in the hashtable.
+ switch (eviction) {
+ case VALUE_ONLY:
+ return GetValue(NULL, ENGINE_KEY_ENOENT);
+
+ case FULL_EVICTION:
+ if (maybeKeyExistsInFilter(key)) {
+ ENGINE_ERROR_CODE ec = addTempItemAndBGFetch(
+ hbl, key, cookie, engine, bgFetchDelay, false);
+ return GetValue(NULL, ec, -1, true);
+ } else {
+ // As bloomfilter predicted that item surely doesn't exist
+ // on disk, return ENOENT for getLocked().
+ return GetValue(NULL, ENGINE_KEY_ENOENT);
+ }
+ }
+ return GetValue(); // just to prevent compiler warning
+ }
+}
+
+void VBucket::deletedOnDiskCbk(const Item& queuedItem, bool deleted) {
+ auto hbl = ht.getLockedBucket(queuedItem.getKey());
+ StoredValue* v = fetchValidValue(hbl,
+ queuedItem.getKey(),
+ WantsDeleted::Yes,
+ TrackReference::No,
+ QueueExpired::Yes);
+ // Delete the item in the hash table iff:
+ // 1. Item is existent in hashtable, and deleted flag is true
+ // 2. rev seqno of queued item matches rev seqno of hash table item
+ if (v && v->isDeleted() && (queuedItem.getRevSeqno() == v->getRevSeqno())) {
+ bool isDeleted = deleteStoredValue(hbl, *v);
+ if (!isDeleted) {
+ throw std::logic_error(
+ "deletedOnDiskCbk:callback: "
+ "Failed to delete key with seqno:" +
+ std::to_string(v->getBySeqno()) + "' from bucket " +
+ std::to_string(hbl.getBucketNum()));
+ }
+
+ /**
+ * Deleted items are to be added to the bloomfilter,
+ * in either eviction policy.
+ */
+ addToFilter(queuedItem.getKey());
+ }
+
+ if (deleted) {
+ ++stats.totalPersisted;
+ ++opsDelete;
+ }
+ doStatsForFlushing(queuedItem, queuedItem.size());
+ --stats.diskQueueSize;
+ decrMetaDataDisk(queuedItem);
+}
+
+bool VBucket::deleteKey(const DocKey& key) {
+ auto hbl = ht.getLockedBucket(key);
+ StoredValue* v = ht.unlocked_find(
+ key, hbl.getBucketNum(), WantsDeleted::Yes, TrackReference::No);
+ if (!v) {
+ return false;
+ }
+ return deleteStoredValue(hbl, *v);
+}
+
+void VBucket::postProcessRollback(const RollbackResult& rollbackResult,
+ uint64_t prevHighSeqno) {
+ failovers->pruneEntries(rollbackResult.highSeqno);
+ checkpointManager.clear(*this, rollbackResult.highSeqno);
+ setPersistedSnapshot(rollbackResult.snapStartSeqno,
+ rollbackResult.snapEndSeqno);
+ incrRollbackItemCount(prevHighSeqno - rollbackResult.highSeqno);
- setBackfillPhase(false);
++ checkpointManager.setOpenCheckpointId(1);
+}
+
+void VBucket::dump() const {
+ std::cerr << "VBucket[" << this << "] with state: " << toString(getState())
+ << " numItems:" << getNumItems()
+ << " numNonResident:" << getNumNonResidentItems()
+ << " ht: " << std::endl << " " << ht << std::endl
+ << "]" << std::endl;
+}
+
+void VBucket::_addStats(bool details, ADD_STAT add_stat, const void* c) {
addStat(NULL, toString(state), add_stat, c);
if (details) {
- size_t numItems = getNumItems(policy);
+ size_t numItems = getNumItems();
size_t tempItems = getNumTempItems();
addStat("num_items", numItems, add_stat, c);
addStat("num_temp_items", tempItems, add_stat, c);