874d3a42305f9b978f6c93bf057b94cd13403aea
[ep-engine.git] / src / vbucket.cc
1 /* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2 /*
3  *     Copyright 2015 Couchbase, Inc
4  *
5  *   Licensed under the Apache License, Version 2.0 (the "License");
6  *   you may not use this file except in compliance with the License.
7  *   You may obtain a copy of the License at
8  *
9  *       http://www.apache.org/licenses/LICENSE-2.0
10  *
11  *   Unless required by applicable law or agreed to in writing, software
12  *   distributed under the License is distributed on an "AS IS" BASIS,
13  *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  *   See the License for the specific language governing permissions and
15  *   limitations under the License.
16  */
17
18 #include "config.h"
19
20 #include <functional>
21 #include <list>
22 #include <set>
23 #include <string>
24 #include <vector>
25
26 #include "atomic.h"
27 #include "bgfetcher.h"
28 #include "conflict_resolution.h"
29 #include "ep_engine.h"
30 #include "ep_types.h"
31 #include "failover-table.h"
32 #include "flusher.h"
33 #include "pre_link_document_context.h"
34
35 #define STATWRITER_NAMESPACE vbucket
36 #include "statwriter.h"
37 #undef STATWRITER_NAMESPACE
38 #include "stored_value_factories.h"
39
40 #include "vbucket.h"
41
/* Lower and upper checkpoint flush timeout values, in seconds (typed
 * constants, not preprocessor macros). */
const size_t MIN_CHK_FLUSH_TIMEOUT = 10; // 10 sec.
const size_t MAX_CHK_FLUSH_TIMEOUT = 30; // 30 sec.

/* Statics definitions */
// Checkpoint flush timeout shared by all VBuckets (static member), starting
// at MIN_CHK_FLUSH_TIMEOUT. NOTE(review): how it is later moved towards
// MAX_CHK_FLUSH_TIMEOUT is not visible in this file section — confirm.
std::atomic<size_t> VBucket::chkFlushTimeout(MIN_CHK_FLUSH_TIMEOUT);
48
49 VBucketFilter VBucketFilter::filter_diff(const VBucketFilter &other) const {
50     std::vector<uint16_t> tmp(acceptable.size() + other.size());
51     std::vector<uint16_t>::iterator end;
52     end = std::set_symmetric_difference(acceptable.begin(),
53                                         acceptable.end(),
54                                         other.acceptable.begin(),
55                                         other.acceptable.end(),
56                                         tmp.begin());
57     return VBucketFilter(std::vector<uint16_t>(tmp.begin(), end));
58 }
59
60 VBucketFilter VBucketFilter::filter_intersection(const VBucketFilter &other)
61                                                                         const {
62     std::vector<uint16_t> tmp(acceptable.size() + other.size());
63     std::vector<uint16_t>::iterator end;
64
65     end = std::set_intersection(acceptable.begin(), acceptable.end(),
66                                 other.acceptable.begin(),
67                                 other.acceptable.end(),
68                                 tmp.begin());
69     return VBucketFilter(std::vector<uint16_t>(tmp.begin(), end));
70 }
71
72 static bool isRange(std::set<uint16_t>::const_iterator it,
73                     const std::set<uint16_t>::const_iterator &end,
74                     size_t &length)
75 {
76     length = 0;
77     for (uint16_t val = *it;
78          it != end && (val + length) == *it;
79          ++it, ++length) {
80         // empty
81     }
82
83     --length;
84
85     return length > 1;
86 }
87
88 std::ostream& operator <<(std::ostream &out, const VBucketFilter &filter)
89 {
90     std::set<uint16_t>::const_iterator it;
91
92     if (filter.acceptable.empty()) {
93         out << "{ empty }";
94     } else {
95         bool needcomma = false;
96         out << "{ ";
97         for (it = filter.acceptable.begin();
98              it != filter.acceptable.end();
99              ++it) {
100             if (needcomma) {
101                 out << ", ";
102             }
103
104             size_t length;
105             if (isRange(it, filter.acceptable.end(), length)) {
106                 std::set<uint16_t>::iterator last = it;
107                 for (size_t i = 0; i < length; ++i) {
108                     ++last;
109                 }
110                 out << "[" << *it << "," << *last << "]";
111                 it = last;
112             } else {
113                 out << *it;
114             }
115             needcomma = true;
116         }
117         out << " }";
118     }
119
120     return out;
121 }
122
123 const vbucket_state_t VBucket::ACTIVE =
124                      static_cast<vbucket_state_t>(htonl(vbucket_state_active));
125 const vbucket_state_t VBucket::REPLICA =
126                     static_cast<vbucket_state_t>(htonl(vbucket_state_replica));
127 const vbucket_state_t VBucket::PENDING =
128                     static_cast<vbucket_state_t>(htonl(vbucket_state_pending));
129 const vbucket_state_t VBucket::DEAD =
130                     static_cast<vbucket_state_t>(htonl(vbucket_state_dead));
131
132 VBucket::VBucket(id_type i,
133                  vbucket_state_t newState,
134                  EPStats& st,
135                  CheckpointConfig& chkConfig,
136                  int64_t lastSeqno,
137                  uint64_t lastSnapStart,
138                  uint64_t lastSnapEnd,
139                  std::unique_ptr<FailoverTable> table,
140                  std::shared_ptr<Callback<id_type> > flusherCb,
141                  std::unique_ptr<AbstractStoredValueFactory> valFact,
142                  NewSeqnoCallback newSeqnoCb,
143                  Configuration& config,
144                  item_eviction_policy_t evictionPolicy,
145                  vbucket_state_t initState,
146                  uint64_t purgeSeqno,
147                  uint64_t maxCas,
148                  const std::string& collectionsManifest)
149     : ht(st, std::move(valFact)),
150       checkpointManager(st,
151                         i,
152                         chkConfig,
153                         lastSeqno,
154                         lastSnapStart,
155                         lastSnapEnd,
156                         flusherCb),
157       failovers(std::move(table)),
158       opsCreate(0),
159       opsUpdate(0),
160       opsDelete(0),
161       opsReject(0),
162       dirtyQueueSize(0),
163       dirtyQueueMem(0),
164       dirtyQueueFill(0),
165       dirtyQueueDrain(0),
166       dirtyQueueAge(0),
167       dirtyQueuePendingWrites(0),
168       metaDataDisk(0),
169       numExpiredItems(0),
170       eviction(evictionPolicy),
171       stats(st),
172       persistenceSeqno(0),
173       numHpVBReqs(0),
174       id(i),
175       state(newState),
176       initialState(initState),
177       purge_seqno(purgeSeqno),
178       takeover_backed_up(false),
179       persisted_snapshot_start(lastSnapStart),
180       persisted_snapshot_end(lastSnapEnd),
181       rollbackItemCount(0),
182       hlc(maxCas,
183           std::chrono::microseconds(config.getHlcDriftAheadThresholdUs()),
184           std::chrono::microseconds(config.getHlcDriftBehindThresholdUs())),
185       statPrefix("vb_" + std::to_string(i)),
186       persistenceCheckpointId(0),
187       bucketCreation(false),
188       bucketDeletion(false),
189       newSeqnoCb(std::move(newSeqnoCb)),
190       manifest(collectionsManifest) {
191     if (config.getConflictResolutionType().compare("lww") == 0) {
192         conflictResolver.reset(new LastWriteWinsResolution());
193     } else {
194         conflictResolver.reset(new RevisionSeqnoResolution());
195     }
196
197     backfill.isBackfillPhase = false;
198     pendingOpsStart = 0;
199     stats.memOverhead->fetch_add(sizeof(VBucket)
200                                 + ht.memorySize() + sizeof(CheckpointManager));
201     LOG(EXTENSION_LOG_NOTICE,
202         "VBucket: created vbucket:%" PRIu16 " with state:%s "
203                 "initialState:%s "
204                 "lastSeqno:%" PRIu64 " "
205                 "lastSnapshot:{%" PRIu64 ",%" PRIu64 "} "
206                 "persisted_snapshot:{%" PRIu64 ",%" PRIu64 "} "
207                 "max_cas:%" PRIu64,
208         id, VBucket::toString(state), VBucket::toString(initialState),
209         lastSeqno, lastSnapStart, lastSnapEnd,
210         persisted_snapshot_start, persisted_snapshot_end,
211         getMaxCas());
212 }
213
214 VBucket::~VBucket() {
215     if (!pendingOps.empty()) {
216         LOG(EXTENSION_LOG_WARNING,
217             "~Vbucket(): vbucket:%" PRIu16 " has %ld pending ops",
218             id,
219             pendingOps.size());
220     }
221
222     stats.diskQueueSize.fetch_sub(dirtyQueueSize.load());
223     stats.vbBackfillQueueSize.fetch_sub(getBackfillSize());
224
225     // Clear out the bloomfilter(s)
226     clearFilter();
227
228     stats.memOverhead->fetch_sub(sizeof(VBucket) + ht.memorySize() +
229                                 sizeof(CheckpointManager));
230
231     LOG(EXTENSION_LOG_INFO, "Destroying vbucket %d\n", id);
232 }
233
234 void VBucket::fireAllOps(EventuallyPersistentEngine &engine,
235                          ENGINE_ERROR_CODE code) {
236     std::unique_lock<std::mutex> lh(pendingOpLock);
237
238     if (pendingOpsStart > 0) {
239         hrtime_t now = gethrtime();
240         if (now > pendingOpsStart) {
241             hrtime_t d = (now - pendingOpsStart) / 1000;
242             stats.pendingOpsHisto.add(d);
243             atomic_setIfBigger(stats.pendingOpsMaxDuration, d);
244         }
245     } else {
246         return;
247     }
248
249     pendingOpsStart = 0;
250     stats.pendingOps.fetch_sub(pendingOps.size());
251     atomic_setIfBigger(stats.pendingOpsMax, pendingOps.size());
252
253     while (!pendingOps.empty()) {
254         const void *pendingOperation = pendingOps.back();
255         pendingOps.pop_back();
256         // We don't want to hold the pendingOpLock when
257         // calling notifyIOComplete.
258         lh.unlock();
259         engine.notifyIOComplete(pendingOperation, code);
260         lh.lock();
261     }
262
263     LOG(EXTENSION_LOG_INFO,
264         "Fired pendings ops for vbucket %" PRIu16 " in state %s\n",
265         id, VBucket::toString(state));
266 }
267
268 void VBucket::fireAllOps(EventuallyPersistentEngine &engine) {
269
270     if (state == vbucket_state_active) {
271         fireAllOps(engine, ENGINE_SUCCESS);
272     } else if (state == vbucket_state_pending) {
273         // Nothing
274     } else {
275         fireAllOps(engine, ENGINE_NOT_MY_VBUCKET);
276     }
277 }
278
279 void VBucket::setState(vbucket_state_t to) {
280     vbucket_state_t oldstate;
281     {
282         WriterLockHolder wlh(stateLock);
283         oldstate = state;
284
285         if (to == vbucket_state_active &&
286             checkpointManager.getOpenCheckpointId() < 2) {
287             checkpointManager.setOpenCheckpointId(2);
288         }
289
290         LOG(EXTENSION_LOG_NOTICE,
291             "VBucket::setState: transitioning vbucket:%" PRIu16 " from:%s to:%s",
292             id, VBucket::toString(oldstate), VBucket::toString(to));
293
294         state = to;
295     }
296 }
297
298 vbucket_state VBucket::getVBucketState() const {
299      auto persisted_range = getPersistedSnapshot();
300
301      return vbucket_state{getState(),
302                           getPersistenceCheckpointId(), 0, getHighSeqno(),
303                           getPurgeSeqno(),
304                           persisted_range.start, persisted_range.end,
305                           getMaxCas(), failovers->toJSON()};
306 }
307
308
309
310 void VBucket::doStatsForQueueing(const Item& qi, size_t itemBytes)
311 {
312     ++dirtyQueueSize;
313     dirtyQueueMem.fetch_add(sizeof(Item));
314     ++dirtyQueueFill;
315     dirtyQueueAge.fetch_add(qi.getQueuedTime());
316     dirtyQueuePendingWrites.fetch_add(itemBytes);
317 }
318
319 void VBucket::doStatsForFlushing(const Item& qi, size_t itemBytes) {
320     --dirtyQueueSize;
321     decrDirtyQueueMem(sizeof(Item));
322     ++dirtyQueueDrain;
323     decrDirtyQueueAge(qi.getQueuedTime());
324     decrDirtyQueuePendingWrites(itemBytes);
325 }
326
327 void VBucket::incrMetaDataDisk(const Item& qi) {
328     metaDataDisk.fetch_add(qi.getKey().size() + sizeof(ItemMetaData));
329 }
330
331 void VBucket::decrMetaDataDisk(const Item& qi) {
332     // assume couchstore remove approx this much data from disk
333     metaDataDisk.fetch_sub((qi.getKey().size() + sizeof(ItemMetaData)));
334 }
335
336 void VBucket::resetStats() {
337     opsCreate.store(0);
338     opsUpdate.store(0);
339     opsDelete.store(0);
340     opsReject.store(0);
341
342     stats.diskQueueSize.fetch_sub(dirtyQueueSize.exchange(0));
343     dirtyQueueMem.store(0);
344     dirtyQueueFill.store(0);
345     dirtyQueueAge.store(0);
346     dirtyQueuePendingWrites.store(0);
347     dirtyQueueDrain.store(0);
348
349     hlc.resetStats();
350 }
351
352 template <typename T>
353 void VBucket::addStat(const char *nm, const T &val, ADD_STAT add_stat,
354                       const void *c) {
355     std::string stat = statPrefix;
356     if (nm != NULL) {
357         add_prefixed_stat(statPrefix, nm, val, add_stat, c);
358     } else {
359         add_casted_stat(statPrefix.data(), val, add_stat, c);
360     }
361 }
362
363 void VBucket::handlePreExpiry(StoredValue& v) {
364     value_t value = v.getValue();
365     if (value) {
366         std::unique_ptr<Item> itm(v.toItem(false, id));
367         item_info itm_info;
368         EventuallyPersistentEngine* engine = ObjectRegistry::getCurrentEngine();
369         itm_info = itm->toItemInfo(failovers->getLatestUUID());
370         value_t new_val(Blob::Copy(*value));
371         itm->setValue(new_val);
372
373         SERVER_HANDLE_V1* sapi = engine->getServerApi();
374         /* TODO: In order to minimize allocations, the callback needs to
375          * allocate an item whose value size will be exactly the size of the
376          * value after pre-expiry is performed.
377          */
378         if (sapi->document->pre_expiry(itm_info)) {
379             char* extMeta = const_cast<char *>(v.getValue()->getExtMeta());
380             Item new_item(v.getKey(), v.getFlags(), v.getExptime(),
381                           itm_info.value[0].iov_base, itm_info.value[0].iov_len,
382                           reinterpret_cast<uint8_t*>(extMeta),
383                           v.getValue()->getExtLen(), v.getCas(),
384                           v.getBySeqno(), id, v.getRevSeqno(),
385                           v.getNRUValue());
386
387             new_item.setDeleted();
388             v.setValue(new_item, ht);
389         }
390     }
391 }
392
393 size_t VBucket::getNumNonResidentItems() const {
394     if (eviction == VALUE_ONLY) {
395         return ht.getNumInMemoryNonResItems();
396     } else {
397         size_t num_items = ht.getNumItems();
398         size_t num_res_items = ht.getNumInMemoryItems() -
399                                ht.getNumInMemoryNonResItems();
400         return num_items > num_res_items ? (num_items - num_res_items) : 0;
401     }
402 }
403
404
405 uint64_t VBucket::getPersistenceCheckpointId() const {
406     return persistenceCheckpointId.load();
407 }
408
409 void VBucket::setPersistenceCheckpointId(uint64_t checkpointId) {
410     persistenceCheckpointId.store(checkpointId);
411 }
412
413 void VBucket::markDirty(const DocKey& key) {
414     auto hbl = ht.getLockedBucket(key);
415     StoredValue* v = ht.unlocked_find(
416             key, hbl.getBucketNum(), WantsDeleted::Yes, TrackReference::Yes);
417     if (v) {
418         v->markDirty();
419     } else {
420         LOG(EXTENSION_LOG_WARNING, "markDirty: Error marking dirty, a key "
421             "is missing from vb:%" PRIu16, id);
422     }
423 }
424
425 bool VBucket::isResidentRatioUnderThreshold(float threshold) {
426     if (eviction != FULL_EVICTION) {
427         throw std::invalid_argument("VBucket::isResidentRatioUnderThreshold: "
428                 "policy (which is " + std::to_string(eviction) +
429                 ") must be FULL_EVICTION");
430     }
431     size_t num_items = getNumItems();
432     size_t num_non_resident_items = getNumNonResidentItems();
433     if (threshold >= ((float)(num_items - num_non_resident_items) /
434                                                                 num_items)) {
435         return true;
436     } else {
437         return false;
438     }
439 }
440
441 void VBucket::createFilter(size_t key_count, double probability) {
442     // Create the actual bloom filter upon vbucket creation during
443     // scenarios:
444     //      - Bucket creation
445     //      - Rebalance
446     LockHolder lh(bfMutex);
447     if (bFilter == nullptr && tempFilter == nullptr) {
448         bFilter = std::make_unique<BloomFilter>(key_count, probability,
449                                         BFILTER_ENABLED);
450     } else {
451         LOG(EXTENSION_LOG_WARNING, "(vb %" PRIu16 ") Bloom filter / Temp filter"
452             " already exist!", id);
453     }
454 }
455
456 void VBucket::initTempFilter(size_t key_count, double probability) {
457     // Create a temp bloom filter with status as COMPACTING,
458     // if the main filter is found to exist, set its state to
459     // COMPACTING as well.
460     LockHolder lh(bfMutex);
461     tempFilter = std::make_unique<BloomFilter>(key_count, probability,
462                                      BFILTER_COMPACTING);
463     if (bFilter) {
464         bFilter->setStatus(BFILTER_COMPACTING);
465     }
466 }
467
468 void VBucket::addToFilter(const DocKey& key) {
469     LockHolder lh(bfMutex);
470     if (bFilter) {
471         bFilter->addKey(key);
472     }
473
474     // If the temp bloom filter is not found to be NULL,
475     // it means that compaction is running on the particular
476     // vbucket. Therefore add the key to the temp filter as
477     // well, as once compaction completes the temp filter
478     // will replace the main bloom filter.
479     if (tempFilter) {
480         tempFilter->addKey(key);
481     }
482 }
483
484 bool VBucket::maybeKeyExistsInFilter(const DocKey& key) {
485     LockHolder lh(bfMutex);
486     if (bFilter) {
487         return bFilter->maybeKeyExists(key);
488     } else {
489         // If filter doesn't exist, allow the BgFetch to go through.
490         return true;
491     }
492 }
493
494 bool VBucket::isTempFilterAvailable() {
495     LockHolder lh(bfMutex);
496     if (tempFilter &&
497         (tempFilter->getStatus() == BFILTER_COMPACTING ||
498          tempFilter->getStatus() == BFILTER_ENABLED)) {
499         return true;
500     } else {
501         return false;
502     }
503 }
504
505 void VBucket::addToTempFilter(const DocKey& key) {
506     // Keys will be added to only the temp filter during
507     // compaction.
508     LockHolder lh(bfMutex);
509     if (tempFilter) {
510         tempFilter->addKey(key);
511     }
512 }
513
514 void VBucket::swapFilter() {
515     // Delete the main bloom filter and replace it with
516     // the temp filter that was populated during compaction,
517     // only if the temp filter's state is found to be either at
518     // COMPACTING or ENABLED (if in the case the user enables
519     // bloomfilters for some reason while compaction was running).
520     // Otherwise, it indicates that the filter's state was
521     // possibly disabled during compaction, therefore clear out
522     // the temp filter. If it gets enabled at some point, a new
523     // bloom filter will be made available after the next
524     // compaction.
525
526     LockHolder lh(bfMutex);
527     if (tempFilter) {
528         bFilter.reset();
529
530         if (tempFilter->getStatus() == BFILTER_COMPACTING ||
531              tempFilter->getStatus() == BFILTER_ENABLED) {
532             bFilter = std::move(tempFilter);
533             bFilter->setStatus(BFILTER_ENABLED);
534         }
535         tempFilter.reset();
536     }
537 }
538
539 void VBucket::clearFilter() {
540     LockHolder lh(bfMutex);
541     bFilter.reset();
542     tempFilter.reset();
543 }
544
545 void VBucket::setFilterStatus(bfilter_status_t to) {
546     LockHolder lh(bfMutex);
547     if (bFilter) {
548         bFilter->setStatus(to);
549     }
550     if (tempFilter) {
551         tempFilter->setStatus(to);
552     }
553 }
554
555 std::string VBucket::getFilterStatusString() {
556     LockHolder lh(bfMutex);
557     if (bFilter) {
558         return bFilter->getStatusString();
559     } else if (tempFilter) {
560         return tempFilter->getStatusString();
561     } else {
562         return "DOESN'T EXIST";
563     }
564 }
565
566 size_t VBucket::getFilterSize() {
567     LockHolder lh(bfMutex);
568     if (bFilter) {
569         return bFilter->getFilterSize();
570     } else {
571         return 0;
572     }
573 }
574
575 size_t VBucket::getNumOfKeysInFilter() {
576     LockHolder lh(bfMutex);
577     if (bFilter) {
578         return bFilter->getNumOfKeysInFilter();
579     } else {
580         return 0;
581     }
582 }
583
584 VBNotifyCtx VBucket::queueDirty(
585         StoredValue& v,
586         const GenerateBySeqno generateBySeqno,
587         const GenerateCas generateCas,
588         const bool isBackfillItem,
589         PreLinkDocumentContext* preLinkDocumentContext) {
590     VBNotifyCtx notifyCtx;
591
592     queued_item qi(v.toItem(false, getId()));
593
594     if (isBackfillItem) {
595         queueBackfillItem(qi, generateBySeqno);
596         notifyCtx.notifyFlusher = true;
597         /* During backfill on a TAP receiver we need to update the snapshot
598          range in the checkpoint. Has to be done here because in case of TAP
599          backfill, above, we use vb.queueBackfillItem() instead of
600          vb.checkpointManager.queueDirty() */
601         if (generateBySeqno == GenerateBySeqno::Yes) {
602             checkpointManager.resetSnapshotRange();
603         }
604     } else {
605         notifyCtx.notifyFlusher =
606                 checkpointManager.queueDirty(*this,
607                                              qi,
608                                              generateBySeqno,
609                                              generateCas,
610                                              preLinkDocumentContext);
611         notifyCtx.notifyReplication = true;
612         if (GenerateCas::Yes == generateCas) {
613             v.setCas(qi->getCas());
614         }
615     }
616
617     v.setBySeqno(qi->getBySeqno());
618     notifyCtx.bySeqno = qi->getBySeqno();
619
620     return notifyCtx;
621 }
622
623 StoredValue* VBucket::fetchValidValue(HashTable::HashBucketLock& hbl,
624                                       const DocKey& key,
625                                       WantsDeleted wantsDeleted,
626                                       TrackReference trackReference,
627                                       QueueExpired queueExpired) {
628     if (!hbl.getHTLock()) {
629         throw std::logic_error(
630                 "Hash bucket lock not held in "
631                 "VBucket::fetchValidValue() for hash bucket: " +
632                 std::to_string(hbl.getBucketNum()) + "for key: " +
633                 std::string(reinterpret_cast<const char*>(key.data()),
634                             key.size()));
635     }
636     StoredValue* v = ht.unlocked_find(
637             key, hbl.getBucketNum(), wantsDeleted, trackReference);
638     if (v && !v->isDeleted() && !v->isTempItem()) {
639         // In the deleted case, we ignore expiration time.
640         if (v->isExpired(ep_real_time())) {
641             if (getState() != vbucket_state_active) {
642                 return wantsDeleted == WantsDeleted::Yes ? v : NULL;
643             }
644
645             // queueDirty only allowed on active VB
646             if (queueExpired == QueueExpired::Yes &&
647                 getState() == vbucket_state_active) {
648                 incExpirationStat(ExpireBy::Access);
649                 handlePreExpiry(*v);
650                 VBNotifyCtx notifyCtx;
651                 std::tie(std::ignore, v, notifyCtx) =
652                         processExpiredItem(hbl, *v);
653                 notifyNewSeqno(notifyCtx);
654             }
655             return wantsDeleted == WantsDeleted::Yes ? v : NULL;
656         }
657     }
658     return v;
659 }
660
661 void VBucket::incExpirationStat(const ExpireBy source) {
662     switch (source) {
663     case ExpireBy::Pager:
664         ++stats.expired_pager;
665         break;
666     case ExpireBy::Compactor:
667         ++stats.expired_compactor;
668         break;
669     case ExpireBy::Access:
670         ++stats.expired_access;
671         break;
672     }
673     ++numExpiredItems;
674 }
675
676 MutationStatus VBucket::setFromInternal(Item& itm) {
677     return ht.set(itm);
678 }
679
680 ENGINE_ERROR_CODE VBucket::set(Item& itm,
681                                const void* cookie,
682                                EventuallyPersistentEngine& engine,
683                                const int bgFetchDelay) {
684     bool cas_op = (itm.getCas() != 0);
685     auto hbl = ht.getLockedBucket(itm.getKey());
686     StoredValue* v = ht.unlocked_find(itm.getKey(),
687                                       hbl.getBucketNum(),
688                                       WantsDeleted::Yes,
689                                       TrackReference::No);
690     if (v && v->isLocked(ep_current_time()) &&
691         (getState() == vbucket_state_replica ||
692          getState() == vbucket_state_pending)) {
693         v->unlock();
694     }
695
696     bool maybeKeyExists = true;
697     // If we didn't find a valid item, check Bloomfilter's prediction if in
698     // full eviction policy and for a CAS operation.
699     if ((v == nullptr || v->isTempInitialItem()) &&
700         (eviction == FULL_EVICTION) && (itm.getCas() != 0)) {
701         // Check Bloomfilter's prediction
702         if (!maybeKeyExistsInFilter(itm.getKey())) {
703             maybeKeyExists = false;
704         }
705     }
706
707     PreLinkDocumentContext preLinkDocumentContext(engine, cookie, &itm);
708     VBQueueItemCtx queueItmCtx(GenerateBySeqno::Yes,
709                                GenerateCas::Yes,
710                                TrackCasDrift::No,
711                                /*isBackfillItem*/ false,
712                                &preLinkDocumentContext);
713
714     MutationStatus status;
715     VBNotifyCtx notifyCtx;
716     std::tie(status, notifyCtx) = processSet(hbl,
717                                              v,
718                                              itm,
719                                              itm.getCas(),
720                                              /*allowExisting*/ true,
721                                              /*hashMetaData*/ false,
722                                              &queueItmCtx,
723                                              maybeKeyExists);
724
725     ENGINE_ERROR_CODE ret = ENGINE_SUCCESS;
726     switch (status) {
727     case MutationStatus::NoMem:
728         ret = ENGINE_ENOMEM;
729         break;
730     case MutationStatus::InvalidCas:
731         ret = ENGINE_KEY_EEXISTS;
732         break;
733     case MutationStatus::IsLocked:
734         ret = ENGINE_LOCKED;
735         break;
736     case MutationStatus::NotFound:
737         if (cas_op) {
738             ret = ENGINE_KEY_ENOENT;
739             break;
740         }
741     // FALLTHROUGH
742     case MutationStatus::WasDirty:
743     // Even if the item was dirty, push it into the vbucket's open
744     // checkpoint.
745     case MutationStatus::WasClean:
746         notifyNewSeqno(notifyCtx);
747
748         itm.setBySeqno(v->getBySeqno());
749         itm.setCas(v->getCas());
750         break;
751     case MutationStatus::NeedBgFetch: { // CAS operation with non-resident item
752         // +
753         // full eviction.
754         if (v) {
755             // temp item is already created. Simply schedule a bg fetch job
756             hbl.getHTLock().unlock();
757             bgFetch(itm.getKey(), cookie, engine, bgFetchDelay, true);
758             return ENGINE_EWOULDBLOCK;
759         }
760         ret = addTempItemAndBGFetch(
761                 hbl, itm.getKey(), cookie, engine, bgFetchDelay, true);
762         break;
763     }
764     }
765
766     return ret;
767 }
768
769 ENGINE_ERROR_CODE VBucket::replace(Item& itm,
770                                    const void* cookie,
771                                    EventuallyPersistentEngine& engine,
772                                    const int bgFetchDelay) {
773     auto hbl = ht.getLockedBucket(itm.getKey());
774     StoredValue* v = ht.unlocked_find(itm.getKey(),
775                                       hbl.getBucketNum(),
776                                       WantsDeleted::Yes,
777                                       TrackReference::No);
778     if (v) {
779         if (v->isDeleted() || v->isTempDeletedItem() ||
780             v->isTempNonExistentItem()) {
781             return ENGINE_KEY_ENOENT;
782         }
783
784         MutationStatus mtype;
785         VBNotifyCtx notifyCtx;
786         if (eviction == FULL_EVICTION && v->isTempInitialItem()) {
787             mtype = MutationStatus::NeedBgFetch;
788         } else {
789             PreLinkDocumentContext preLinkDocumentContext(engine, cookie, &itm);
790             VBQueueItemCtx queueItmCtx(GenerateBySeqno::Yes,
791                                        GenerateCas::Yes,
792                                        TrackCasDrift::No,
793                                        /*isBackfillItem*/ false,
794                                        &preLinkDocumentContext);
795             std::tie(mtype, notifyCtx) = processSet(hbl,
796                                                     v,
797                                                     itm,
798                                                     0,
799                                                     /*allowExisting*/ true,
800                                                     /*hasMetaData*/ false,
801                                                     &queueItmCtx);
802         }
803
804         ENGINE_ERROR_CODE ret = ENGINE_SUCCESS;
805         switch (mtype) {
806         case MutationStatus::NoMem:
807             ret = ENGINE_ENOMEM;
808             break;
809         case MutationStatus::IsLocked:
810             ret = ENGINE_LOCKED;
811             break;
812         case MutationStatus::InvalidCas:
813         case MutationStatus::NotFound:
814             ret = ENGINE_NOT_STORED;
815             break;
816         // FALLTHROUGH
817         case MutationStatus::WasDirty:
818         // Even if the item was dirty, push it into the vbucket's open
819         // checkpoint.
820         case MutationStatus::WasClean:
821             notifyNewSeqno(notifyCtx);
822
823             itm.setBySeqno(v->getBySeqno());
824             itm.setCas(v->getCas());
825             break;
826         case MutationStatus::NeedBgFetch: {
827             // temp item is already created. Simply schedule a bg fetch job
828             hbl.getHTLock().unlock();
829             bgFetch(itm.getKey(), cookie, engine, bgFetchDelay, true);
830             ret = ENGINE_EWOULDBLOCK;
831             break;
832         }
833         }
834
835         return ret;
836     } else {
837         if (eviction == VALUE_ONLY) {
838             return ENGINE_KEY_ENOENT;
839         }
840
841         if (maybeKeyExistsInFilter(itm.getKey())) {
842             return addTempItemAndBGFetch(
843                     hbl, itm.getKey(), cookie, engine, bgFetchDelay, false);
844         } else {
845             // As bloomfilter predicted that item surely doesn't exist
846             // on disk, return ENOENT for replace().
847             return ENGINE_KEY_ENOENT;
848         }
849     }
850 }
851
/**
 * Store an item that arrives as a backfill item (the VBQueueItemCtx is built
 * with isBackfillItem = true) into this vbucket's hash table.
 *
 * Any existing lock on the StoredValue is released first. The set goes
 * through processSet() with a CAS of 0, allowExisting = true,
 * hasMetaData = true, and no CAS generation or CAS drift tracking.
 *
 * @param itm         item to store
 * @param genBySeqno  forwarded to the VBQueueItemCtx used by processSet()
 * @return ENGINE_SUCCESS on success, ENGINE_ENOMEM if processSet() reports
 *         NoMem, or ENGINE_KEY_EEXISTS for InvalidCas / IsLocked.
 * @throws std::logic_error if processSet() unexpectedly asks for a bg fetch.
 */
ENGINE_ERROR_CODE VBucket::addBackfillItem(Item& itm,
                                           const GenerateBySeqno genBySeqno) {
    auto hbl = ht.getLockedBucket(itm.getKey());
    StoredValue* v = ht.unlocked_find(itm.getKey(),
                                      hbl.getBucketNum(),
                                      WantsDeleted::Yes,
                                      TrackReference::No);

    // Note that this function is only called on replica or pending vbuckets.
    // A lock taken on the item must therefore not block the incoming write.
    if (v && v->isLocked(ep_current_time())) {
        v->unlock();
    }

    VBQueueItemCtx queueItmCtx(genBySeqno,
                               GenerateCas::No,
                               TrackCasDrift::No,
                               /*isBackfillItem*/ true,
                               nullptr /* No pre link should happen */);
    MutationStatus status;
    VBNotifyCtx notifyCtx;
    // NOTE(review): v is dereferenced below even on the NotFound path, so
    // processSet() presumably updates v to point at the stored value.
    // Confirm against its declaration.
    std::tie(status, notifyCtx) = processSet(hbl,
                                             v,
                                             itm,
                                             0,
                                             /*allowExisting*/ true,
                                             /*hasMetaData*/ true,
                                             &queueItmCtx);

    ENGINE_ERROR_CODE ret = ENGINE_SUCCESS;
    switch (status) {
    case MutationStatus::NoMem:
        ret = ENGINE_ENOMEM;
        break;
    case MutationStatus::InvalidCas:
    case MutationStatus::IsLocked:
        ret = ENGINE_KEY_EEXISTS;
        break;
    case MutationStatus::WasDirty:
    // FALLTHROUGH, to ensure the bySeqno for the hashTable item is
    // set correctly, and also the sequence numbers are ordered correctly.
    // (MB-14003)
    case MutationStatus::NotFound:
    // FALLTHROUGH
    case MutationStatus::WasClean: {
        // Must read v's CAS before the HT lock is released.
        setMaxCas(v->getCas());
        // we unlock ht lock here because we want to avoid potential lock
        // inversions arising from notifyNewSeqno() call
        hbl.getHTLock().unlock();
        notifyNewSeqno(notifyCtx);
    } break;
    case MutationStatus::NeedBgFetch:
        throw std::logic_error(
                "VBucket::addBackfillItem: "
                "SET on a non-active vbucket should not require a "
                "bg_metadata_fetch.");
    }

    return ret;
}
911
/**
 * Store an item together with caller-supplied metadata (processSet() is
 * called with hasMetaData = true and CAS drift tracking enabled).
 *
 * When @p force is false, an in-memory StoredValue must win conflict
 * resolution against the incoming metadata. If nothing is in memory and the
 * bloom filter says the key may exist, a temp item is added and a metadata
 * bg fetch is scheduled instead of storing. When @p force is true, conflict
 * resolution is skipped. The bloom filter is then consulted only under full
 * eviction, to tell processSet() whether the key may exist.
 *
 * @param itm            item (value + metadata) to store
 * @param cas            CAS forwarded to processSet()
 * @param seqno          optional out-param; on WasDirty/WasClean it is set
 *                       to the stored value's bySeqno
 * @param cookie         forwarded to bgFetch() / addTempItemAndBGFetch()
 * @param engine         forwarded to bgFetch() / addTempItemAndBGFetch()
 * @param bgFetchDelay   forwarded to bgFetch() / addTempItemAndBGFetch()
 * @param force          skip conflict resolution when true
 * @param allowExisting  forwarded to processSet()
 * @param genBySeqno     forwarded to the VBQueueItemCtx
 * @param genCas         forwarded to the VBQueueItemCtx
 * @param isReplication  forwarded to processSet() / addTempItemAndBGFetch()
 * @return ENGINE_SUCCESS; ENGINE_EWOULDBLOCK when a bg fetch was scheduled;
 *         ENGINE_KEY_EEXISTS on conflict-resolution failure or InvalidCas;
 *         ENGINE_ENOMEM; ENGINE_LOCKED; ENGINE_KEY_ENOENT; or the result of
 *         addTempItemAndBGFetch().
 */
ENGINE_ERROR_CODE VBucket::setWithMeta(Item& itm,
                                       const uint64_t cas,
                                       uint64_t* seqno,
                                       const void* cookie,
                                       EventuallyPersistentEngine& engine,
                                       const int bgFetchDelay,
                                       const bool force,
                                       const bool allowExisting,
                                       const GenerateBySeqno genBySeqno,
                                       const GenerateCas genCas,
                                       const bool isReplication) {
    auto hbl = ht.getLockedBucket(itm.getKey());
    StoredValue* v = ht.unlocked_find(itm.getKey(),
                                      hbl.getBucketNum(),
                                      WantsDeleted::Yes,
                                      TrackReference::No);

    // Passed to processSet(); false only when the bloom filter rules the key
    // out.
    bool maybeKeyExists = true;
    if (!force) {
        if (v) {
            if (v->isTempInitialItem()) {
                // A metadata fetch for this key is still outstanding.
                // NOTE(review): unlike the NeedBgFetch case below, the HT
                // lock is still held when bgFetch() is called here.
                bgFetch(itm.getKey(), cookie, engine, bgFetchDelay, true);
                return ENGINE_EWOULDBLOCK;
            }

            if (!(conflictResolver->resolve(*v,
                                            itm.getMetaData(),
                                            itm.getDataType(),
                                            itm.isDeleted()))) {
                ++stats.numOpsSetMetaResolutionFailed;
                return ENGINE_KEY_EEXISTS;
            }
        } else {
            // No in-memory value: conflict resolution needs the on-disk
            // metadata if the key may exist there.
            if (maybeKeyExistsInFilter(itm.getKey())) {
                return addTempItemAndBGFetch(hbl,
                                             itm.getKey(),
                                             cookie,
                                             engine,
                                             bgFetchDelay,
                                             true,
                                             isReplication);
            } else {
                maybeKeyExists = false;
            }
        }
    } else {
        if (eviction == FULL_EVICTION) {
            // Check Bloomfilter's prediction
            if (!maybeKeyExistsInFilter(itm.getKey())) {
                maybeKeyExists = false;
            }
        }
    }

    // Locks are not honoured on replica / pending vbuckets.
    if (v && v->isLocked(ep_current_time()) &&
        (getState() == vbucket_state_replica ||
         getState() == vbucket_state_pending)) {
        v->unlock();
    }

    VBQueueItemCtx queueItmCtx(genBySeqno,
                               genCas,
                               TrackCasDrift::Yes,
                               /*isBackfillItem*/ false,
                               nullptr /* No pre link step needed */);
    MutationStatus status;
    VBNotifyCtx notifyCtx;
    std::tie(status, notifyCtx) = processSet(hbl,
                                             v,
                                             itm,
                                             cas,
                                             allowExisting,
                                             /*hasMetaData*/ true,
                                             &queueItmCtx,
                                             maybeKeyExists,
                                             isReplication);

    ENGINE_ERROR_CODE ret = ENGINE_SUCCESS;
    switch (status) {
    case MutationStatus::NoMem:
        ret = ENGINE_ENOMEM;
        break;
    case MutationStatus::InvalidCas:
        ret = ENGINE_KEY_EEXISTS;
        break;
    case MutationStatus::IsLocked:
        ret = ENGINE_LOCKED;
        break;
    case MutationStatus::WasDirty:
    case MutationStatus::WasClean: {
        // Read the seqno while the HT lock still protects v.
        if (seqno) {
            *seqno = static_cast<uint64_t>(v->getBySeqno());
        }
        // we unlock ht lock here because we want to avoid potential lock
        // inversions arising from notifyNewSeqno() call
        hbl.getHTLock().unlock();
        notifyNewSeqno(notifyCtx);
    } break;
    case MutationStatus::NotFound:
        ret = ENGINE_KEY_ENOENT;
        break;
    case MutationStatus::NeedBgFetch: {
        // CAS operation with non-resident item + full eviction.
        if (v) {
            // temp item is already created. Simply schedule a bg fetch job.
            hbl.getHTLock().unlock();
            bgFetch(itm.getKey(), cookie, engine, bgFetchDelay, true);
            return ENGINE_EWOULDBLOCK;
        }
        // No in-memory entry yet: create the temp item and fetch.
        ret = addTempItemAndBGFetch(hbl,
                                    itm.getKey(),
                                    cookie,
                                    engine,
                                    bgFetchDelay,
                                    true,
                                    isReplication);
    }
    }

    return ret;
}
1032
/**
 * Delete the document identified by @p key.
 *
 * Without @p itm, a missing, deleted or temp StoredValue yields
 * ENGINE_KEY_ENOENT under value eviction. Under full eviction it may instead
 * trigger a bg fetch. With @p itm, the item's value is set on the
 * StoredValue so that a value accompanies the delete. If the key is not in
 * memory and the bloom filter rules it out, a temp StoredValue is created
 * to carry that value.
 *
 * An expired value is handled by processExpiredItem(). Otherwise
 * processSoftDelete() runs with bySeqno and CAS generation requested and
 * revSeqno = current revSeqno + 1.
 *
 * @param key          key to delete
 * @param cas          in: expected CAS. When @p itm is given, 0 skips the
 *                     explicit comparison below. The value is also passed to
 *                     processSoftDelete(). out: on the WasClean / WasDirty /
 *                     NotFound paths it is set to the StoredValue's CAS.
 * @param cookie       forwarded to the bg fetch helpers
 * @param engine       forwarded to the bg fetch helpers
 * @param bgFetchDelay forwarded to the bg fetch helpers
 * @param itm          optional item whose value is stored with the deletion
 * @param itemMeta     optional out-param receiving cas, revSeqno, flags and
 *                     exptime
 * @param mutInfo      optional out-param receiving seqno and vbucket_uuid.
 *                     It is not filled when the status is NotFound.
 * @return ENGINE_SUCCESS, ENGINE_KEY_ENOENT, ENGINE_KEY_EEXISTS,
 *         ENGINE_ENOMEM, ENGINE_LOCKED_TMPFAIL, ENGINE_EWOULDBLOCK, or the
 *         result of addTempItemAndBGFetch().
 * @throws std::logic_error if processSoftDelete() returns NeedBgFetch.
 */
ENGINE_ERROR_CODE VBucket::deleteItem(const DocKey& key,
                                      uint64_t& cas,
                                      const void* cookie,
                                      EventuallyPersistentEngine& engine,
                                      const int bgFetchDelay,
                                      Item* itm,
                                      ItemMetaData* itemMeta,
                                      mutation_descr_t* mutInfo) {
    auto hbl = ht.getLockedBucket(key);
    StoredValue* v = ht.unlocked_find(
            key, hbl.getBucketNum(), WantsDeleted::Yes, TrackReference::No);

    if (!itm) {
        if (!v || v->isDeleted() || v->isTempItem()) {
            if (eviction == VALUE_ONLY) {
                return ENGINE_KEY_ENOENT;
            } else { // Full eviction.
                if (!v) { // Item might be evicted from cache.
                    if (maybeKeyExistsInFilter(key)) {
                        return addTempItemAndBGFetch(
                                hbl, key, cookie, engine, bgFetchDelay, true);
                    } else {
                        // As bloomfilter predicted that item surely doesn't
                        // exist on disk, return ENOENT for deleteItem().
                        return ENGINE_KEY_ENOENT;
                    }
                } else if (v->isTempInitialItem()) {
                    // A metadata fetch is already outstanding; just schedule
                    // another bg fetch for this cookie.
                    hbl.getHTLock().unlock();
                    bgFetch(key, cookie, engine, bgFetchDelay, true);
                    return ENGINE_EWOULDBLOCK;
                } else { // Non-existent or deleted key.
                    if (v->isTempNonExistentItem() || v->isTempDeletedItem()) {
                        // Delete a temp non-existent item to ensure that
                        // if a delete were issued over an item that doesn't
                        // exist, then we don't preserve a temp item.
                        deleteStoredValue(hbl, *v);
                    }
                    return ENGINE_KEY_ENOENT;
                }
            }
        }
    } else {
        /* if an item containing a deleted value is present, set the value
         * as part of the stored value so that a value is set as part of the
         * delete.
         */
        if (v) {
            if (cas != 0 && cas != v->getCas()) {
                return ENGINE_KEY_EEXISTS;
            }
            itm->setRevSeqno(v->getRevSeqno());
            v->setValue(*itm, ht);
        } else {
            /* retrieve the item, if it is present in disk, for CAS comparison */
            if (maybeKeyExistsInFilter(key)) {
                return addTempItemAndBGFetch(
                                hbl, key, cookie, engine, bgFetchDelay, true);
            } else {
                AddStatus rv = addTempStoredValue(hbl, key);
                if (rv == AddStatus::NoMem) {
                    return ENGINE_ENOMEM;
                }
                v = ht.unlocked_find(key,
                                     hbl.getBucketNum(),
                                     WantsDeleted::Yes,
                                     TrackReference::No);
                v->setValue(*itm, ht);
                /* Due to the above setValue() v is no longer a temp stored value*/
            }
        }
    }

    // Every path that reaches here has a non-null v.
    // Locks are not honoured on replica / pending vbuckets.
    if (v->isLocked(ep_current_time()) &&
        (getState() == vbucket_state_replica ||
         getState() == vbucket_state_pending)) {
        v->unlock();
    }

    // Pre-delete CAS; overwritten below with the post-delete CAS unless the
    // status is NotFound.
    if (itemMeta != nullptr) {
        itemMeta->cas = v->getCas();
    }

    MutationStatus delrv;
    VBNotifyCtx notifyCtx;
    if (v->isExpired(ep_real_time())) {
        std::tie(delrv, v, notifyCtx) = processExpiredItem(hbl, *v);
    } else {
        ItemMetaData metadata;
        metadata.revSeqno = v->getRevSeqno() + 1;
        std::tie(delrv, v, notifyCtx) =
                processSoftDelete(hbl,
                                  *v,
                                  cas,
                                  metadata,
                                  VBQueueItemCtx(GenerateBySeqno::Yes,
                                                 GenerateCas::Yes,
                                                 TrackCasDrift::No,
                                                 /*isBackfillItem*/ false,
                                                 nullptr /* no pre link */),
                                  /*use_meta*/ false,
                                  /*bySeqno*/ v->getBySeqno());
    }

    uint64_t seqno = 0;
    ENGINE_ERROR_CODE ret = ENGINE_SUCCESS;
    switch (delrv) {
    case MutationStatus::NoMem:
        ret = ENGINE_ENOMEM;
        break;
    case MutationStatus::InvalidCas:
        ret = ENGINE_KEY_EEXISTS;
        break;
    case MutationStatus::IsLocked:
        ret = ENGINE_LOCKED_TMPFAIL;
        break;
    case MutationStatus::NotFound:
        ret = ENGINE_KEY_ENOENT;
    /* Fallthrough:
     * A NotFound return value at this point indicates that the
     * item has expired. But, a deletion still needs to be queued
     * for the item in order to persist it.
     */
    case MutationStatus::WasClean:
    case MutationStatus::WasDirty:
        if (itemMeta != nullptr) {
            itemMeta->revSeqno = v->getRevSeqno();
            itemMeta->flags = v->getFlags();
            itemMeta->exptime = v->getExptime();
        }

        // NOTE(review): unlike several other functions in this file, the HT
        // lock is still held when notifyNewSeqno() is called here (v is
        // read afterwards). Confirm this is intended.
        notifyNewSeqno(notifyCtx);
        seqno = static_cast<uint64_t>(v->getBySeqno());
        cas = v->getCas();

        if (delrv != MutationStatus::NotFound) {
            if (mutInfo) {
                mutInfo->seqno = seqno;
                mutInfo->vbucket_uuid = failovers->getLatestUUID();
            }
            if (itemMeta != nullptr) {
                itemMeta->cas = v->getCas();
            }
        }
        break;
    case MutationStatus::NeedBgFetch:
        // We already figured out if a bg fetch is required for a full-evicted
        // item above.
        throw std::logic_error(
                "VBucket::deleteItem: "
                "Unexpected NEEDS_BG_FETCH from processSoftDelete");
    }
    return ret;
}
1186
/**
 * Delete a document using caller-supplied metadata (processSoftDelete() is
 * called with use_meta = true and CAS drift tracking enabled).
 *
 * When @p force is false, an in-memory StoredValue must win conflict
 * resolution. If nothing is in memory, one of two things happens:
 *   - if the bloom filter says the key may exist, a temp item is added and a
 *     metadata bg fetch is scheduled;
 *   - otherwise a deleted temp StoredValue is created so the delete can
 *     still be queued.
 * When @p force is true, a missing StoredValue is replaced by a deleted temp
 * StoredValue stamped with @p cas. A temp-initial one is marked deleted and
 * stamped with @p cas.
 *
 * @param key           key to delete
 * @param cas           in: forwarded to processSoftDelete() and, in the force
 *                      path, stamped onto temp values. out: once the delete
 *                      step is reached, the StoredValue's CAS (0 if there is
 *                      none).
 * @param seqno         optional out-param; on WasDirty/WasClean it is set to
 *                      the StoredValue's bySeqno
 * @param cookie        forwarded to the bg fetch helpers
 * @param engine        forwarded to the bg fetch helpers
 * @param bgFetchDelay  forwarded to the bg fetch helpers
 * @param force         skip conflict resolution when true
 * @param itemMeta      incoming metadata, used for conflict resolution and
 *                      passed to processSoftDelete()
 * @param backfill      used as the isBackfillItem flag of the VBQueueItemCtx
 * @param genBySeqno    forwarded to the VBQueueItemCtx
 * @param generateCas   forwarded to the VBQueueItemCtx
 * @param bySeqno       forwarded to processSoftDelete()
 * @param isReplication forwarded to addTempStoredValue() /
 *                      addTempItemAndBGFetch()
 * @return ENGINE_SUCCESS, ENGINE_EWOULDBLOCK, ENGINE_KEY_EEXISTS,
 *         ENGINE_ENOMEM, ENGINE_LOCKED_TMPFAIL, ENGINE_KEY_ENOENT, or the
 *         result of addTempItemAndBGFetch().
 */
ENGINE_ERROR_CODE VBucket::deleteWithMeta(const DocKey& key,
                                          uint64_t& cas,
                                          uint64_t* seqno,
                                          const void* cookie,
                                          EventuallyPersistentEngine& engine,
                                          const int bgFetchDelay,
                                          const bool force,
                                          const ItemMetaData& itemMeta,
                                          const bool backfill,
                                          const GenerateBySeqno genBySeqno,
                                          const GenerateCas generateCas,
                                          const uint64_t bySeqno,
                                          const bool isReplication) {
    auto hbl = ht.getLockedBucket(key);
    StoredValue* v = ht.unlocked_find(
            key, hbl.getBucketNum(), WantsDeleted::Yes, TrackReference::No);
    if (!force) { // Need conflict resolution.
        if (v) {
            if (v->isTempInitialItem()) {
                // NOTE(review): the HT lock is still held when bgFetch() is
                // called here, unlike the NeedBgFetch case at the end.
                bgFetch(key, cookie, engine, bgFetchDelay, true);
                return ENGINE_EWOULDBLOCK;
            }

            if (!(conflictResolver->resolve(*v,
                                            itemMeta,
                                            PROTOCOL_BINARY_RAW_BYTES,
                                            true))) {
                ++stats.numOpsDelMetaResolutionFailed;
                return ENGINE_KEY_EEXISTS;
            }
        } else {
            // Item is 1) deleted or not existent in the value eviction case OR
            // 2) deleted or evicted in the full eviction.
            if (maybeKeyExistsInFilter(key)) {
                return addTempItemAndBGFetch(hbl,
                                             key,
                                             cookie,
                                             engine,
                                             bgFetchDelay,
                                             true,
                                             isReplication);
            } else {
                // Even though bloomfilter predicted that item doesn't exist
                // on disk, we must put this delete on disk if the cas is valid.
                AddStatus rv = addTempStoredValue(hbl, key, isReplication);
                if (rv == AddStatus::NoMem) {
                    return ENGINE_ENOMEM;
                }
                v = ht.unlocked_find(key,
                                     hbl.getBucketNum(),
                                     WantsDeleted::Yes,
                                     TrackReference::No);
                v->setDeleted();
            }
        }
    } else {
        if (!v) {
            // We should always try to persist a delete here.
            AddStatus rv = addTempStoredValue(hbl, key, isReplication);
            if (rv == AddStatus::NoMem) {
                return ENGINE_ENOMEM;
            }
            v = ht.unlocked_find(key,
                                 hbl.getBucketNum(),
                                 WantsDeleted::Yes,
                                 TrackReference::No);
            v->setDeleted();
            v->setCas(cas);
        } else if (v->isTempInitialItem()) {
            v->setDeleted();
            v->setCas(cas);
        }
    }

    // Locks are not honoured on replica / pending vbuckets.
    if (v && v->isLocked(ep_current_time()) &&
        (getState() == vbucket_state_replica ||
         getState() == vbucket_state_pending)) {
        v->unlock();
    }

    MutationStatus delrv;
    VBNotifyCtx notifyCtx;
    // NOTE(review): every path above either returns or assigns v (from
    // unlocked_find() right after a successful addTempStoredValue()), so this
    // branch appears unreachable. It is kept as a defensive fallback.
    if (!v) {
        if (eviction == FULL_EVICTION) {
            delrv = MutationStatus::NeedBgFetch;
        } else {
            delrv = MutationStatus::NotFound;
        }
    } else {
        VBQueueItemCtx queueItmCtx(genBySeqno,
                                   generateCas,
                                   TrackCasDrift::Yes,
                                   backfill,
                                   nullptr /* No pre link step needed */);
        std::tie(delrv, v, notifyCtx) = processSoftDelete(hbl,
                                                          *v,
                                                          cas,
                                                          itemMeta,
                                                          queueItmCtx,
                                                          /*use_meta*/ true,
                                                          bySeqno);
    }
    // Report the resulting CAS back to the caller (read under the HT lock).
    cas = v ? v->getCas() : 0;

    switch (delrv) {
    case MutationStatus::NoMem:
        return ENGINE_ENOMEM;
    case MutationStatus::InvalidCas:
        return ENGINE_KEY_EEXISTS;
    case MutationStatus::IsLocked:
        return ENGINE_LOCKED_TMPFAIL;
    case MutationStatus::NotFound:
        return ENGINE_KEY_ENOENT;
    case MutationStatus::WasDirty:
    case MutationStatus::WasClean: {
        if (seqno) {
            *seqno = static_cast<uint64_t>(v->getBySeqno());
        }
        // we unlock ht lock here because we want to avoid potential lock
        // inversions arising from notifyNewSeqno() call
        hbl.getHTLock().unlock();
        notifyNewSeqno(notifyCtx);
        break;
    }
    case MutationStatus::NeedBgFetch:
        hbl.getHTLock().unlock();
        bgFetch(key, cookie, engine, bgFetchDelay, true);
        return ENGINE_EWOULDBLOCK;
    }
    return ENGINE_SUCCESS;
}
1318
/**
 * Expire the value stored for @p key and queue the resulting deletion.
 *
 * - A temp non-existent / temp deleted StoredValue (its metadata bg fetch
 *   has completed) is removed from the hash table.
 * - A live value that is expired relative to @p startTime is passed to
 *   handlePreExpiry() and then processExpiredItem().
 * - Under full eviction with no in-memory value, a deleted temp StoredValue
 *   carrying @p revSeqno is created and expired. This happens only if the
 *   bloom filter says the key may exist on disk.
 *
 * @param key       key to expire
 * @param startTime time passed to isExpired() when checking a live value
 * @param revSeqno  revision seqno applied to the temp value created in the
 *                  full-eviction path
 * @param source    forwarded to incExpirationStat()
 * @throws std::logic_error if removing a temp StoredValue fails
 *
 * NOTE(review): incExpirationStat(source) runs on every path except the
 * addTempStoredValue() NoMem early return. That includes the cases where
 * the value was not expired, was already deleted, or was ruled out by the
 * bloom filter. Confirm this counting is intended.
 */
void VBucket::deleteExpiredItem(const DocKey& key,
                                time_t startTime,
                                uint64_t revSeqno,
                                ExpireBy source) {
    auto hbl = ht.getLockedBucket(key);
    StoredValue* v = ht.unlocked_find(
            key, hbl.getBucketNum(), WantsDeleted::Yes, TrackReference::No);
    if (v) {
        if (v->isTempNonExistentItem() || v->isTempDeletedItem()) {
            // This is a temporary item whose background fetch for metadata
            // has completed.
            bool deleted = deleteStoredValue(hbl, *v);
            if (!deleted) {
                throw std::logic_error(
                        "VBucket::deleteExpiredItem: "
                        "Failed to delete seqno:" +
                        std::to_string(v->getBySeqno()) + " from bucket " +
                        std::to_string(hbl.getBucketNum()));
            }
        } else if (v->isExpired(startTime) && !v->isDeleted()) {
            handlePreExpiry(*v);
            VBNotifyCtx notifyCtx;
            std::tie(std::ignore, std::ignore, notifyCtx) =
                    processExpiredItem(hbl, *v);
            // we unlock ht lock here because we want to avoid potential lock
            // inversions arising from notifyNewSeqno() call
            hbl.getHTLock().unlock();
            notifyNewSeqno(notifyCtx);
        }
    } else {
        if (eviction == FULL_EVICTION) {
            // Create a temp item and delete and push it
            // into the checkpoint queue, only if the bloomfilter
            // predicts that the item may exist on disk.
            if (maybeKeyExistsInFilter(key)) {
                AddStatus rv = addTempStoredValue(hbl, key);
                if (rv == AddStatus::NoMem) {
                    return;
                }
                v = ht.unlocked_find(key,
                                     hbl.getBucketNum(),
                                     WantsDeleted::Yes,
                                     TrackReference::No);
                v->setDeleted();
                v->setRevSeqno(revSeqno);
                VBNotifyCtx notifyCtx;
                std::tie(std::ignore, std::ignore, notifyCtx) =
                        processExpiredItem(hbl, *v);
                // we unlock ht lock here because we want to avoid potential
                // lock inversions arising from notifyNewSeqno() call
                hbl.getHTLock().unlock();
                notifyNewSeqno(notifyCtx);
            }
        }
    }
    incExpirationStat(source);
}
1376
1377 ENGINE_ERROR_CODE VBucket::add(Item& itm,
1378                                const void* cookie,
1379                                EventuallyPersistentEngine& engine,
1380                                const int bgFetchDelay) {
1381     auto hbl = ht.getLockedBucket(itm.getKey());
1382     StoredValue* v = ht.unlocked_find(itm.getKey(),
1383                                       hbl.getBucketNum(),
1384                                       WantsDeleted::Yes,
1385                                       TrackReference::No);
1386
1387     bool maybeKeyExists = true;
1388     if ((v == nullptr || v->isTempInitialItem()) &&
1389         (eviction == FULL_EVICTION)) {
1390         // Check bloomfilter's prediction
1391         if (!maybeKeyExistsInFilter(itm.getKey())) {
1392             maybeKeyExists = false;
1393         }
1394     }
1395
1396     PreLinkDocumentContext preLinkDocumentContext(engine, cookie, &itm);
1397     VBQueueItemCtx queueItmCtx(GenerateBySeqno::Yes,
1398                                GenerateCas::Yes,
1399                                TrackCasDrift::No,
1400                                /*isBackfillItem*/ false,
1401                                &preLinkDocumentContext);
1402     AddStatus status;
1403     VBNotifyCtx notifyCtx;
1404     std::tie(status, notifyCtx) =
1405             processAdd(hbl, v, itm, maybeKeyExists, false, &queueItmCtx);
1406
1407     switch (status) {
1408     case AddStatus::NoMem:
1409         return ENGINE_ENOMEM;
1410     case AddStatus::Exists:
1411         return ENGINE_NOT_STORED;
1412     case AddStatus::AddTmpAndBgFetch:
1413         return addTempItemAndBGFetch(
1414                 hbl, itm.getKey(), cookie, engine, bgFetchDelay, true);
1415     case AddStatus::BgFetch:
1416         hbl.getHTLock().unlock();
1417         bgFetch(itm.getKey(), cookie, engine, bgFetchDelay, true);
1418         return ENGINE_EWOULDBLOCK;
1419     case AddStatus::Success:
1420     case AddStatus::UnDel:
1421         notifyNewSeqno(notifyCtx);
1422         itm.setBySeqno(v->getBySeqno());
1423         itm.setCas(v->getCas());
1424         break;
1425     }
1426     return ENGINE_SUCCESS;
1427 }
1428
1429 GetValue VBucket::getAndUpdateTtl(const DocKey& key,
1430                                   const void* cookie,
1431                                   EventuallyPersistentEngine& engine,
1432                                   int bgFetchDelay,
1433                                   time_t exptime) {
1434     auto hbl = ht.getLockedBucket(key);
1435     StoredValue* v = fetchValidValue(hbl,
1436                                      key,
1437                                      WantsDeleted::Yes,
1438                                      TrackReference::Yes,
1439                                      QueueExpired::Yes);
1440
1441     if (v) {
1442         if (v->isDeleted() || v->isTempDeletedItem() ||
1443             v->isTempNonExistentItem()) {
1444             return {};
1445         }
1446
1447         if (!v->isResident()) {
1448             bgFetch(key, cookie, engine, bgFetchDelay);
1449             return GetValue(nullptr, ENGINE_EWOULDBLOCK, v->getBySeqno());
1450         }
1451         if (v->isLocked(ep_current_time())) {
1452             return GetValue(nullptr, ENGINE_KEY_EEXISTS, 0);
1453         }
1454
1455         const bool exptime_mutated = exptime != v->getExptime();
1456         if (exptime_mutated) {
1457             v->markDirty();
1458             v->setExptime(exptime);
1459             v->setRevSeqno(v->getRevSeqno() + 1);
1460         }
1461
1462         GetValue rv(
1463                 v->toItem(v->isLocked(ep_current_time()), getId()).release(),
1464                 ENGINE_SUCCESS,
1465                 v->getBySeqno());
1466
1467         if (exptime_mutated) {
1468             VBNotifyCtx notifyCtx = queueDirty(*v);
1469             // we unlock ht lock here because we want to avoid potential lock
1470             // inversions arising from notifyNewSeqno() call
1471             hbl.getHTLock().unlock();
1472             notifyNewSeqno(notifyCtx);
1473         }
1474
1475         return rv;
1476     } else {
1477         if (eviction == VALUE_ONLY) {
1478             return {};
1479         } else {
1480             if (maybeKeyExistsInFilter(key)) {
1481                 ENGINE_ERROR_CODE ec = addTempItemAndBGFetch(
1482                         hbl, key, cookie, engine, bgFetchDelay, false);
1483                 return GetValue(NULL, ec, -1, true);
1484             } else {
1485                 // As bloomfilter predicted that item surely doesn't exist
1486                 // on disk, return ENOENT for getAndUpdateTtl().
1487                 return {};
1488             }
1489         }
1490     }
1491 }
1492
1493 MutationStatus VBucket::insertFromWarmup(Item& itm,
1494                                          bool eject,
1495                                          bool keyMetaDataOnly) {
1496     if (!StoredValue::hasAvailableSpace(stats, itm)) {
1497         return MutationStatus::NoMem;
1498     }
1499
1500     auto hbl = ht.getLockedBucket(itm.getKey());
1501     StoredValue* v = ht.unlocked_find(itm.getKey(),
1502                                       hbl.getBucketNum(),
1503                                       WantsDeleted::Yes,
1504                                       TrackReference::No);
1505
1506     if (v == NULL) {
1507         v = addNewStoredValue(hbl, itm, /*queueItmCtx*/ nullptr).first;
1508         if (keyMetaDataOnly) {
1509             v->markNotResident();
1510             /* For now ht stats are updated from outside ht. This seems to be
1511                a better option for now than passing a flag to
1512                addNewStoredValue() just for this func */
1513             ++(ht.numNonResidentItems);
1514         }
1515         /* For now ht stats are updated from outside ht. This seems to be
1516            a better option for now than passing a flag to
1517            addNewStoredValue() just for this func.
1518            We need to decrNumTotalItems because ht.numTotalItems is already
1519            set by warmup when it estimated the item count from disk */
1520         ht.decrNumTotalItems();
1521         v->setNewCacheItem(false);
1522     } else {
1523         if (keyMetaDataOnly) {
1524             // We don't have a better error code ;)
1525             return MutationStatus::InvalidCas;
1526         }
1527
1528         // Verify that the CAS isn't changed
1529         if (v->getCas() != itm.getCas()) {
1530             if (v->getCas() == 0) {
1531                 v->setCas(itm.getCas());
1532                 v->setFlags(itm.getFlags());
1533                 v->setExptime(itm.getExptime());
1534                 v->setRevSeqno(itm.getRevSeqno());
1535             } else {
1536                 return MutationStatus::InvalidCas;
1537             }
1538         }
1539         updateStoredValue(hbl, *v, itm, /*queueItmCtx*/ nullptr);
1540     }
1541
1542     v->markClean();
1543
1544     if (eject && !keyMetaDataOnly) {
1545         ht.unlocked_ejectItem(v, eviction);
1546     }
1547
1548     return MutationStatus::NotFound;
1549 }
1550
1551 GetValue VBucket::getInternal(const DocKey& key,
1552                               const void* cookie,
1553                               EventuallyPersistentEngine& engine,
1554                               int bgFetchDelay,
1555                               get_options_t options,
1556                               bool diskFlushAll) {
1557     const TrackReference trackReference = (options & TRACK_REFERENCE)
1558                                                   ? TrackReference::Yes
1559                                                   : TrackReference::No;
1560     const bool getDeletedValue = (options & GET_DELETED_VALUE);
1561     auto hbl = ht.getLockedBucket(key);
1562     StoredValue* v = fetchValidValue(
1563             hbl, key, WantsDeleted::Yes, trackReference, QueueExpired::Yes);
1564     if (v) {
1565         if (v->isDeleted() && !getDeletedValue) {
1566             return GetValue();
1567         }
1568         if (v->isTempDeletedItem() || v->isTempNonExistentItem()) {
1569             // Delete a temp non-existent item to ensure that
1570             // if the get were issued over an item that doesn't
1571             // exist, then we dont preserve a temp item.
1572             if (options & DELETE_TEMP) {
1573                 deleteStoredValue(hbl, *v);
1574             }
1575             return GetValue();
1576         }
1577
1578         // If the value is not resident, wait for it...
1579         if (!v->isResident()) {
1580             return getInternalNonResident(
1581                     key, cookie, engine, bgFetchDelay, options, *v);
1582         }
1583
1584         // Should we hide (return -1) for the items' CAS?
1585         const bool hide_cas =
1586                 (options & HIDE_LOCKED_CAS) && v->isLocked(ep_current_time());
1587         return GetValue(v->toItem(hide_cas, getId()).release(),
1588                         ENGINE_SUCCESS,
1589                         v->getBySeqno(),
1590                         false,
1591                         v->getNRUValue());
1592     } else {
1593         if (!getDeletedValue && (eviction == VALUE_ONLY || diskFlushAll)) {
1594             return GetValue();
1595         }
1596
1597         if (maybeKeyExistsInFilter(key)) {
1598             ENGINE_ERROR_CODE ec = ENGINE_EWOULDBLOCK;
1599             if (options &
1600                 QUEUE_BG_FETCH) { // Full eviction and need a bg fetch.
1601                 ec = addTempItemAndBGFetch(
1602                         hbl, key, cookie, engine, bgFetchDelay, false);
1603             }
1604             return GetValue(NULL, ec, -1, true);
1605         } else {
1606             // As bloomfilter predicted that item surely doesn't exist
1607             // on disk, return ENOENT, for getInternal().
1608             return GetValue();
1609         }
1610     }
1611 }
1612
/**
 * Looks up the metadata (CAS, flags, expiry, revSeqno and optionally the
 * datatype) for key.
 *
 * @param key           key to look up
 * @param cookie        connection cookie, forwarded to any background fetch
 * @param engine        engine instance, forwarded to any background fetch
 * @param bgFetchDelay  delay forwarded to any background fetch
 * @param fetchDatatype when true the value's datatype is also wanted; a
 *                      non-resident value then forces a background fetch
 * @param metadata      [out] filled on ENGINE_SUCCESS; for a temp
 *                      non-existent item only metadata.cas is set
 * @param deleted       [out] reset to 0, then OR'd with
 *                      GET_META_ITEM_DELETED_FLAG for a deleted/expired item
 * @param datatype      [out] only written when fetchDatatype is true and the
 *                      StoredValue has a value
 * @return ENGINE_SUCCESS, ENGINE_KEY_ENOENT, ENGINE_EWOULDBLOCK (bg fetch
 *         scheduled) or the result of addTempItemAndBGFetch().
 */
ENGINE_ERROR_CODE VBucket::getMetaData(const DocKey& key,
                                       const void* cookie,
                                       EventuallyPersistentEngine& engine,
                                       int bgFetchDelay,
                                       bool fetchDatatype,
                                       ItemMetaData& metadata,
                                       uint32_t& deleted,
                                       uint8_t& datatype) {
    deleted = 0;
    auto hbl = ht.getLockedBucket(key);
    StoredValue* v = ht.unlocked_find(
            key, hbl.getBucketNum(), WantsDeleted::Yes, TrackReference::No);

    if (v) {
        // Every lookup that finds a StoredValue counts as a get-meta op,
        // including ones that go on to schedule a bg fetch.
        stats.numOpsGetMeta++;
        if (v->isTempInitialItem() || (fetchDatatype && !v->isResident())) {
            // Need bg meta fetch.
            // The last argument is !fetchDatatype: when the datatype is not
            // wanted, presumably only metadata needs fetching — confirm
            // against bgFetch()'s signature.
            bgFetch(key, cookie, engine, bgFetchDelay, !fetchDatatype);
            return ENGINE_EWOULDBLOCK;
        } else if (v->isTempNonExistentItem()) {
            metadata.cas = v->getCas();
            return ENGINE_KEY_ENOENT;
        } else {
            // Deleted, temp-deleted and expired items all report the
            // deleted flag but still return their metadata.
            if (v->isTempDeletedItem() || v->isDeleted() ||
                v->isExpired(ep_real_time())) {
                deleted |= GET_META_ITEM_DELETED_FLAG;
            }

            // A locked item's real CAS is hidden (reported as -1).
            if (v->isLocked(ep_current_time())) {
                metadata.cas = static_cast<uint64_t>(-1);
            } else {
                metadata.cas = v->getCas();
            }
            metadata.flags = v->getFlags();
            metadata.exptime = v->getExptime();
            metadata.revSeqno = v->getRevSeqno();

            if (fetchDatatype) {
                value_t value = v->getValue();
                if (value) {
                    datatype = value->getDataType();
                }
            }

            return ENGINE_SUCCESS;
        }
    } else {
        // The key wasn't found. However, this may be because it was previously
        // deleted or evicted with the full eviction strategy.
        // So, add a temporary item corresponding to the key to the hash table
        // and schedule a background fetch for its metadata from the persistent
        // store. The item's state will be updated after the fetch completes.
        //
        // Schedule this bgFetch only if the key is predicted to be may-be
        // existent on disk by the bloomfilter.
        //
        // Note: numOpsGetMeta is not incremented on the bg-fetch path here,
        // only on the bloom-filter miss below.

        if (maybeKeyExistsInFilter(key)) {
            return addTempItemAndBGFetch(
                    hbl, key, cookie, engine, bgFetchDelay, !fetchDatatype);
        } else {
            stats.numOpsGetMeta++;
            return ENGINE_KEY_ENOENT;
        }
    }
}
1678
1679 ENGINE_ERROR_CODE VBucket::getKeyStats(const DocKey& key,
1680                                        const void* cookie,
1681                                        EventuallyPersistentEngine& engine,
1682                                        int bgFetchDelay,
1683                                        struct key_stats& kstats,
1684                                        WantsDeleted wantsDeleted) {
1685     auto hbl = ht.getLockedBucket(key);
1686     StoredValue* v = fetchValidValue(hbl,
1687                                      key,
1688                                      WantsDeleted::Yes,
1689                                      TrackReference::Yes,
1690                                      QueueExpired::Yes);
1691
1692     if (v) {
1693         if ((v->isDeleted() && wantsDeleted == WantsDeleted::No) ||
1694             v->isTempNonExistentItem() || v->isTempDeletedItem()) {
1695             return ENGINE_KEY_ENOENT;
1696         }
1697         if (eviction == FULL_EVICTION && v->isTempInitialItem()) {
1698             hbl.getHTLock().unlock();
1699             bgFetch(key, cookie, engine, bgFetchDelay, true);
1700             return ENGINE_EWOULDBLOCK;
1701         }
1702         kstats.logically_deleted = v->isDeleted();
1703         kstats.dirty = v->isDirty();
1704         kstats.exptime = v->getExptime();
1705         kstats.flags = v->getFlags();
1706         kstats.cas = v->getCas();
1707         kstats.vb_state = getState();
1708         return ENGINE_SUCCESS;
1709     } else {
1710         if (eviction == VALUE_ONLY) {
1711             return ENGINE_KEY_ENOENT;
1712         } else {
1713             if (maybeKeyExistsInFilter(key)) {
1714                 return addTempItemAndBGFetch(
1715                         hbl, key, cookie, engine, bgFetchDelay, true);
1716             } else {
1717                 // If bgFetch were false, or bloomfilter predicted that
1718                 // item surely doesn't exist on disk, return ENOENT for
1719                 // getKeyStats().
1720                 return ENGINE_KEY_ENOENT;
1721             }
1722         }
1723     }
1724 }
1725
1726 GetValue VBucket::getLocked(const DocKey& key,
1727                             rel_time_t currentTime,
1728                             uint32_t lockTimeout,
1729                             const void* cookie,
1730                             EventuallyPersistentEngine& engine,
1731                             int bgFetchDelay) {
1732     auto hbl = ht.getLockedBucket(key);
1733     StoredValue* v = fetchValidValue(hbl,
1734                                      key,
1735                                      WantsDeleted::Yes,
1736                                      TrackReference::Yes,
1737                                      QueueExpired::Yes);
1738
1739     if (v) {
1740         if (v->isDeleted() || v->isTempNonExistentItem() ||
1741             v->isTempDeletedItem()) {
1742             return GetValue(NULL, ENGINE_KEY_ENOENT);
1743         }
1744
1745         // if v is locked return error
1746         if (v->isLocked(currentTime)) {
1747             return GetValue(NULL, ENGINE_TMPFAIL);
1748         }
1749
1750         // If the value is not resident, wait for it...
1751         if (!v->isResident()) {
1752             if (cookie) {
1753                 bgFetch(key, cookie, engine, bgFetchDelay);
1754             }
1755             return GetValue(NULL, ENGINE_EWOULDBLOCK, -1, true);
1756         }
1757
1758         // acquire lock and increment cas value
1759         v->lock(currentTime + lockTimeout);
1760
1761         auto it = v->toItem(false, getId());
1762         it->setCas(nextHLCCas());
1763         v->setCas(it->getCas());
1764
1765         return GetValue(it.release());
1766
1767     } else {
1768         // No value found in the hashtable.
1769         switch (eviction) {
1770         case VALUE_ONLY:
1771             return GetValue(NULL, ENGINE_KEY_ENOENT);
1772
1773         case FULL_EVICTION:
1774             if (maybeKeyExistsInFilter(key)) {
1775                 ENGINE_ERROR_CODE ec = addTempItemAndBGFetch(
1776                         hbl, key, cookie, engine, bgFetchDelay, false);
1777                 return GetValue(NULL, ec, -1, true);
1778             } else {
1779                 // As bloomfilter predicted that item surely doesn't exist
1780                 // on disk, return ENOENT for getLocked().
1781                 return GetValue(NULL, ENGINE_KEY_ENOENT);
1782             }
1783         }
1784         return GetValue(); // just to prevent compiler warning
1785     }
1786 }
1787
1788 void VBucket::deletedOnDiskCbk(const Item& queuedItem, bool deleted) {
1789     auto hbl = ht.getLockedBucket(queuedItem.getKey());
1790     StoredValue* v = fetchValidValue(hbl,
1791                                      queuedItem.getKey(),
1792                                      WantsDeleted::Yes,
1793                                      TrackReference::No,
1794                                      QueueExpired::Yes);
1795     // Delete the item in the hash table iff:
1796     //  1. Item is existent in hashtable, and deleted flag is true
1797     //  2. rev seqno of queued item matches rev seqno of hash table item
1798     if (v && v->isDeleted() && (queuedItem.getRevSeqno() == v->getRevSeqno())) {
1799         bool isDeleted = deleteStoredValue(hbl, *v);
1800         if (!isDeleted) {
1801             throw std::logic_error(
1802                     "deletedOnDiskCbk:callback: "
1803                     "Failed to delete key with seqno:" +
1804                     std::to_string(v->getBySeqno()) + "' from bucket " +
1805                     std::to_string(hbl.getBucketNum()));
1806         }
1807
1808         /**
1809          * Deleted items are to be added to the bloomfilter,
1810          * in either eviction policy.
1811          */
1812         addToFilter(queuedItem.getKey());
1813     }
1814
1815     if (deleted) {
1816         ++stats.totalPersisted;
1817         ++opsDelete;
1818     }
1819     doStatsForFlushing(queuedItem, queuedItem.size());
1820     --stats.diskQueueSize;
1821     decrMetaDataDisk(queuedItem);
1822 }
1823
1824 bool VBucket::deleteKey(const DocKey& key) {
1825     auto hbl = ht.getLockedBucket(key);
1826     StoredValue* v = ht.unlocked_find(
1827             key, hbl.getBucketNum(), WantsDeleted::Yes, TrackReference::No);
1828     if (!v) {
1829         return false;
1830     }
1831     return deleteStoredValue(hbl, *v);
1832 }
1833
1834 void VBucket::postProcessRollback(const RollbackResult& rollbackResult,
1835                                   uint64_t prevHighSeqno) {
1836     failovers->pruneEntries(rollbackResult.highSeqno);
1837     checkpointManager.clear(*this, rollbackResult.highSeqno);
1838     setPersistedSnapshot(rollbackResult.snapStartSeqno,
1839                          rollbackResult.snapEndSeqno);
1840     incrRollbackItemCount(prevHighSeqno - rollbackResult.highSeqno);
1841     setBackfillPhase(false);
1842 }
1843
1844 void VBucket::dump() const {
1845     std::cerr << "VBucket[" << this << "] with state: " << toString(getState())
1846               << " numItems:" << getNumItems()
1847               << " numNonResident:" << getNumNonResidentItems() << std::endl;
1848 }
1849
1850 void VBucket::_addStats(bool details, ADD_STAT add_stat, const void* c) {
1851     addStat(NULL, toString(state), add_stat, c);
1852     if (details) {
1853         size_t numItems = getNumItems();
1854         size_t tempItems = getNumTempItems();
1855         addStat("num_items", numItems, add_stat, c);
1856         addStat("num_temp_items", tempItems, add_stat, c);
1857         addStat("num_non_resident", getNumNonResidentItems(), add_stat, c);
1858         addStat("ht_memory", ht.memorySize(), add_stat, c);
1859         addStat("ht_item_memory", ht.getItemMemory(), add_stat, c);
1860         addStat("ht_cache_size", ht.cacheSize.load(), add_stat, c);
1861         addStat("num_ejects", ht.getNumEjects(), add_stat, c);
1862         addStat("ops_create", opsCreate.load(), add_stat, c);
1863         addStat("ops_update", opsUpdate.load(), add_stat, c);
1864         addStat("ops_delete", opsDelete.load(), add_stat, c);
1865         addStat("ops_reject", opsReject.load(), add_stat, c);
1866         addStat("queue_size", dirtyQueueSize.load(), add_stat, c);
1867         addStat("backfill_queue_size", getBackfillSize(), add_stat, c);
1868         addStat("queue_memory", dirtyQueueMem.load(), add_stat, c);
1869         addStat("queue_fill", dirtyQueueFill.load(), add_stat, c);
1870         addStat("queue_drain", dirtyQueueDrain.load(), add_stat, c);
1871         addStat("queue_age", getQueueAge(), add_stat, c);
1872         addStat("pending_writes", dirtyQueuePendingWrites.load(), add_stat, c);
1873
1874         addStat("high_seqno", getHighSeqno(), add_stat, c);
1875         addStat("uuid", failovers->getLatestUUID(), add_stat, c);
1876         addStat("purge_seqno", getPurgeSeqno(), add_stat, c);
1877         addStat("bloom_filter", getFilterStatusString().data(),
1878                 add_stat, c);
1879         addStat("bloom_filter_size", getFilterSize(), add_stat, c);
1880         addStat("bloom_filter_key_count", getNumOfKeysInFilter(), add_stat, c);
1881         addStat("rollback_item_count", getRollbackItemCount(), add_stat, c);
1882         addStat("hp_vb_req_size", getHighPriorityChkSize(), add_stat, c);
1883         hlc.addStats(statPrefix, add_stat, c);
1884     }
1885 }
1886
1887 void VBucket::decrDirtyQueueMem(size_t decrementBy)
1888 {
1889     size_t oldVal, newVal;
1890     do {
1891         oldVal = dirtyQueueMem.load(std::memory_order_relaxed);
1892         if (oldVal < decrementBy) {
1893             newVal = 0;
1894         } else {
1895             newVal = oldVal - decrementBy;
1896         }
1897     } while (!dirtyQueueMem.compare_exchange_strong(oldVal, newVal));
1898 }
1899
1900 void VBucket::decrDirtyQueueAge(uint32_t decrementBy)
1901 {
1902     uint64_t oldVal, newVal;
1903     do {
1904         oldVal = dirtyQueueAge.load(std::memory_order_relaxed);
1905         if (oldVal < decrementBy) {
1906             newVal = 0;
1907         } else {
1908             newVal = oldVal - decrementBy;
1909         }
1910     } while (!dirtyQueueAge.compare_exchange_strong(oldVal, newVal));
1911 }
1912
1913 void VBucket::decrDirtyQueuePendingWrites(size_t decrementBy)
1914 {
1915     size_t oldVal, newVal;
1916     do {
1917         oldVal = dirtyQueuePendingWrites.load(std::memory_order_relaxed);
1918         if (oldVal < decrementBy) {
1919             newVal = 0;
1920         } else {
1921             newVal = oldVal - decrementBy;
1922         }
1923     } while (!dirtyQueuePendingWrites.compare_exchange_strong(oldVal, newVal));
1924 }
1925
/**
 * Applies a set of itm to the hash bucket locked by hbl, either updating the
 * existing StoredValue v or adding a new one.
 *
 * @param hbl            lock on itm's hash bucket; must be held
 *                       (std::invalid_argument is thrown otherwise)
 * @param v              [in/out] existing StoredValue for the key or nullptr;
 *                       reassigned to the StoredValue produced by
 *                       updateStoredValue()/addNewStoredValue()
 * @param itm            item to store; its revSeqno is assigned here unless
 *                       hasMetaData is true
 * @param cas            CAS supplied by the caller; 0 means no CAS check
 * @param allowExisting  when false, an existing non-temp value makes the set
 *                       fail with InvalidCas
 * @param hasMetaData    when true, itm's own revSeqno is kept and the
 *                       expired-value handling below is skipped
 * @param queueItmCtx    forwarded to updateStoredValue()/addNewStoredValue()
 * @param maybeKeyExists hint that the key may exist on disk; together with a
 *                       CAS under full eviction it requests a bg fetch
 * @param isReplication  forwarded to StoredValue::hasAvailableSpace()
 * @return the mutation status and the notification context to act on
 */
std::pair<MutationStatus, VBNotifyCtx> VBucket::processSet(
        const HashTable::HashBucketLock& hbl,
        StoredValue*& v,
        Item& itm,
        uint64_t cas,
        bool allowExisting,
        bool hasMetaData,
        const VBQueueItemCtx* queueItmCtx,
        bool maybeKeyExists,
        bool isReplication) {
    if (!hbl.getHTLock()) {
        throw std::invalid_argument(
                "VBucket::processSet: htLock not held for "
                "VBucket " +
                std::to_string(getId()));
    }

    if (!StoredValue::hasAvailableSpace(stats, itm, isReplication)) {
        return {MutationStatus::NoMem, VBNotifyCtx()};
    }

    // A CAS can only be checked against the real value: if the key may be
    // on disk and is absent or only a temp-initial placeholder in memory,
    // ask the caller to fetch it first.
    if (cas && eviction == FULL_EVICTION && maybeKeyExists) {
        if (!v || v->isTempInitialItem()) {
            return {MutationStatus::NeedBgFetch, VBNotifyCtx()};
        }
    }

    /*
     * prior to checking for the lock, we should check if this object
     * has expired. If so, then check if CAS value has been provided
     * for this set op. In this case the operation should be denied since
     * a cas operation for a key that doesn't exist is not a very cool
     * thing to do. See MB 3252
     */
    if (v && v->isExpired(ep_real_time()) && !hasMetaData) {
        if (v->isLocked(ep_current_time())) {
            v->unlock();
        }
        if (cas) {
            /* item has expired and cas value provided. Deny ! */
            return {MutationStatus::NotFound, VBNotifyCtx()};
        }
    }

    if (v) {
        // Add semantics: a real (non-temp) value already exists.
        if (!allowExisting && !v->isTempItem()) {
            return {MutationStatus::InvalidCas, VBNotifyCtx()};
        }
        if (v->isLocked(ep_current_time())) {
            /*
             * item is locked, deny if there is cas value mismatch
             * or no cas value is provided by the user
             */
            if (cas != v->getCas()) {
                return {MutationStatus::IsLocked, VBNotifyCtx()};
            }
            /* allow operation*/
            v->unlock();
        } else if (cas && cas != v->getCas()) {
            // CAS mismatch against something that logically doesn't exist
            // is reported as NotFound rather than InvalidCas.
            if (v->isTempDeletedItem() || v->isTempNonExistentItem() ||
                v->isDeleted()) {
                return {MutationStatus::NotFound, VBNotifyCtx()};
            }
            return {MutationStatus::InvalidCas, VBNotifyCtx()};
        }
        if (!hasMetaData) {
            itm.setRevSeqno(v->getRevSeqno() + 1);
            /* MB-23530: We must ensure that a replace operation (i.e.
             * set with a CAS) /fails/ if the old document is deleted; it
             * logically "doesn't exist". However, if the new value is deleted
             * this op is a /delete/ with a CAS and we must permit a
             * deleted -> deleted transition for Deleted Bodies.
             */
            if (cas && (v->isDeleted() || v->isTempDeletedItem()) &&
                !itm.isDeleted()) {
                return {MutationStatus::NotFound, VBNotifyCtx()};
            }
        }

        MutationStatus status;
        VBNotifyCtx notifyCtx;
        std::tie(v, status, notifyCtx) =
                updateStoredValue(hbl, *v, itm, queueItmCtx);
        return {status, notifyCtx};
    } else if (cas != 0) {
        // No value at all, yet the caller expected one (CAS supplied).
        return {MutationStatus::NotFound, VBNotifyCtx()};
    } else {
        // Brand-new key.
        VBNotifyCtx notifyCtx;
        std::tie(v, notifyCtx) = addNewStoredValue(hbl, itm, queueItmCtx);
        if (!hasMetaData) {
            updateRevSeqNoOfNewStoredValue(*v);
            itm.setRevSeqno(v->getRevSeqno());
        }
        return {MutationStatus::WasClean, notifyCtx};
    }
}
2022
/**
 * Applies an add (store only if absent) of itm to the hash bucket locked by
 * hbl.
 *
 * @param hbl            lock on itm's hash bucket; must be held
 *                       (std::invalid_argument is thrown otherwise)
 * @param v              [in/out] existing StoredValue for the key or nullptr;
 *                       reassigned to the resulting StoredValue
 * @param itm            item to add; its revSeqno is assigned here
 * @param maybeKeyExists hint that the key may exist on disk; under full
 *                       eviction it causes BgFetch / AddTmpAndBgFetch
 * @param isReplication  forwarded to StoredValue::hasAvailableSpace()
 * @param queueItmCtx    forwarded to updateStoredValue()/addNewStoredValue()
 * @return status and notification context. Status is Exists for a live,
 *         non-expired, non-temp value; NoMem; BgFetch / AddTmpAndBgFetch
 *         when disk must be consulted (BgFetch also when the resulting value
 *         is a temp item); UnDel when a deleted/expired value was replaced;
 *         otherwise Success.
 */
std::pair<AddStatus, VBNotifyCtx> VBucket::processAdd(
        const HashTable::HashBucketLock& hbl,
        StoredValue*& v,
        Item& itm,
        bool maybeKeyExists,
        bool isReplication,
        const VBQueueItemCtx* queueItmCtx) {
    if (!hbl.getHTLock()) {
        throw std::invalid_argument(
                "VBucket::processAdd: htLock not held for "
                "VBucket " +
                std::to_string(getId()));
    }

    if (v && !v->isDeleted() && !v->isExpired(ep_real_time()) &&
        !v->isTempItem()) {
        return {AddStatus::Exists, VBNotifyCtx()};
    }
    if (!StoredValue::hasAvailableSpace(stats, itm, isReplication)) {
        return {AddStatus::NoMem, VBNotifyCtx()};
    }

    std::pair<AddStatus, VBNotifyCtx> rv = {AddStatus::Success, VBNotifyCtx()};

    if (v) {
        if (v->isTempInitialItem() && eviction == FULL_EVICTION &&
            maybeKeyExists) {
            // Need to figure out if an item exists on disk
            return {AddStatus::BgFetch, VBNotifyCtx()};
        }

        rv.first = (v->isDeleted() || v->isExpired(ep_real_time()))
                           ? AddStatus::UnDel
                           : AddStatus::Success;

        // Effective revSeqno (the second assignment below overrides the
        // first for non-temp values):
        //  - non-temp value, or temp-deleted item: v's revSeqno + 1
        //  - any other temp item: max deleted revSeqno seen so far + 1
        if (v->isTempDeletedItem()) {
            itm.setRevSeqno(v->getRevSeqno() + 1);
        } else {
            itm.setRevSeqno(ht.getMaxDeletedRevSeqno() + 1);
        }

        if (!v->isTempItem()) {
            itm.setRevSeqno(v->getRevSeqno() + 1);
        }

        std::tie(v, std::ignore, rv.second) =
                updateStoredValue(hbl, *v, itm, queueItmCtx);
    } else {
        // Items tagged state_temp_init (see addTempStoredValue()) skip this
        // check and are inserted directly.
        if (itm.getBySeqno() != StoredValue::state_temp_init) {
            if (eviction == FULL_EVICTION && maybeKeyExists) {
                return {AddStatus::AddTmpAndBgFetch, VBNotifyCtx()};
            }
        }
        std::tie(v, rv.second) = addNewStoredValue(hbl, itm, queueItmCtx);
        updateRevSeqNoOfNewStoredValue(*v);
        itm.setRevSeqno(v->getRevSeqno());
        if (v->isTempItem()) {
            rv.first = AddStatus::BgFetch;
        }
    }

    // Temp items get the maximum NRU value.
    if (v->isTempItem()) {
        v->setNRUValue(MAX_NRU_VALUE);
    }
    return rv;
}
2089
2090 std::tuple<MutationStatus, StoredValue*, VBNotifyCtx>
2091 VBucket::processSoftDelete(const HashTable::HashBucketLock& hbl,
2092                            StoredValue& v,
2093                            uint64_t cas,
2094                            const ItemMetaData& metadata,
2095                            const VBQueueItemCtx& queueItmCtx,
2096                            bool use_meta,
2097                            uint64_t bySeqno) {
2098     if (v.isTempInitialItem() && eviction == FULL_EVICTION) {
2099         return std::make_tuple(MutationStatus::NeedBgFetch, &v, VBNotifyCtx());
2100     }
2101
2102     if (v.isLocked(ep_current_time())) {
2103         if (cas != v.getCas()) {
2104             return std::make_tuple(MutationStatus::IsLocked, &v, VBNotifyCtx());
2105         }
2106         v.unlock();
2107     }
2108
2109     if (cas != 0 && cas != v.getCas()) {
2110         return std::make_tuple(MutationStatus::InvalidCas, &v, VBNotifyCtx());
2111     }
2112
2113     /* allow operation */
2114     v.unlock();
2115
2116     MutationStatus rv =
2117             v.isDirty() ? MutationStatus::WasDirty : MutationStatus::WasClean;
2118
2119     if (use_meta) {
2120         v.setCas(metadata.cas);
2121         v.setFlags(metadata.flags);
2122         v.setExptime(metadata.exptime);
2123     }
2124
2125     v.setRevSeqno(metadata.revSeqno);
2126     VBNotifyCtx notifyCtx;
2127     StoredValue* newSv;
2128     std::tie(newSv, notifyCtx) =
2129             softDeleteStoredValue(hbl,
2130                                   v,
2131                                   /*onlyMarkDeleted*/ false,
2132                                   queueItmCtx,
2133                                   bySeqno);
2134     ht.updateMaxDeletedRevSeqno(metadata.revSeqno);
2135     return std::make_tuple(rv, newSv, notifyCtx);
2136 }
2137
/**
 * Soft-deletes an expired StoredValue, queueing the deletion with a
 * generated seqno and CAS.
 *
 * @param hbl lock on v's hash bucket; must be held
 *            (std::invalid_argument is thrown otherwise)
 * @param v   the expired StoredValue
 * @return (NotFound, resulting StoredValue, notify ctx) normally; for a
 *         temp-initial item under full eviction, (NeedBgFetch, &v, ctx from
 *         queueDirty()).
 */
std::tuple<MutationStatus, StoredValue*, VBNotifyCtx>
VBucket::processExpiredItem(const HashTable::HashBucketLock& hbl,
                            StoredValue& v) {
    if (!hbl.getHTLock()) {
        throw std::invalid_argument(
                "VBucket::processExpiredItem: htLock not held for VBucket " +
                std::to_string(getId()));
    }

    // NOTE(review): this branch still calls queueDirty() on the temp-initial
    // item before returning NeedBgFetch (processSoftDelete returns an empty
    // ctx in the same situation) — confirm this is intended.
    if (v.isTempInitialItem() && eviction == FULL_EVICTION) {
        return std::make_tuple(MutationStatus::NeedBgFetch,
                               &v,
                               queueDirty(v,
                                          GenerateBySeqno::Yes,
                                          GenerateCas::Yes,
                                          /*isBackfillItem*/ false));
    }

    /* If the datatype is XATTR, mark the item as deleted
     * but don't delete the value as system xattrs can
     * still be queried by mobile clients even after
     * deletion.
     * TODO: The current implementation is inefficient
     * but functionally correct and for performance reasons
     * only the system xattrs need to be stored.
     */
    value_t value = v.getValue();
    bool onlyMarkDeleted =
            value && mcbp::datatype::is_xattr(value->getDataType());
    // Expiry is a new revision of the document.
    v.setRevSeqno(v.getRevSeqno() + 1);
    VBNotifyCtx notifyCtx;
    StoredValue* newSv;
    std::tie(newSv, notifyCtx) =
            softDeleteStoredValue(hbl,
                                  v,
                                  onlyMarkDeleted,
                                  VBQueueItemCtx(GenerateBySeqno::Yes,
                                                 GenerateCas::Yes,
                                                 TrackCasDrift::No,
                                                 /*isBackfillItem*/ false,
                                                 nullptr /* no pre link */),
                                  v.getBySeqno());
    // NOTE(review): this records the deleted item's revSeqno + 1, whereas
    // processSoftDelete records metadata.revSeqno (the revSeqno it set on v)
    // — confirm the extra +1 is intended.
    ht.updateMaxDeletedRevSeqno(newSv->getRevSeqno() + 1);
    return std::make_tuple(MutationStatus::NotFound, newSv, notifyCtx);
}
2183
2184 bool VBucket::deleteStoredValue(const HashTable::HashBucketLock& hbl,
2185                                 StoredValue& v) {
2186     if (!v.isDeleted() && v.isLocked(ep_current_time())) {
2187         return false;
2188     }
2189
2190     /* StoredValue deleted here. If any other in-memory data structures are
2191        using the StoredValue intrusively then they must have handled the delete
2192        by this point */
2193     ht.unlocked_del(hbl, v.getKey());
2194     return true;
2195 }
2196
2197 AddStatus VBucket::addTempStoredValue(const HashTable::HashBucketLock& hbl,
2198                                       const DocKey& key,
2199                                       bool isReplication) {
2200     uint8_t ext_meta[EXT_META_LEN] = {PROTOCOL_BINARY_RAW_BYTES};
2201     static_assert(sizeof(ext_meta) == 1,
2202                   "VBucket::addTempStoredValue(): expected "
2203                   "EXT_META_LEN to be 1");
2204     Item itm(key,
2205              /*flags*/ 0,
2206              /*exp*/ 0,
2207              /*data*/ NULL,
2208              /*size*/ 0,
2209              ext_meta,
2210              sizeof(ext_meta),
2211              0,
2212              StoredValue::state_temp_init);
2213
2214     /* if a temp item for a possibly deleted, set it non-resident by resetting
2215        the value cuz normally a new item added is considered resident which
2216        does not apply for temp item. */
2217     StoredValue* v = nullptr;
2218     return processAdd(hbl, v, itm, true, isReplication, nullptr).first;
2219 }
2220
2221 void VBucket::notifyNewSeqno(const VBNotifyCtx& notifyCtx) {
2222     if (newSeqnoCb) {
2223         newSeqnoCb->callback(getId(), notifyCtx);
2224     }
2225 }
2226
2227 /*
2228  * Queue the item to the checkpoint and return the seqno the item was
2229  * allocated.
2230  */
2231 int64_t VBucket::queueItem(Item* item, OptionalSeqno seqno) {
2232     item->setVBucketId(id);
2233     queued_item qi(item);
2234     checkpointManager.queueDirty(
2235             *this,
2236             qi,
2237             seqno ? GenerateBySeqno::No : GenerateBySeqno::Yes,
2238             GenerateCas::Yes,
2239             nullptr /* No pre link step as this is for system events */);
2240     VBNotifyCtx notifyCtx;
2241     // If the seqno is initialized, skip replication notification
2242     notifyCtx.notifyReplication = !seqno.is_initialized();
2243     notifyCtx.notifyFlusher = true;
2244     notifyCtx.bySeqno = qi->getBySeqno();
2245     notifyNewSeqno(notifyCtx);
2246     return qi->getBySeqno();
2247 }
2248
2249 VBNotifyCtx VBucket::queueDirty(StoredValue& v,
2250                                 const VBQueueItemCtx& queueItmCtx) {
2251     if (queueItmCtx.trackCasDrift == TrackCasDrift::Yes) {
2252         setMaxCasAndTrackDrift(v.getCas());
2253     }
2254     return queueDirty(v,
2255                       queueItmCtx.genBySeqno,
2256                       queueItmCtx.genCas,
2257                       queueItmCtx.isBackfillItem,
2258                       queueItmCtx.preLinkDocumentContext);
2259 }
2260
2261 void VBucket::updateRevSeqNoOfNewStoredValue(StoredValue& v) {
2262     /**
2263      * Possibly, this item is being recreated. Conservatively assign it
2264      * a seqno that is greater than the greatest seqno of all deleted
2265      * items seen so far.
2266      */
2267     uint64_t seqno = ht.getMaxDeletedRevSeqno();
2268     if (!v.isTempItem()) {
2269         ++seqno;
2270     }
2271     v.setRevSeqno(seqno);
2272 }
2273
2274 void VBucket::addHighPriorityVBEntry(uint64_t seqnoOrChkId,
2275                                      const void* cookie,
2276                                      HighPriorityVBNotify reqType) {
2277     std::unique_lock<std::mutex> lh(hpVBReqsMutex);
2278     hpVBReqs.push_back(HighPriorityVBEntry(cookie, seqnoOrChkId, reqType));
2279     numHpVBReqs.store(hpVBReqs.size());
2280
2281     LOG(EXTENSION_LOG_NOTICE,
2282         "Added high priority async request %s "
2283         "for vb:%" PRIu16 ", Check for:%" PRIu64 ", "
2284         "Persisted upto:%" PRIu64 ", cookie:%p",
2285         to_string(reqType).c_str(),
2286         getId(),
2287         seqnoOrChkId,
2288         getPersistenceSeqno(),
2289         cookie);
2290 }
2291
2292 std::map<const void*, ENGINE_ERROR_CODE> VBucket::getHighPriorityNotifications(
2293         EventuallyPersistentEngine& engine,
2294         uint64_t idNum,
2295         HighPriorityVBNotify notifyType) {
2296     std::unique_lock<std::mutex> lh(hpVBReqsMutex);
2297     std::map<const void*, ENGINE_ERROR_CODE> toNotify;
2298
2299     auto entry = hpVBReqs.begin();
2300
2301     while (entry != hpVBReqs.end()) {
2302         if (notifyType != entry->reqType) {
2303             ++entry;
2304             continue;
2305         }
2306
2307         std::string logStr(to_string(notifyType));
2308
2309         hrtime_t wall_time(gethrtime() - entry->start);
2310         size_t spent = wall_time / 1000000000;
2311         if (entry->id <= idNum) {
2312             toNotify[entry->cookie] = ENGINE_SUCCESS;
2313             stats.chkPersistenceHisto.add(wall_time / 1000);
2314             adjustCheckpointFlushTimeout(wall_time / 1000000000);
2315             LOG(EXTENSION_LOG_NOTICE,
2316                 "Notified the completion of %s "
2317                 "for vbucket %" PRIu16 ", Check for: %" PRIu64
2318                 ", "
2319                 "Persisted upto: %" PRIu64 ", cookie %p",
2320                 logStr.c_str(),
2321                 getId(),
2322                 entry->id,
2323                 idNum,
2324                 entry->cookie);
2325             entry = hpVBReqs.erase(entry);
2326         } else if (spent > getCheckpointFlushTimeout()) {
2327             adjustCheckpointFlushTimeout(spent);
2328             engine.storeEngineSpecific(entry->cookie, NULL);
2329             toNotify[entry->cookie] = ENGINE_TMPFAIL;
2330             LOG(EXTENSION_LOG_WARNING,
2331                 "Notified the timeout on %s "
2332                 "for vbucket %" PRIu16 ", Check for: %" PRIu64
2333                 ", "
2334                 "Persisted upto: %" PRIu64 ", cookie %p",
2335                 logStr.c_str(),
2336                 getId(),
2337                 entry->id,
2338                 idNum,
2339                 entry->cookie);
2340             entry = hpVBReqs.erase(entry);
2341         } else {
2342             ++entry;
2343         }
2344     }
2345     numHpVBReqs.store(hpVBReqs.size());
2346     return toNotify;
2347 }
2348
2349 std::map<const void*, ENGINE_ERROR_CODE> VBucket::tmpFailAndGetAllHpNotifies(
2350         EventuallyPersistentEngine& engine) {
2351     std::map<const void*, ENGINE_ERROR_CODE> toNotify;
2352
2353     LockHolder lh(hpVBReqsMutex);
2354
2355     for (auto& entry : hpVBReqs) {
2356         toNotify[entry.cookie] = ENGINE_TMPFAIL;
2357         engine.storeEngineSpecific(entry.cookie, NULL);
2358     }
2359     hpVBReqs.clear();
2360
2361     return toNotify;
2362 }
2363
2364 void VBucket::adjustCheckpointFlushTimeout(size_t wall_time) {
2365     size_t middle = (MIN_CHK_FLUSH_TIMEOUT + MAX_CHK_FLUSH_TIMEOUT) / 2;
2366
2367     if (wall_time <= MIN_CHK_FLUSH_TIMEOUT) {
2368         chkFlushTimeout = MIN_CHK_FLUSH_TIMEOUT;
2369     } else if (wall_time <= middle) {
2370         chkFlushTimeout = middle;
2371     } else {
2372         chkFlushTimeout = MAX_CHK_FLUSH_TIMEOUT;
2373     }
2374 }
2375
2376 size_t VBucket::getCheckpointFlushTimeout() {
2377     return chkFlushTimeout;
2378 }