Merge branch 'watson' 81/77881/2
authorDave Rigby <daver@couchbase.com>
Tue, 9 May 2017 10:43:12 +0000 (11:43 +0100)
committerDave Rigby <daver@couchbase.com>
Tue, 9 May 2017 10:45:44 +0000 (11:45 +0100)
* watson:
  MB-24066/MB-22178: Set opencheckpointid to 1 after rollback
  MB-24066: Partial revert "MB-22178: Don't use opencheckpointid to determine if in backfill phase"

Change-Id: If7d42bd933abf24a737d2718a4056e4536df2fd8

1  2 
src/dcp/producer.cc
src/dcp/stream.cc
src/vbucket.cc
tests/module_tests/evp_store_rollback_test.cc

@@@ -235,14 -199,7 +235,14 @@@ ENGINE_ERROR_CODE DcpProducer::streamRe
          return ENGINE_NOT_MY_VBUCKET;
      }
  
-     if (vb->isBackfillPhase()) {
 +    if ((flags & DCP_ADD_STREAM_ACTIVE_VB_ONLY) &&
 +        (vb->getState() != vbucket_state_active)) {
 +        LOG(EXTENSION_LOG_NOTICE, "%s (vb %d) Stream request failed because "
 +            "the vbucket is in %s state, only active vbuckets were requested",
 +            logHeader(), vbucket, vb->toString(vb->getState()));
 +        return ENGINE_NOT_MY_VBUCKET;
 +    }
+     if (vb->checkpointManager.getOpenCheckpointId() == 0) {
          LOG(EXTENSION_LOG_WARNING, "%s (vb %d) Stream request failed because "
              "this vbucket is in backfill state", logHeader(), vbucket);
          return ENGINE_TMPFAIL;
Simple merge
diff --cc src/vbucket.cc
@@@ -586,1255 -630,11 +586,1255 @@@ size_t VBucket::getNumOfKeysInFilter() 
      }
  }
  
 -void VBucket::addStats(bool details, ADD_STAT add_stat, const void *c,
 -                       item_eviction_policy_t policy) {
 +VBNotifyCtx VBucket::queueDirty(
 +        StoredValue& v,
 +        const GenerateBySeqno generateBySeqno,
 +        const GenerateCas generateCas,
 +        const bool isBackfillItem,
 +        PreLinkDocumentContext* preLinkDocumentContext) {
 +    VBNotifyCtx notifyCtx;
 +
 +    queued_item qi(v.toItem(false, getId()));
 +
 +    if (isBackfillItem) {
 +        queueBackfillItem(qi, generateBySeqno);
 +        notifyCtx.notifyFlusher = true;
 +        /* During backfill on a TAP receiver we need to update the snapshot
 +         range in the checkpoint. Has to be done here because in case of TAP
 +         backfill, above, we use vb.queueBackfillItem() instead of
 +         vb.checkpointManager.queueDirty() */
 +        if (generateBySeqno == GenerateBySeqno::Yes) {
 +            checkpointManager.resetSnapshotRange();
 +        }
 +    } else {
 +        notifyCtx.notifyFlusher =
 +                checkpointManager.queueDirty(*this,
 +                                             qi,
 +                                             generateBySeqno,
 +                                             generateCas,
 +                                             preLinkDocumentContext);
 +        notifyCtx.notifyReplication = true;
 +        if (GenerateCas::Yes == generateCas) {
 +            v.setCas(qi->getCas());
 +        }
 +    }
 +
 +    v.setBySeqno(qi->getBySeqno());
 +    notifyCtx.bySeqno = qi->getBySeqno();
 +
 +    return notifyCtx;
 +}
 +
 +StoredValue* VBucket::fetchValidValue(HashTable::HashBucketLock& hbl,
 +                                      const DocKey& key,
 +                                      WantsDeleted wantsDeleted,
 +                                      TrackReference trackReference,
 +                                      QueueExpired queueExpired) {
 +    if (!hbl.getHTLock()) {
 +        throw std::logic_error(
 +                "Hash bucket lock not held in "
 +                "VBucket::fetchValidValue() for hash bucket: " +
 +                std::to_string(hbl.getBucketNum()) + "for key: " +
 +                std::string(reinterpret_cast<const char*>(key.data()),
 +                            key.size()));
 +    }
 +    StoredValue* v = ht.unlocked_find(
 +            key, hbl.getBucketNum(), wantsDeleted, trackReference);
 +    if (v && !v->isDeleted() && !v->isTempItem()) {
 +        // In the deleted case, we ignore expiration time.
 +        if (v->isExpired(ep_real_time())) {
 +            if (getState() != vbucket_state_active) {
 +                return wantsDeleted == WantsDeleted::Yes ? v : NULL;
 +            }
 +
 +            // queueDirty only allowed on active VB
 +            if (queueExpired == QueueExpired::Yes &&
 +                getState() == vbucket_state_active) {
 +                incExpirationStat(ExpireBy::Access);
 +                handlePreExpiry(*v);
 +                VBNotifyCtx notifyCtx;
 +                std::tie(std::ignore, v, notifyCtx) =
 +                        processExpiredItem(hbl, *v);
 +                notifyNewSeqno(notifyCtx);
 +            }
 +            return wantsDeleted == WantsDeleted::Yes ? v : NULL;
 +        }
 +    }
 +    return v;
 +}
 +
 +void VBucket::incExpirationStat(const ExpireBy source) {
 +    switch (source) {
 +    case ExpireBy::Pager:
 +        ++stats.expired_pager;
 +        break;
 +    case ExpireBy::Compactor:
 +        ++stats.expired_compactor;
 +        break;
 +    case ExpireBy::Access:
 +        ++stats.expired_access;
 +        break;
 +    }
 +    ++numExpiredItems;
 +}
 +
 +MutationStatus VBucket::setFromInternal(Item& itm) {
 +    return ht.set(itm);
 +}
 +
 +ENGINE_ERROR_CODE VBucket::set(Item& itm,
 +                               const void* cookie,
 +                               EventuallyPersistentEngine& engine,
 +                               const int bgFetchDelay) {
 +    bool cas_op = (itm.getCas() != 0);
 +    auto hbl = ht.getLockedBucket(itm.getKey());
 +    StoredValue* v = ht.unlocked_find(itm.getKey(),
 +                                      hbl.getBucketNum(),
 +                                      WantsDeleted::Yes,
 +                                      TrackReference::No);
 +    if (v && v->isLocked(ep_current_time()) &&
 +        (getState() == vbucket_state_replica ||
 +         getState() == vbucket_state_pending)) {
 +        v->unlock();
 +    }
 +
 +    bool maybeKeyExists = true;
 +    // If we didn't find a valid item, check Bloomfilter's prediction if in
 +    // full eviction policy and for a CAS operation.
 +    if ((v == nullptr || v->isTempInitialItem()) &&
 +        (eviction == FULL_EVICTION) && (itm.getCas() != 0)) {
 +        // Check Bloomfilter's prediction
 +        if (!maybeKeyExistsInFilter(itm.getKey())) {
 +            maybeKeyExists = false;
 +        }
 +    }
 +
 +    PreLinkDocumentContext preLinkDocumentContext(engine, cookie, &itm);
 +    VBQueueItemCtx queueItmCtx(GenerateBySeqno::Yes,
 +                               GenerateCas::Yes,
 +                               TrackCasDrift::No,
 +                               /*isBackfillItem*/ false,
 +                               &preLinkDocumentContext);
 +
 +    MutationStatus status;
 +    VBNotifyCtx notifyCtx;
 +    std::tie(status, notifyCtx) = processSet(hbl,
 +                                             v,
 +                                             itm,
 +                                             itm.getCas(),
 +                                             /*allowExisting*/ true,
 +                                             /*hashMetaData*/ false,
 +                                             &queueItmCtx,
 +                                             maybeKeyExists);
 +
 +    ENGINE_ERROR_CODE ret = ENGINE_SUCCESS;
 +    switch (status) {
 +    case MutationStatus::NoMem:
 +        ret = ENGINE_ENOMEM;
 +        break;
 +    case MutationStatus::InvalidCas:
 +        ret = ENGINE_KEY_EEXISTS;
 +        break;
 +    case MutationStatus::IsLocked:
 +        ret = ENGINE_LOCKED;
 +        break;
 +    case MutationStatus::NotFound:
 +        if (cas_op) {
 +            ret = ENGINE_KEY_ENOENT;
 +            break;
 +        }
 +    // FALLTHROUGH
 +    case MutationStatus::WasDirty:
 +    // Even if the item was dirty, push it into the vbucket's open
 +    // checkpoint.
 +    case MutationStatus::WasClean:
 +        notifyNewSeqno(notifyCtx);
 +
 +        itm.setBySeqno(v->getBySeqno());
 +        itm.setCas(v->getCas());
 +        break;
 +    case MutationStatus::NeedBgFetch: { // CAS operation with non-resident item
 +        // +
 +        // full eviction.
 +        if (v) {
 +            // temp item is already created. Simply schedule a bg fetch job
 +            hbl.getHTLock().unlock();
 +            bgFetch(itm.getKey(), cookie, engine, bgFetchDelay, true);
 +            return ENGINE_EWOULDBLOCK;
 +        }
 +        ret = addTempItemAndBGFetch(
 +                hbl, itm.getKey(), cookie, engine, bgFetchDelay, true);
 +        break;
 +    }
 +    }
 +
 +    return ret;
 +}
 +
 +ENGINE_ERROR_CODE VBucket::replace(Item& itm,
 +                                   const void* cookie,
 +                                   EventuallyPersistentEngine& engine,
 +                                   const int bgFetchDelay) {
 +    auto hbl = ht.getLockedBucket(itm.getKey());
 +    StoredValue* v = ht.unlocked_find(itm.getKey(),
 +                                      hbl.getBucketNum(),
 +                                      WantsDeleted::Yes,
 +                                      TrackReference::No);
 +    if (v) {
 +        if (v->isDeleted() || v->isTempDeletedItem() ||
 +            v->isTempNonExistentItem()) {
 +            return ENGINE_KEY_ENOENT;
 +        }
 +
 +        MutationStatus mtype;
 +        VBNotifyCtx notifyCtx;
 +        if (eviction == FULL_EVICTION && v->isTempInitialItem()) {
 +            mtype = MutationStatus::NeedBgFetch;
 +        } else {
 +            PreLinkDocumentContext preLinkDocumentContext(engine, cookie, &itm);
 +            VBQueueItemCtx queueItmCtx(GenerateBySeqno::Yes,
 +                                       GenerateCas::Yes,
 +                                       TrackCasDrift::No,
 +                                       /*isBackfillItem*/ false,
 +                                       &preLinkDocumentContext);
 +            std::tie(mtype, notifyCtx) = processSet(hbl,
 +                                                    v,
 +                                                    itm,
 +                                                    0,
 +                                                    /*allowExisting*/ true,
 +                                                    /*hasMetaData*/ false,
 +                                                    &queueItmCtx);
 +        }
 +
 +        ENGINE_ERROR_CODE ret = ENGINE_SUCCESS;
 +        switch (mtype) {
 +        case MutationStatus::NoMem:
 +            ret = ENGINE_ENOMEM;
 +            break;
 +        case MutationStatus::IsLocked:
 +            ret = ENGINE_LOCKED;
 +            break;
 +        case MutationStatus::InvalidCas:
 +        case MutationStatus::NotFound:
 +            ret = ENGINE_NOT_STORED;
 +            break;
 +        // FALLTHROUGH
 +        case MutationStatus::WasDirty:
 +        // Even if the item was dirty, push it into the vbucket's open
 +        // checkpoint.
 +        case MutationStatus::WasClean:
 +            notifyNewSeqno(notifyCtx);
 +
 +            itm.setBySeqno(v->getBySeqno());
 +            itm.setCas(v->getCas());
 +            break;
 +        case MutationStatus::NeedBgFetch: {
 +            // temp item is already created. Simply schedule a bg fetch job
 +            hbl.getHTLock().unlock();
 +            bgFetch(itm.getKey(), cookie, engine, bgFetchDelay, true);
 +            ret = ENGINE_EWOULDBLOCK;
 +            break;
 +        }
 +        }
 +
 +        return ret;
 +    } else {
 +        if (eviction == VALUE_ONLY) {
 +            return ENGINE_KEY_ENOENT;
 +        }
 +
 +        if (maybeKeyExistsInFilter(itm.getKey())) {
 +            return addTempItemAndBGFetch(
 +                    hbl, itm.getKey(), cookie, engine, bgFetchDelay, false);
 +        } else {
 +            // As bloomfilter predicted that item surely doesn't exist
 +            // on disk, return ENOENT for replace().
 +            return ENGINE_KEY_ENOENT;
 +        }
 +    }
 +}
 +
 +ENGINE_ERROR_CODE VBucket::addBackfillItem(Item& itm,
 +                                           const GenerateBySeqno genBySeqno) {
 +    auto hbl = ht.getLockedBucket(itm.getKey());
 +    StoredValue* v = ht.unlocked_find(itm.getKey(),
 +                                      hbl.getBucketNum(),
 +                                      WantsDeleted::Yes,
 +                                      TrackReference::No);
 +
 +    // Note that this function is only called on replica or pending vbuckets.
 +    if (v && v->isLocked(ep_current_time())) {
 +        v->unlock();
 +    }
 +
 +    VBQueueItemCtx queueItmCtx(genBySeqno,
 +                               GenerateCas::No,
 +                               TrackCasDrift::No,
 +                               /*isBackfillItem*/ true,
 +                               nullptr /* No pre link should happen */);
 +    MutationStatus status;
 +    VBNotifyCtx notifyCtx;
 +    std::tie(status, notifyCtx) = processSet(hbl,
 +                                             v,
 +                                             itm,
 +                                             0,
 +                                             /*allowExisting*/ true,
 +                                             /*hasMetaData*/ true,
 +                                             &queueItmCtx);
 +
 +    ENGINE_ERROR_CODE ret = ENGINE_SUCCESS;
 +    switch (status) {
 +    case MutationStatus::NoMem:
 +        ret = ENGINE_ENOMEM;
 +        break;
 +    case MutationStatus::InvalidCas:
 +    case MutationStatus::IsLocked:
 +        ret = ENGINE_KEY_EEXISTS;
 +        break;
 +    case MutationStatus::WasDirty:
 +    // FALLTHROUGH, to ensure the bySeqno for the hashTable item is
 +    // set correctly, and also the sequence numbers are ordered correctly.
 +    // (MB-14003)
 +    case MutationStatus::NotFound:
 +    // FALLTHROUGH
 +    case MutationStatus::WasClean: {
 +        setMaxCas(v->getCas());
 +        // we unlock ht lock here because we want to avoid potential lock
 +        // inversions arising from notifyNewSeqno() call
 +        hbl.getHTLock().unlock();
 +        notifyNewSeqno(notifyCtx);
 +    } break;
 +    case MutationStatus::NeedBgFetch:
 +        throw std::logic_error(
 +                "VBucket::addBackfillItem: "
 +                "SET on a non-active vbucket should not require a "
 +                "bg_metadata_fetch.");
 +    }
 +
 +    return ret;
 +}
 +
 +ENGINE_ERROR_CODE VBucket::setWithMeta(Item& itm,
 +                                       const uint64_t cas,
 +                                       uint64_t* seqno,
 +                                       const void* cookie,
 +                                       EventuallyPersistentEngine& engine,
 +                                       const int bgFetchDelay,
 +                                       const bool force,
 +                                       const bool allowExisting,
 +                                       const GenerateBySeqno genBySeqno,
 +                                       const GenerateCas genCas,
 +                                       const bool isReplication) {
 +    auto hbl = ht.getLockedBucket(itm.getKey());
 +    StoredValue* v = ht.unlocked_find(itm.getKey(),
 +                                      hbl.getBucketNum(),
 +                                      WantsDeleted::Yes,
 +                                      TrackReference::No);
 +
 +    bool maybeKeyExists = true;
 +    if (!force) {
 +        if (v) {
 +            if (v->isTempInitialItem()) {
 +                bgFetch(itm.getKey(), cookie, engine, bgFetchDelay, true);
 +                return ENGINE_EWOULDBLOCK;
 +            }
 +
 +            if (!(conflictResolver->resolve(*v,
 +                                            itm.getMetaData(),
 +                                            itm.getDataType(),
 +                                            itm.isDeleted()))) {
 +                ++stats.numOpsSetMetaResolutionFailed;
 +                return ENGINE_KEY_EEXISTS;
 +            }
 +        } else {
 +            if (maybeKeyExistsInFilter(itm.getKey())) {
 +                return addTempItemAndBGFetch(hbl,
 +                                             itm.getKey(),
 +                                             cookie,
 +                                             engine,
 +                                             bgFetchDelay,
 +                                             true,
 +                                             isReplication);
 +            } else {
 +                maybeKeyExists = false;
 +            }
 +        }
 +    } else {
 +        if (eviction == FULL_EVICTION) {
 +            // Check Bloomfilter's prediction
 +            if (!maybeKeyExistsInFilter(itm.getKey())) {
 +                maybeKeyExists = false;
 +            }
 +        }
 +    }
 +
 +    if (v && v->isLocked(ep_current_time()) &&
 +        (getState() == vbucket_state_replica ||
 +         getState() == vbucket_state_pending)) {
 +        v->unlock();
 +    }
 +
 +    VBQueueItemCtx queueItmCtx(genBySeqno,
 +                               genCas,
 +                               TrackCasDrift::Yes,
 +                               /*isBackfillItem*/ false,
 +                               nullptr /* No pre link step needed */);
 +    MutationStatus status;
 +    VBNotifyCtx notifyCtx;
 +    std::tie(status, notifyCtx) = processSet(hbl,
 +                                             v,
 +                                             itm,
 +                                             cas,
 +                                             allowExisting,
 +                                             true,
 +                                             &queueItmCtx,
 +                                             maybeKeyExists,
 +                                             isReplication);
 +
 +    ENGINE_ERROR_CODE ret = ENGINE_SUCCESS;
 +    switch (status) {
 +    case MutationStatus::NoMem:
 +        ret = ENGINE_ENOMEM;
 +        break;
 +    case MutationStatus::InvalidCas:
 +        ret = ENGINE_KEY_EEXISTS;
 +        break;
 +    case MutationStatus::IsLocked:
 +        ret = ENGINE_LOCKED;
 +        break;
 +    case MutationStatus::WasDirty:
 +    case MutationStatus::WasClean: {
 +        if (seqno) {
 +            *seqno = static_cast<uint64_t>(v->getBySeqno());
 +        }
 +        // we unlock ht lock here because we want to avoid potential lock
 +        // inversions arising from notifyNewSeqno() call
 +        hbl.getHTLock().unlock();
 +        notifyNewSeqno(notifyCtx);
 +    } break;
 +    case MutationStatus::NotFound:
 +        ret = ENGINE_KEY_ENOENT;
 +        break;
 +    case MutationStatus::NeedBgFetch: { // CAS operation with non-resident item
 +        // + full eviction.
 +        if (v) { // temp item is already created. Simply schedule a
 +            hbl.getHTLock().unlock(); // bg fetch job.
 +            bgFetch(itm.getKey(), cookie, engine, bgFetchDelay, true);
 +            return ENGINE_EWOULDBLOCK;
 +        }
 +        ret = addTempItemAndBGFetch(hbl,
 +                                    itm.getKey(),
 +                                    cookie,
 +                                    engine,
 +                                    bgFetchDelay,
 +                                    true,
 +                                    isReplication);
 +    }
 +    }
 +
 +    return ret;
 +}
 +
 +ENGINE_ERROR_CODE VBucket::deleteItem(const DocKey& key,
 +                                      uint64_t& cas,
 +                                      const void* cookie,
 +                                      EventuallyPersistentEngine& engine,
 +                                      const int bgFetchDelay,
 +                                      ItemMetaData* itemMeta,
 +                                      mutation_descr_t* mutInfo) {
 +    auto hbl = ht.getLockedBucket(key);
 +    StoredValue* v = ht.unlocked_find(
 +            key, hbl.getBucketNum(), WantsDeleted::Yes, TrackReference::No);
 +
 +    if (!v || v->isDeleted() || v->isTempItem()) {
 +        if (eviction == VALUE_ONLY) {
 +            return ENGINE_KEY_ENOENT;
 +        } else { // Full eviction.
 +            if (!v) { // Item might be evicted from cache.
 +                if (maybeKeyExistsInFilter(key)) {
 +                    return addTempItemAndBGFetch(
 +                            hbl, key, cookie, engine, bgFetchDelay, true);
 +                } else {
 +                    // As bloomfilter predicted that item surely doesn't
 +                    // exist on disk, return ENOENT for deleteItem().
 +                    return ENGINE_KEY_ENOENT;
 +                }
 +            } else if (v->isTempInitialItem()) {
 +                hbl.getHTLock().unlock();
 +                bgFetch(key, cookie, engine, bgFetchDelay, true);
 +                return ENGINE_EWOULDBLOCK;
 +            } else { // Non-existent or deleted key.
 +                if (v->isTempNonExistentItem() || v->isTempDeletedItem()) {
 +                    // Delete a temp non-existent item to ensure that
 +                    // if a delete were issued over an item that doesn't
 +                    // exist, then we don't preserve a temp item.
 +                    deleteStoredValue(hbl, *v);
 +                }
 +                return ENGINE_KEY_ENOENT;
 +            }
 +        }
 +    }
 +
 +    if (v->isLocked(ep_current_time()) &&
 +        (getState() == vbucket_state_replica ||
 +         getState() == vbucket_state_pending)) {
 +        v->unlock();
 +    }
 +
 +    if (itemMeta != nullptr) {
 +        itemMeta->cas = v->getCas();
 +    }
 +
 +    MutationStatus delrv;
 +    VBNotifyCtx notifyCtx;
 +    if (v->isExpired(ep_real_time())) {
 +        std::tie(delrv, v, notifyCtx) = processExpiredItem(hbl, *v);
 +    } else {
 +        ItemMetaData metadata;
 +        metadata.revSeqno = v->getRevSeqno() + 1;
 +        std::tie(delrv, v, notifyCtx) =
 +                processSoftDelete(hbl,
 +                                  *v,
 +                                  cas,
 +                                  metadata,
 +                                  VBQueueItemCtx(GenerateBySeqno::Yes,
 +                                                 GenerateCas::Yes,
 +                                                 TrackCasDrift::No,
 +                                                 /*isBackfillItem*/ false,
 +                                                 nullptr /* no pre link */),
 +                                  /*use_meta*/ false,
 +                                  /*bySeqno*/ v->getBySeqno());
 +    }
 +
 +    uint64_t seqno = 0;
 +    ENGINE_ERROR_CODE ret = ENGINE_SUCCESS;
 +    switch (delrv) {
 +    case MutationStatus::NoMem:
 +        ret = ENGINE_ENOMEM;
 +        break;
 +    case MutationStatus::InvalidCas:
 +        ret = ENGINE_KEY_EEXISTS;
 +        break;
 +    case MutationStatus::IsLocked:
 +        ret = ENGINE_LOCKED_TMPFAIL;
 +        break;
 +    case MutationStatus::NotFound:
 +        ret = ENGINE_KEY_ENOENT;
 +    /* Fallthrough:
 +     * A NotFound return value at this point indicates that the
 +     * item has expired. But, a deletion still needs to be queued
 +     * for the item in order to persist it.
 +     */
 +    case MutationStatus::WasClean:
 +    case MutationStatus::WasDirty:
 +        if (itemMeta != nullptr) {
 +            itemMeta->revSeqno = v->getRevSeqno();
 +            itemMeta->flags = v->getFlags();
 +            itemMeta->exptime = v->getExptime();
 +        }
 +
 +        notifyNewSeqno(notifyCtx);
 +        seqno = static_cast<uint64_t>(v->getBySeqno());
 +        cas = v->getCas();
 +
 +        if (delrv != MutationStatus::NotFound) {
 +            if (mutInfo) {
 +                mutInfo->seqno = seqno;
 +                mutInfo->vbucket_uuid = failovers->getLatestUUID();
 +            }
 +            if (itemMeta != nullptr) {
 +                itemMeta->cas = v->getCas();
 +            }
 +        }
 +        break;
 +    case MutationStatus::NeedBgFetch:
 +        // We already figured out if a bg fetch is requred for a full-evicted
 +        // item above.
 +        throw std::logic_error(
 +                "VBucket::deleteItem: "
 +                "Unexpected NEEDS_BG_FETCH from processSoftDelete");
 +    }
 +    return ret;
 +}
 +
 +ENGINE_ERROR_CODE VBucket::deleteWithMeta(const DocKey& key,
 +                                          uint64_t& cas,
 +                                          uint64_t* seqno,
 +                                          const void* cookie,
 +                                          EventuallyPersistentEngine& engine,
 +                                          const int bgFetchDelay,
 +                                          const bool force,
 +                                          const ItemMetaData& itemMeta,
 +                                          const bool backfill,
 +                                          const GenerateBySeqno genBySeqno,
 +                                          const GenerateCas generateCas,
 +                                          const uint64_t bySeqno,
 +                                          const bool isReplication) {
 +    auto hbl = ht.getLockedBucket(key);
 +    StoredValue* v = ht.unlocked_find(
 +            key, hbl.getBucketNum(), WantsDeleted::Yes, TrackReference::No);
 +    if (!force) { // Need conflict resolution.
 +        if (v) {
 +            if (v->isTempInitialItem()) {
 +                bgFetch(key, cookie, engine, bgFetchDelay, true);
 +                return ENGINE_EWOULDBLOCK;
 +            }
 +
 +            if (!(conflictResolver->resolve(*v,
 +                                            itemMeta,
 +                                            PROTOCOL_BINARY_RAW_BYTES,
 +                                            true))) {
 +                ++stats.numOpsDelMetaResolutionFailed;
 +                return ENGINE_KEY_EEXISTS;
 +            }
 +        } else {
 +            // Item is 1) deleted or not existent in the value eviction case OR
 +            // 2) deleted or evicted in the full eviction.
 +            if (maybeKeyExistsInFilter(key)) {
 +                return addTempItemAndBGFetch(hbl,
 +                                             key,
 +                                             cookie,
 +                                             engine,
 +                                             bgFetchDelay,
 +                                             true,
 +                                             isReplication);
 +            } else {
 +                // Even though bloomfilter predicted that item doesn't exist
 +                // on disk, we must put this delete on disk if the cas is valid.
 +                AddStatus rv = addTempStoredValue(hbl, key, isReplication);
 +                if (rv == AddStatus::NoMem) {
 +                    return ENGINE_ENOMEM;
 +                }
 +                v = ht.unlocked_find(key,
 +                                     hbl.getBucketNum(),
 +                                     WantsDeleted::Yes,
 +                                     TrackReference::No);
 +                v->setDeleted();
 +            }
 +        }
 +    } else {
 +        if (!v) {
 +            // We should always try to persist a delete here.
 +            AddStatus rv = addTempStoredValue(hbl, key, isReplication);
 +            if (rv == AddStatus::NoMem) {
 +                return ENGINE_ENOMEM;
 +            }
 +            v = ht.unlocked_find(key,
 +                                 hbl.getBucketNum(),
 +                                 WantsDeleted::Yes,
 +                                 TrackReference::No);
 +            v->setDeleted();
 +            v->setCas(cas);
 +        } else if (v->isTempInitialItem()) {
 +            v->setDeleted();
 +            v->setCas(cas);
 +        }
 +    }
 +
 +    if (v && v->isLocked(ep_current_time()) &&
 +        (getState() == vbucket_state_replica ||
 +         getState() == vbucket_state_pending)) {
 +        v->unlock();
 +    }
 +
 +    MutationStatus delrv;
 +    VBNotifyCtx notifyCtx;
 +    if (!v) {
 +        if (eviction == FULL_EVICTION) {
 +            delrv = MutationStatus::NeedBgFetch;
 +        } else {
 +            delrv = MutationStatus::NotFound;
 +        }
 +    } else {
 +        VBQueueItemCtx queueItmCtx(genBySeqno,
 +                                   generateCas,
 +                                   TrackCasDrift::Yes,
 +                                   backfill,
 +                                   nullptr /* No pre link step needed */);
 +
 +        // system xattrs must remain
 +        std::unique_ptr<Item> itm;
 +        if (mcbp::datatype::is_xattr(v->getDatatype()) &&
 +            (itm = pruneXattrDocument(*v, itemMeta))) {
 +            std::tie(v, delrv, notifyCtx) =
 +                    updateStoredValue(hbl, *v, *itm, &queueItmCtx);
 +        } else {
 +            std::tie(delrv, v, notifyCtx) = processSoftDelete(hbl,
 +                                                              *v,
 +                                                              cas,
 +                                                              itemMeta,
 +                                                              queueItmCtx,
 +                                                              /*use_meta*/ true,
 +                                                              bySeqno);
 +        }
 +    }
 +    cas = v ? v->getCas() : 0;
 +
 +    switch (delrv) {
 +    case MutationStatus::NoMem:
 +        return ENGINE_ENOMEM;
 +    case MutationStatus::InvalidCas:
 +        return ENGINE_KEY_EEXISTS;
 +    case MutationStatus::IsLocked:
 +        return ENGINE_LOCKED_TMPFAIL;
 +    case MutationStatus::NotFound:
 +        return ENGINE_KEY_ENOENT;
 +    case MutationStatus::WasDirty:
 +    case MutationStatus::WasClean: {
 +        if (seqno) {
 +            *seqno = static_cast<uint64_t>(v->getBySeqno());
 +        }
 +        // we unlock ht lock here because we want to avoid potential lock
 +        // inversions arising from notifyNewSeqno() call
 +        hbl.getHTLock().unlock();
 +        notifyNewSeqno(notifyCtx);
 +        break;
 +    }
 +    case MutationStatus::NeedBgFetch:
 +        hbl.getHTLock().unlock();
 +        bgFetch(key, cookie, engine, bgFetchDelay, true);
 +        return ENGINE_EWOULDBLOCK;
 +    }
 +    return ENGINE_SUCCESS;
 +}
 +
 +void VBucket::deleteExpiredItem(const Item& it,
 +                                time_t startTime,
 +                                ExpireBy source) {
 +
 +    // The item is correctly trimmed (by the caller). Fetch the one in the
 +    // hashtable and replace it if the CAS match (same item; no race).
 +    // If not found in the hashtable we should add it as a deleted item
 +    const DocKey& key = it.getKey();
 +    auto hbl = ht.getLockedBucket(key);
 +    StoredValue* v = ht.unlocked_find(
 +            key, hbl.getBucketNum(), WantsDeleted::Yes, TrackReference::No);
 +    if (v) {
 +        if (v->getCas() != it.getCas()) {
 +            return;
 +        }
 +
 +        if (v->isTempNonExistentItem() || v->isTempDeletedItem()) {
 +            bool deleted = deleteStoredValue(hbl, *v);
 +            if (!deleted) {
 +                throw std::logic_error(
 +                        "VBucket::deleteExpiredItem: "
 +                        "Failed to delete seqno:" +
 +                        std::to_string(v->getBySeqno()) + " from bucket " +
 +                        std::to_string(hbl.getBucketNum()));
 +            }
 +        } else if (v->isExpired(startTime) && !v->isDeleted()) {
 +            VBNotifyCtx notifyCtx;
 +            std::tie(std::ignore, std::ignore, notifyCtx) =
 +                    processExpiredItem(hbl, *v);
 +            // we unlock ht lock here because we want to avoid potential lock
 +            // inversions arising from notifyNewSeqno() call
 +            hbl.getHTLock().unlock();
 +            notifyNewSeqno(notifyCtx);
 +        }
 +    } else {
 +        if (eviction == FULL_EVICTION) {
 +            // Create a temp item and delete and push it
 +            // into the checkpoint queue, only if the bloomfilter
 +            // predicts that the item may exist on disk.
 +            if (maybeKeyExistsInFilter(key)) {
 +                AddStatus rv = addTempStoredValue(hbl, key);
 +                if (rv == AddStatus::NoMem) {
 +                    return;
 +                }
 +                v = ht.unlocked_find(key,
 +                                     hbl.getBucketNum(),
 +                                     WantsDeleted::Yes,
 +                                     TrackReference::No);
 +                v->setDeleted();
 +                v->setRevSeqno(it.getRevSeqno());
 +                v->setValue(it, ht);
 +                VBNotifyCtx notifyCtx;
 +                std::tie(std::ignore, std::ignore, notifyCtx) =
 +                        processExpiredItem(hbl, *v);
 +                // we unlock ht lock here because we want to avoid potential
 +                // lock inversions arising from notifyNewSeqno() call
 +                hbl.getHTLock().unlock();
 +                notifyNewSeqno(notifyCtx);
 +            }
 +        }
 +    }
 +    incExpirationStat(source);
 +}
 +
 +ENGINE_ERROR_CODE VBucket::add(Item& itm,
 +                               const void* cookie,
 +                               EventuallyPersistentEngine& engine,
 +                               const int bgFetchDelay) {
 +    auto hbl = ht.getLockedBucket(itm.getKey());
 +    StoredValue* v = ht.unlocked_find(itm.getKey(),
 +                                      hbl.getBucketNum(),
 +                                      WantsDeleted::Yes,
 +                                      TrackReference::No);
 +
 +    bool maybeKeyExists = true;
 +    if ((v == nullptr || v->isTempInitialItem()) &&
 +        (eviction == FULL_EVICTION)) {
 +        // Check bloomfilter's prediction
 +        if (!maybeKeyExistsInFilter(itm.getKey())) {
 +            maybeKeyExists = false;
 +        }
 +    }
 +
 +    PreLinkDocumentContext preLinkDocumentContext(engine, cookie, &itm);
 +    VBQueueItemCtx queueItmCtx(GenerateBySeqno::Yes,
 +                               GenerateCas::Yes,
 +                               TrackCasDrift::No,
 +                               /*isBackfillItem*/ false,
 +                               &preLinkDocumentContext);
 +    AddStatus status;
 +    VBNotifyCtx notifyCtx;
 +    std::tie(status, notifyCtx) =
 +            processAdd(hbl, v, itm, maybeKeyExists, false, &queueItmCtx);
 +
 +    switch (status) {
 +    case AddStatus::NoMem:
 +        return ENGINE_ENOMEM;
 +    case AddStatus::Exists:
 +        return ENGINE_NOT_STORED;
 +    case AddStatus::AddTmpAndBgFetch:
 +        return addTempItemAndBGFetch(
 +                hbl, itm.getKey(), cookie, engine, bgFetchDelay, true);
 +    case AddStatus::BgFetch:
 +        hbl.getHTLock().unlock();
 +        bgFetch(itm.getKey(), cookie, engine, bgFetchDelay, true);
 +        return ENGINE_EWOULDBLOCK;
 +    case AddStatus::Success:
 +    case AddStatus::UnDel:
 +        notifyNewSeqno(notifyCtx);
 +        itm.setBySeqno(v->getBySeqno());
 +        itm.setCas(v->getCas());
 +        break;
 +    }
 +    return ENGINE_SUCCESS;
 +}
 +
 +GetValue VBucket::getAndUpdateTtl(const DocKey& key,
 +                                  const void* cookie,
 +                                  EventuallyPersistentEngine& engine,
 +                                  int bgFetchDelay,
 +                                  time_t exptime) {
 +    auto hbl = ht.getLockedBucket(key);
 +    StoredValue* v = fetchValidValue(hbl,
 +                                     key,
 +                                     WantsDeleted::Yes,
 +                                     TrackReference::Yes,
 +                                     QueueExpired::Yes);
 +
 +    if (v) {
 +        if (v->isDeleted() || v->isTempDeletedItem() ||
 +            v->isTempNonExistentItem()) {
 +            return {};
 +        }
 +
 +        if (!v->isResident()) {
 +            bgFetch(key, cookie, engine, bgFetchDelay);
 +            return GetValue(nullptr, ENGINE_EWOULDBLOCK, v->getBySeqno());
 +        }
 +        if (v->isLocked(ep_current_time())) {
 +            return GetValue(nullptr, ENGINE_KEY_EEXISTS, 0);
 +        }
 +
 +        const bool exptime_mutated = exptime != v->getExptime();
 +        if (exptime_mutated) {
 +            v->markDirty();
 +            v->setExptime(exptime);
 +            v->setRevSeqno(v->getRevSeqno() + 1);
 +        }
 +
 +        GetValue rv(
 +                v->toItem(v->isLocked(ep_current_time()), getId()).release(),
 +                ENGINE_SUCCESS,
 +                v->getBySeqno());
 +
 +        if (exptime_mutated) {
 +            VBNotifyCtx notifyCtx = queueDirty(*v);
 +            rv.getValue()->setCas(v->getCas());
 +            // we unlock ht lock here because we want to avoid potential lock
 +            // inversions arising from notifyNewSeqno() call
 +            hbl.getHTLock().unlock();
 +            notifyNewSeqno(notifyCtx);
 +        }
 +
 +        return rv;
 +    } else {
 +        if (eviction == VALUE_ONLY) {
 +            return {};
 +        } else {
 +            if (maybeKeyExistsInFilter(key)) {
 +                ENGINE_ERROR_CODE ec = addTempItemAndBGFetch(
 +                        hbl, key, cookie, engine, bgFetchDelay, false);
 +                return GetValue(NULL, ec, -1, true);
 +            } else {
 +                // As bloomfilter predicted that item surely doesn't exist
 +                // on disk, return ENOENT for getAndUpdateTtl().
 +                return {};
 +            }
 +        }
 +    }
 +}
 +
 +MutationStatus VBucket::insertFromWarmup(Item& itm,
 +                                         bool eject,
 +                                         bool keyMetaDataOnly) {
 +    if (!StoredValue::hasAvailableSpace(stats, itm)) {
 +        return MutationStatus::NoMem;
 +    }
 +
 +    auto hbl = ht.getLockedBucket(itm.getKey());
 +    StoredValue* v = ht.unlocked_find(itm.getKey(),
 +                                      hbl.getBucketNum(),
 +                                      WantsDeleted::Yes,
 +                                      TrackReference::No);
 +
 +    if (v == NULL) {
 +        v = addNewStoredValue(hbl, itm, /*queueItmCtx*/ nullptr).first;
 +        if (keyMetaDataOnly) {
 +            v->markNotResident();
 +            /* For now ht stats are updated from outside ht. This seems to be
 +               a better option for now than passing a flag to
 +               addNewStoredValue() just for this func */
 +            ++(ht.numNonResidentItems);
 +        }
 +        /* For now ht stats are updated from outside ht. This seems to be
 +           a better option for now than passing a flag to
 +           addNewStoredValue() just for this func.
 +           We need to decrNumTotalItems because ht.numTotalItems is already
 +           set by warmup when it estimated the item count from disk */
 +        ht.decrNumTotalItems();
 +        v->setNewCacheItem(false);
 +    } else {
 +        if (keyMetaDataOnly) {
 +            // We don't have a better error code ;)
 +            return MutationStatus::InvalidCas;
 +        }
 +
 +        // Verify that the CAS isn't changed
 +        if (v->getCas() != itm.getCas()) {
 +            if (v->getCas() == 0) {
 +                v->setCas(itm.getCas());
 +                v->setFlags(itm.getFlags());
 +                v->setExptime(itm.getExptime());
 +                v->setRevSeqno(itm.getRevSeqno());
 +            } else {
 +                return MutationStatus::InvalidCas;
 +            }
 +        }
 +        updateStoredValue(hbl, *v, itm, /*queueItmCtx*/ nullptr);
 +    }
 +
 +    v->markClean();
 +
 +    if (eject && !keyMetaDataOnly) {
 +        ht.unlocked_ejectItem(v, eviction);
 +    }
 +
 +    return MutationStatus::NotFound;
 +}
 +
 +GetValue VBucket::getInternal(const DocKey& key,
 +                              const void* cookie,
 +                              EventuallyPersistentEngine& engine,
 +                              int bgFetchDelay,
 +                              get_options_t options,
 +                              bool diskFlushAll) {
 +    const TrackReference trackReference = (options & TRACK_REFERENCE)
 +                                                  ? TrackReference::Yes
 +                                                  : TrackReference::No;
 +    const bool getDeletedValue = (options & GET_DELETED_VALUE);
 +    auto hbl = ht.getLockedBucket(key);
 +    StoredValue* v = fetchValidValue(
 +            hbl, key, WantsDeleted::Yes, trackReference, QueueExpired::Yes);
 +    if (v) {
 +        if (v->isDeleted() && !getDeletedValue) {
 +            return GetValue();
 +        }
 +        if (v->isTempDeletedItem() || v->isTempNonExistentItem()) {
 +            // Delete a temp non-existent item to ensure that
 +            // if the get were issued over an item that doesn't
 +            // exist, then we dont preserve a temp item.
 +            if (options & DELETE_TEMP) {
 +                deleteStoredValue(hbl, *v);
 +            }
 +            return GetValue();
 +        }
 +
 +        // If the value is not resident, wait for it...
 +        if (!v->isResident()) {
 +            return getInternalNonResident(
 +                    key, cookie, engine, bgFetchDelay, options, *v);
 +        }
 +
 +        // Should we hide (return -1) for the items' CAS?
 +        const bool hide_cas =
 +                (options & HIDE_LOCKED_CAS) && v->isLocked(ep_current_time());
 +        return GetValue(v->toItem(hide_cas, getId()).release(),
 +                        ENGINE_SUCCESS,
 +                        v->getBySeqno(),
 +                        false,
 +                        v->getNRUValue());
 +    } else {
 +        if (!getDeletedValue && (eviction == VALUE_ONLY || diskFlushAll)) {
 +            return GetValue();
 +        }
 +
 +        if (maybeKeyExistsInFilter(key)) {
 +            ENGINE_ERROR_CODE ec = ENGINE_EWOULDBLOCK;
 +            if (options &
 +                QUEUE_BG_FETCH) { // Full eviction and need a bg fetch.
 +                ec = addTempItemAndBGFetch(
 +                        hbl, key, cookie, engine, bgFetchDelay, false);
 +            }
 +            return GetValue(NULL, ec, -1, true);
 +        } else {
 +            // As bloomfilter predicted that item surely doesn't exist
 +            // on disk, return ENOENT, for getInternal().
 +            return GetValue();
 +        }
 +    }
 +}
 +
 +ENGINE_ERROR_CODE VBucket::getMetaData(const DocKey& key,
 +                                       const void* cookie,
 +                                       EventuallyPersistentEngine& engine,
 +                                       int bgFetchDelay,
 +                                       ItemMetaData& metadata,
 +                                       uint32_t& deleted,
 +                                       uint8_t& datatype) {
 +    deleted = 0;
 +    auto hbl = ht.getLockedBucket(key);
 +    StoredValue* v = ht.unlocked_find(
 +            key, hbl.getBucketNum(), WantsDeleted::Yes, TrackReference::No);
 +
 +    if (v) {
 +        stats.numOpsGetMeta++;
 +        if (v->isTempInitialItem()) {
 +            // Need bg meta fetch.
 +            bgFetch(key, cookie, engine, bgFetchDelay, true);
 +            return ENGINE_EWOULDBLOCK;
 +        } else if (v->isTempNonExistentItem()) {
 +            metadata.cas = v->getCas();
 +            return ENGINE_KEY_ENOENT;
 +        } else {
 +            if (v->isTempDeletedItem() || v->isDeleted() ||
 +                v->isExpired(ep_real_time())) {
 +                deleted |= GET_META_ITEM_DELETED_FLAG;
 +            }
 +
 +            if (v->isLocked(ep_current_time())) {
 +                metadata.cas = static_cast<uint64_t>(-1);
 +            } else {
 +                metadata.cas = v->getCas();
 +            }
 +            metadata.flags = v->getFlags();
 +            metadata.exptime = v->getExptime();
 +            metadata.revSeqno = v->getRevSeqno();
 +            datatype = v->getDatatype();
 +
 +            return ENGINE_SUCCESS;
 +        }
 +    } else {
 +        // The key wasn't found. However, this may be because it was previously
 +        // deleted or evicted with the full eviction strategy.
 +        // So, add a temporary item corresponding to the key to the hash table
 +        // and schedule a background fetch for its metadata from the persistent
 +        // store. The item's state will be updated after the fetch completes.
 +        //
 +        // Schedule this bgFetch only if the key is predicted to be may-be
 +        // existent on disk by the bloomfilter.
 +
 +        if (maybeKeyExistsInFilter(key)) {
 +            return addTempItemAndBGFetch(
 +                    hbl, key, cookie, engine, bgFetchDelay, true);
 +        } else {
 +            stats.numOpsGetMeta++;
 +            return ENGINE_KEY_ENOENT;
 +        }
 +    }
 +}
 +
 +ENGINE_ERROR_CODE VBucket::getKeyStats(const DocKey& key,
 +                                       const void* cookie,
 +                                       EventuallyPersistentEngine& engine,
 +                                       int bgFetchDelay,
 +                                       struct key_stats& kstats,
 +                                       WantsDeleted wantsDeleted) {
 +    auto hbl = ht.getLockedBucket(key);
 +    StoredValue* v = fetchValidValue(hbl,
 +                                     key,
 +                                     WantsDeleted::Yes,
 +                                     TrackReference::Yes,
 +                                     QueueExpired::Yes);
 +
 +    if (v) {
 +        if ((v->isDeleted() && wantsDeleted == WantsDeleted::No) ||
 +            v->isTempNonExistentItem() || v->isTempDeletedItem()) {
 +            return ENGINE_KEY_ENOENT;
 +        }
 +        if (eviction == FULL_EVICTION && v->isTempInitialItem()) {
 +            hbl.getHTLock().unlock();
 +            bgFetch(key, cookie, engine, bgFetchDelay, true);
 +            return ENGINE_EWOULDBLOCK;
 +        }
 +        kstats.logically_deleted = v->isDeleted();
 +        kstats.dirty = v->isDirty();
 +        kstats.exptime = v->getExptime();
 +        kstats.flags = v->getFlags();
 +        kstats.cas = v->getCas();
 +        kstats.vb_state = getState();
 +        return ENGINE_SUCCESS;
 +    } else {
 +        if (eviction == VALUE_ONLY) {
 +            return ENGINE_KEY_ENOENT;
 +        } else {
 +            if (maybeKeyExistsInFilter(key)) {
 +                return addTempItemAndBGFetch(
 +                        hbl, key, cookie, engine, bgFetchDelay, true);
 +            } else {
 +                // If bgFetch were false, or bloomfilter predicted that
 +                // item surely doesn't exist on disk, return ENOENT for
 +                // getKeyStats().
 +                return ENGINE_KEY_ENOENT;
 +            }
 +        }
 +    }
 +}
 +
 +GetValue VBucket::getLocked(const DocKey& key,
 +                            rel_time_t currentTime,
 +                            uint32_t lockTimeout,
 +                            const void* cookie,
 +                            EventuallyPersistentEngine& engine,
 +                            int bgFetchDelay) {
 +    auto hbl = ht.getLockedBucket(key);
 +    StoredValue* v = fetchValidValue(hbl,
 +                                     key,
 +                                     WantsDeleted::Yes,
 +                                     TrackReference::Yes,
 +                                     QueueExpired::Yes);
 +
 +    if (v) {
 +        if (v->isDeleted() || v->isTempNonExistentItem() ||
 +            v->isTempDeletedItem()) {
 +            return GetValue(NULL, ENGINE_KEY_ENOENT);
 +        }
 +
 +        // if v is locked return error
 +        if (v->isLocked(currentTime)) {
 +            return GetValue(NULL, ENGINE_TMPFAIL);
 +        }
 +
 +        // If the value is not resident, wait for it...
 +        if (!v->isResident()) {
 +            if (cookie) {
 +                bgFetch(key, cookie, engine, bgFetchDelay);
 +            }
 +            return GetValue(NULL, ENGINE_EWOULDBLOCK, -1, true);
 +        }
 +
 +        // acquire lock and increment cas value
 +        v->lock(currentTime + lockTimeout);
 +
 +        auto it = v->toItem(false, getId());
 +        it->setCas(nextHLCCas());
 +        v->setCas(it->getCas());
 +
 +        return GetValue(it.release());
 +
 +    } else {
 +        // No value found in the hashtable.
 +        switch (eviction) {
 +        case VALUE_ONLY:
 +            return GetValue(NULL, ENGINE_KEY_ENOENT);
 +
 +        case FULL_EVICTION:
 +            if (maybeKeyExistsInFilter(key)) {
 +                ENGINE_ERROR_CODE ec = addTempItemAndBGFetch(
 +                        hbl, key, cookie, engine, bgFetchDelay, false);
 +                return GetValue(NULL, ec, -1, true);
 +            } else {
 +                // As bloomfilter predicted that item surely doesn't exist
 +                // on disk, return ENOENT for getLocked().
 +                return GetValue(NULL, ENGINE_KEY_ENOENT);
 +            }
 +        }
 +        return GetValue(); // just to prevent compiler warning
 +    }
 +}
 +
 +void VBucket::deletedOnDiskCbk(const Item& queuedItem, bool deleted) {
 +    auto hbl = ht.getLockedBucket(queuedItem.getKey());
 +    StoredValue* v = fetchValidValue(hbl,
 +                                     queuedItem.getKey(),
 +                                     WantsDeleted::Yes,
 +                                     TrackReference::No,
 +                                     QueueExpired::Yes);
 +    // Delete the item in the hash table iff:
 +    //  1. Item is existent in hashtable, and deleted flag is true
 +    //  2. rev seqno of queued item matches rev seqno of hash table item
 +    if (v && v->isDeleted() && (queuedItem.getRevSeqno() == v->getRevSeqno())) {
 +        bool isDeleted = deleteStoredValue(hbl, *v);
 +        if (!isDeleted) {
 +            throw std::logic_error(
 +                    "deletedOnDiskCbk:callback: "
 +                    "Failed to delete key with seqno:" +
 +                    std::to_string(v->getBySeqno()) + "' from bucket " +
 +                    std::to_string(hbl.getBucketNum()));
 +        }
 +
 +        /**
 +         * Deleted items are to be added to the bloomfilter,
 +         * in either eviction policy.
 +         */
 +        addToFilter(queuedItem.getKey());
 +    }
 +
 +    if (deleted) {
 +        ++stats.totalPersisted;
 +        ++opsDelete;
 +    }
 +    doStatsForFlushing(queuedItem, queuedItem.size());
 +    --stats.diskQueueSize;
 +    decrMetaDataDisk(queuedItem);
 +}
 +
 +bool VBucket::deleteKey(const DocKey& key) {
 +    auto hbl = ht.getLockedBucket(key);
 +    StoredValue* v = ht.unlocked_find(
 +            key, hbl.getBucketNum(), WantsDeleted::Yes, TrackReference::No);
 +    if (!v) {
 +        return false;
 +    }
 +    return deleteStoredValue(hbl, *v);
 +}
 +
 +void VBucket::postProcessRollback(const RollbackResult& rollbackResult,
 +                                  uint64_t prevHighSeqno) {
 +    failovers->pruneEntries(rollbackResult.highSeqno);
 +    checkpointManager.clear(*this, rollbackResult.highSeqno);
 +    setPersistedSnapshot(rollbackResult.snapStartSeqno,
 +                         rollbackResult.snapEndSeqno);
 +    incrRollbackItemCount(prevHighSeqno - rollbackResult.highSeqno);
-     setBackfillPhase(false);
++    checkpointManager.setOpenCheckpointId(1);
 +}
 +
 +void VBucket::dump() const {
 +    std::cerr << "VBucket[" << this << "] with state: " << toString(getState())
 +              << " numItems:" << getNumItems()
 +              << " numNonResident:" << getNumNonResidentItems()
 +              << " ht: " << std::endl << "  " << ht << std::endl
 +              << "]" << std::endl;
 +}
 +
 +void VBucket::_addStats(bool details, ADD_STAT add_stat, const void* c) {
      addStat(NULL, toString(state), add_stat, c);
      if (details) {
 -        size_t numItems = getNumItems(policy);
 +        size_t numItems = getNumItems();
          size_t tempItems = getNumTempItems();
          addStat("num_items", numItems, add_stat, c);
          addStat("num_temp_items", tempItems, add_stat, c);
@@@ -301,10 -288,10 +298,10 @@@ TEST_P(RollbackTest, MB21784) 
          << "rollback did not return ENGINE_SUCCESS";
  
      // Assert the checkpointmanager clear function (called during rollback)
-     // has set the opencheckpointid to zero
+     // has set the opencheckpointid to one
 -    auto vb = store->getVbMap().getBucket(vbid);
 +    auto vb = store->getVBucket(vbid);
      auto& ckpt_mgr = vb->checkpointManager;
-     EXPECT_EQ(0, ckpt_mgr.getOpenCheckpointId()) << "opencheckpointId not zero";
+     EXPECT_EQ(1, ckpt_mgr.getOpenCheckpointId()) << "opencheckpointId not one";
  
      // Create a new Dcp producer, reserving its cookie.
      get_mock_server_api()->cookie->reserve(cookie);