MB-23906: Implement delete-with-value with store() instead of delete()
[ep-engine.git] / src / vbucket.cc
1 /* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2 /*
3  *     Copyright 2015 Couchbase, Inc
4  *
5  *   Licensed under the Apache License, Version 2.0 (the "License");
6  *   you may not use this file except in compliance with the License.
7  *   You may obtain a copy of the License at
8  *
9  *       http://www.apache.org/licenses/LICENSE-2.0
10  *
11  *   Unless required by applicable law or agreed to in writing, software
12  *   distributed under the License is distributed on an "AS IS" BASIS,
13  *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  *   See the License for the specific language governing permissions and
15  *   limitations under the License.
16  */
17
18 #include "config.h"
19
20 #include <functional>
21 #include <list>
22 #include <set>
23 #include <string>
24 #include <vector>
25
26 #include "atomic.h"
27 #include "bgfetcher.h"
28 #include "conflict_resolution.h"
29 #include "ep_engine.h"
30 #include "ep_types.h"
31 #include "failover-table.h"
32 #include "flusher.h"
33 #include "pre_link_document_context.h"
34
35 #define STATWRITER_NAMESPACE vbucket
36 #include "statwriter.h"
37 #undef STATWRITER_NAMESPACE
38 #include "stored_value_factories.h"
39
40 #include "vbucket.h"
41
/* Bounds for the checkpoint flush timeout, in seconds. */
const size_t MIN_CHK_FLUSH_TIMEOUT{10}; // 10 sec.
const size_t MAX_CHK_FLUSH_TIMEOUT{30}; // 30 sec.
45
46 /* Statics definitions */
47 std::atomic<size_t> VBucket::chkFlushTimeout(MIN_CHK_FLUSH_TIMEOUT);
48
49 VBucketFilter VBucketFilter::filter_diff(const VBucketFilter &other) const {
50     std::vector<uint16_t> tmp(acceptable.size() + other.size());
51     std::vector<uint16_t>::iterator end;
52     end = std::set_symmetric_difference(acceptable.begin(),
53                                         acceptable.end(),
54                                         other.acceptable.begin(),
55                                         other.acceptable.end(),
56                                         tmp.begin());
57     return VBucketFilter(std::vector<uint16_t>(tmp.begin(), end));
58 }
59
60 VBucketFilter VBucketFilter::filter_intersection(const VBucketFilter &other)
61                                                                         const {
62     std::vector<uint16_t> tmp(acceptable.size() + other.size());
63     std::vector<uint16_t>::iterator end;
64
65     end = std::set_intersection(acceptable.begin(), acceptable.end(),
66                                 other.acceptable.begin(),
67                                 other.acceptable.end(),
68                                 tmp.begin());
69     return VBucketFilter(std::vector<uint16_t>(tmp.begin(), end));
70 }
71
/**
 * Check whether the ids starting at @p it form a run of consecutive values
 * long enough to be printed as a "[first,last]" range.
 *
 * @param it     first element of the candidate run
 * @param end    end iterator of the containing set
 * @param length out: number of increments needed to go from @p it to the
 *               last element of the run (run size - 1); 0 when @p it == @p end
 * @return true when the run contains at least three consecutive values
 */
static bool isRange(std::set<uint16_t>::const_iterator it,
                    const std::set<uint16_t>::const_iterator &end,
                    size_t &length)
{
    length = 0;
    // Dereferencing an end iterator is undefined behaviour, and an empty run
    // would otherwise make the "run size - 1" below wrap around.
    if (it == end) {
        return false;
    }

    const size_t first = *it;
    size_t runSize = 0;
    for (; it != end && (first + runSize) == *it; ++it) {
        ++runSize;
    }

    // runSize >= 1 here: the first element always matches itself.
    length = runSize - 1;
    return length > 1;
}
87
88 std::ostream& operator <<(std::ostream &out, const VBucketFilter &filter)
89 {
90     std::set<uint16_t>::const_iterator it;
91
92     if (filter.acceptable.empty()) {
93         out << "{ empty }";
94     } else {
95         bool needcomma = false;
96         out << "{ ";
97         for (it = filter.acceptable.begin();
98              it != filter.acceptable.end();
99              ++it) {
100             if (needcomma) {
101                 out << ", ";
102             }
103
104             size_t length;
105             if (isRange(it, filter.acceptable.end(), length)) {
106                 std::set<uint16_t>::iterator last = it;
107                 for (size_t i = 0; i < length; ++i) {
108                     ++last;
109                 }
110                 out << "[" << *it << "," << *last << "]";
111                 it = last;
112             } else {
113                 out << *it;
114             }
115             needcomma = true;
116         }
117         out << " }";
118     }
119
120     return out;
121 }
122
123 const vbucket_state_t VBucket::ACTIVE =
124                      static_cast<vbucket_state_t>(htonl(vbucket_state_active));
125 const vbucket_state_t VBucket::REPLICA =
126                     static_cast<vbucket_state_t>(htonl(vbucket_state_replica));
127 const vbucket_state_t VBucket::PENDING =
128                     static_cast<vbucket_state_t>(htonl(vbucket_state_pending));
129 const vbucket_state_t VBucket::DEAD =
130                     static_cast<vbucket_state_t>(htonl(vbucket_state_dead));
131
/**
 * Construct a vbucket.
 *
 * @param i                   vbucket id; also used to build the "vb_<id>"
 *                            stat prefix
 * @param newState            state the vbucket starts in
 * @param st                  engine stats; memOverhead is charged for this
 *                            object below
 * @param chkConfig           checkpoint configuration for checkpointManager
 * @param lastSeqno           last seqno, passed to checkpointManager
 * @param lastSnapStart       start of the last snapshot; also seeds
 *                            persisted_snapshot_start
 * @param lastSnapEnd         end of the last snapshot; also seeds
 *                            persisted_snapshot_end
 * @param table               failover table (ownership taken)
 * @param flusherCb           callback passed to checkpointManager
 * @param valFact             StoredValue factory for the hash table
 *                            (ownership taken)
 * @param newSeqnoCb          callback stored in the newSeqnoCb member
 * @param config              engine configuration: conflict resolution type
 *                            and HLC drift thresholds (microseconds)
 * @param evictionPolicy      item eviction policy
 * @param initState           recorded as initialState
 * @param purgeSeqno          initial purge seqno
 * @param maxCas              initial max CAS for the hybrid logical clock
 * @param collectionsManifest used to initialise the collections manifest
 */
VBucket::VBucket(id_type i,
                 vbucket_state_t newState,
                 EPStats& st,
                 CheckpointConfig& chkConfig,
                 int64_t lastSeqno,
                 uint64_t lastSnapStart,
                 uint64_t lastSnapEnd,
                 std::unique_ptr<FailoverTable> table,
                 std::shared_ptr<Callback<id_type> > flusherCb,
                 std::unique_ptr<AbstractStoredValueFactory> valFact,
                 NewSeqnoCallback newSeqnoCb,
                 Configuration& config,
                 item_eviction_policy_t evictionPolicy,
                 vbucket_state_t initState,
                 uint64_t purgeSeqno,
                 uint64_t maxCas,
                 const std::string& collectionsManifest)
    : ht(st, std::move(valFact)),
      checkpointManager(st,
                        i,
                        chkConfig,
                        lastSeqno,
                        lastSnapStart,
                        lastSnapEnd,
                        flusherCb),
      failovers(std::move(table)),
      opsCreate(0),
      opsUpdate(0),
      opsDelete(0),
      opsReject(0),
      dirtyQueueSize(0),
      dirtyQueueMem(0),
      dirtyQueueFill(0),
      dirtyQueueDrain(0),
      dirtyQueueAge(0),
      dirtyQueuePendingWrites(0),
      metaDataDisk(0),
      numExpiredItems(0),
      eviction(evictionPolicy),
      stats(st),
      persistenceSeqno(0),
      numHpVBReqs(0),
      id(i),
      state(newState),
      initialState(initState),
      purge_seqno(purgeSeqno),
      takeover_backed_up(false),
      // The persisted snapshot starts out equal to the last snapshot range.
      persisted_snapshot_start(lastSnapStart),
      persisted_snapshot_end(lastSnapEnd),
      rollbackItemCount(0),
      hlc(maxCas,
          std::chrono::microseconds(config.getHlcDriftAheadThresholdUs()),
          std::chrono::microseconds(config.getHlcDriftBehindThresholdUs())),
      statPrefix("vb_" + std::to_string(i)),
      persistenceCheckpointId(0),
      bucketCreation(false),
      bucketDeletion(false),
      newSeqnoCb(std::move(newSeqnoCb)),
      manifest(collectionsManifest) {
    // "lww" selects last-write-wins conflict resolution; any other
    // configured value falls back to revision-seqno resolution.
    if (config.getConflictResolutionType().compare("lww") == 0) {
        conflictResolver.reset(new LastWriteWinsResolution());
    } else {
        conflictResolver.reset(new RevisionSeqnoResolution());
    }

    backfill.isBackfillPhase = false;
    pendingOpsStart = 0;
    // Charge this vbucket's fixed overhead to the engine; the destructor
    // subtracts the same expression.
    stats.memOverhead->fetch_add(sizeof(VBucket)
                                + ht.memorySize() + sizeof(CheckpointManager));
    LOG(EXTENSION_LOG_NOTICE,
        "VBucket: created vbucket:%" PRIu16 " with state:%s "
                "initialState:%s "
                "lastSeqno:%" PRIu64 " "
                "lastSnapshot:{%" PRIu64 ",%" PRIu64 "} "
                "persisted_snapshot:{%" PRIu64 ",%" PRIu64 "} "
                "max_cas:%" PRIu64,
        id, VBucket::toString(state), VBucket::toString(initialState),
        lastSeqno, lastSnapStart, lastSnapEnd,
        persisted_snapshot_start, persisted_snapshot_end,
        getMaxCas());
}
213
214 VBucket::~VBucket() {
215     if (!pendingOps.empty()) {
216         LOG(EXTENSION_LOG_WARNING,
217             "~Vbucket(): vbucket:%" PRIu16 " has %ld pending ops",
218             id,
219             pendingOps.size());
220     }
221
222     stats.diskQueueSize.fetch_sub(dirtyQueueSize.load());
223     stats.vbBackfillQueueSize.fetch_sub(getBackfillSize());
224
225     // Clear out the bloomfilter(s)
226     clearFilter();
227
228     stats.memOverhead->fetch_sub(sizeof(VBucket) + ht.memorySize() +
229                                 sizeof(CheckpointManager));
230
231     LOG(EXTENSION_LOG_INFO, "Destroying vbucket %d\n", id);
232 }
233
234 void VBucket::fireAllOps(EventuallyPersistentEngine &engine,
235                          ENGINE_ERROR_CODE code) {
236     std::unique_lock<std::mutex> lh(pendingOpLock);
237
238     if (pendingOpsStart > 0) {
239         hrtime_t now = gethrtime();
240         if (now > pendingOpsStart) {
241             hrtime_t d = (now - pendingOpsStart) / 1000;
242             stats.pendingOpsHisto.add(d);
243             atomic_setIfBigger(stats.pendingOpsMaxDuration, d);
244         }
245     } else {
246         return;
247     }
248
249     pendingOpsStart = 0;
250     stats.pendingOps.fetch_sub(pendingOps.size());
251     atomic_setIfBigger(stats.pendingOpsMax, pendingOps.size());
252
253     while (!pendingOps.empty()) {
254         const void *pendingOperation = pendingOps.back();
255         pendingOps.pop_back();
256         // We don't want to hold the pendingOpLock when
257         // calling notifyIOComplete.
258         lh.unlock();
259         engine.notifyIOComplete(pendingOperation, code);
260         lh.lock();
261     }
262
263     LOG(EXTENSION_LOG_INFO,
264         "Fired pendings ops for vbucket %" PRIu16 " in state %s\n",
265         id, VBucket::toString(state));
266 }
267
268 void VBucket::fireAllOps(EventuallyPersistentEngine &engine) {
269
270     if (state == vbucket_state_active) {
271         fireAllOps(engine, ENGINE_SUCCESS);
272     } else if (state == vbucket_state_pending) {
273         // Nothing
274     } else {
275         fireAllOps(engine, ENGINE_NOT_MY_VBUCKET);
276     }
277 }
278
279 void VBucket::setState(vbucket_state_t to) {
280     vbucket_state_t oldstate;
281     {
282         WriterLockHolder wlh(stateLock);
283         oldstate = state;
284
285         if (to == vbucket_state_active &&
286             checkpointManager.getOpenCheckpointId() < 2) {
287             checkpointManager.setOpenCheckpointId(2);
288         }
289
290         LOG(EXTENSION_LOG_NOTICE,
291             "VBucket::setState: transitioning vbucket:%" PRIu16 " from:%s to:%s",
292             id, VBucket::toString(oldstate), VBucket::toString(to));
293
294         state = to;
295     }
296 }
297
298 vbucket_state VBucket::getVBucketState() const {
299      auto persisted_range = getPersistedSnapshot();
300
301      return vbucket_state{getState(),
302                           getPersistenceCheckpointId(), 0, getHighSeqno(),
303                           getPurgeSeqno(),
304                           persisted_range.start, persisted_range.end,
305                           getMaxCas(), failovers->toJSON()};
306 }
307
308
309
310 void VBucket::doStatsForQueueing(const Item& qi, size_t itemBytes)
311 {
312     ++dirtyQueueSize;
313     dirtyQueueMem.fetch_add(sizeof(Item));
314     ++dirtyQueueFill;
315     dirtyQueueAge.fetch_add(qi.getQueuedTime());
316     dirtyQueuePendingWrites.fetch_add(itemBytes);
317 }
318
319 void VBucket::doStatsForFlushing(const Item& qi, size_t itemBytes) {
320     --dirtyQueueSize;
321     decrDirtyQueueMem(sizeof(Item));
322     ++dirtyQueueDrain;
323     decrDirtyQueueAge(qi.getQueuedTime());
324     decrDirtyQueuePendingWrites(itemBytes);
325 }
326
327 void VBucket::incrMetaDataDisk(const Item& qi) {
328     metaDataDisk.fetch_add(qi.getKey().size() + sizeof(ItemMetaData));
329 }
330
331 void VBucket::decrMetaDataDisk(const Item& qi) {
332     // assume couchstore remove approx this much data from disk
333     metaDataDisk.fetch_sub((qi.getKey().size() + sizeof(ItemMetaData)));
334 }
335
336 void VBucket::resetStats() {
337     opsCreate.store(0);
338     opsUpdate.store(0);
339     opsDelete.store(0);
340     opsReject.store(0);
341
342     stats.diskQueueSize.fetch_sub(dirtyQueueSize.exchange(0));
343     dirtyQueueMem.store(0);
344     dirtyQueueFill.store(0);
345     dirtyQueueAge.store(0);
346     dirtyQueuePendingWrites.store(0);
347     dirtyQueueDrain.store(0);
348
349     hlc.resetStats();
350 }
351
352 template <typename T>
353 void VBucket::addStat(const char *nm, const T &val, ADD_STAT add_stat,
354                       const void *c) {
355     std::string stat = statPrefix;
356     if (nm != NULL) {
357         add_prefixed_stat(statPrefix, nm, val, add_stat, c);
358     } else {
359         add_casted_stat(statPrefix.data(), val, add_stat, c);
360     }
361 }
362
/**
 * Give the server's document pre_expiry hook a chance to rewrite the value
 * of @p v before it expires.
 *
 * Does nothing when @p v has no value. If the hook returns true, v's value
 * is replaced by an Item built from the hook's output (itm_info.value[0]),
 * keeping v's key, flags, expiry, ext-meta, CAS, seqnos and NRU value, and
 * that Item is marked deleted.
 */
void VBucket::handlePreExpiry(StoredValue& v) {
    value_t value = v.getValue();
    if (value) {
        std::unique_ptr<Item> itm(v.toItem(false, id));
        item_info itm_info;
        // NOTE(review): the result is not null-checked; assumes a current
        // engine is always registered on this thread.
        EventuallyPersistentEngine* engine = ObjectRegistry::getCurrentEngine();
        // NOTE(review): itm_info is taken before itm's value is swapped for
        // the private copy below — confirm which buffer pre_expiry is meant
        // to see.
        itm_info = itm->toItemInfo(failovers->getLatestUUID());
        value_t new_val(Blob::Copy(*value));
        itm->setValue(new_val);

        SERVER_HANDLE_V1* sapi = engine->getServerApi();
        /* TODO: In order to minimize allocations, the callback needs to
         * allocate an item whose value size will be exactly the size of the
         * value after pre-expiry is performed.
         */
        if (sapi->document->pre_expiry(itm_info)) {
            char* extMeta = const_cast<char *>(v.getValue()->getExtMeta());
            Item new_item(v.getKey(), v.getFlags(), v.getExptime(),
                          itm_info.value[0].iov_base, itm_info.value[0].iov_len,
                          reinterpret_cast<uint8_t*>(extMeta),
                          v.getValue()->getExtLen(), v.getCas(),
                          v.getBySeqno(), id, v.getRevSeqno(),
                          v.getNRUValue());

            new_item.setDeleted();
            v.setValue(new_item, ht);
        }
    }
}
392
393 size_t VBucket::getNumNonResidentItems() const {
394     if (eviction == VALUE_ONLY) {
395         return ht.getNumInMemoryNonResItems();
396     } else {
397         size_t num_items = ht.getNumItems();
398         size_t num_res_items = ht.getNumInMemoryItems() -
399                                ht.getNumInMemoryNonResItems();
400         return num_items > num_res_items ? (num_items - num_res_items) : 0;
401     }
402 }
403
404
405 uint64_t VBucket::getPersistenceCheckpointId() const {
406     return persistenceCheckpointId.load();
407 }
408
409 void VBucket::setPersistenceCheckpointId(uint64_t checkpointId) {
410     persistenceCheckpointId.store(checkpointId);
411 }
412
413 void VBucket::markDirty(const DocKey& key) {
414     auto hbl = ht.getLockedBucket(key);
415     StoredValue* v = ht.unlocked_find(
416             key, hbl.getBucketNum(), WantsDeleted::Yes, TrackReference::Yes);
417     if (v) {
418         v->markDirty();
419     } else {
420         LOG(EXTENSION_LOG_WARNING, "markDirty: Error marking dirty, a key "
421             "is missing from vb:%" PRIu16, id);
422     }
423 }
424
425 bool VBucket::isResidentRatioUnderThreshold(float threshold) {
426     if (eviction != FULL_EVICTION) {
427         throw std::invalid_argument("VBucket::isResidentRatioUnderThreshold: "
428                 "policy (which is " + std::to_string(eviction) +
429                 ") must be FULL_EVICTION");
430     }
431     size_t num_items = getNumItems();
432     size_t num_non_resident_items = getNumNonResidentItems();
433     if (threshold >= ((float)(num_items - num_non_resident_items) /
434                                                                 num_items)) {
435         return true;
436     } else {
437         return false;
438     }
439 }
440
441 void VBucket::createFilter(size_t key_count, double probability) {
442     // Create the actual bloom filter upon vbucket creation during
443     // scenarios:
444     //      - Bucket creation
445     //      - Rebalance
446     LockHolder lh(bfMutex);
447     if (bFilter == nullptr && tempFilter == nullptr) {
448         bFilter = std::make_unique<BloomFilter>(key_count, probability,
449                                         BFILTER_ENABLED);
450     } else {
451         LOG(EXTENSION_LOG_WARNING, "(vb %" PRIu16 ") Bloom filter / Temp filter"
452             " already exist!", id);
453     }
454 }
455
456 void VBucket::initTempFilter(size_t key_count, double probability) {
457     // Create a temp bloom filter with status as COMPACTING,
458     // if the main filter is found to exist, set its state to
459     // COMPACTING as well.
460     LockHolder lh(bfMutex);
461     tempFilter = std::make_unique<BloomFilter>(key_count, probability,
462                                      BFILTER_COMPACTING);
463     if (bFilter) {
464         bFilter->setStatus(BFILTER_COMPACTING);
465     }
466 }
467
468 void VBucket::addToFilter(const DocKey& key) {
469     LockHolder lh(bfMutex);
470     if (bFilter) {
471         bFilter->addKey(key);
472     }
473
474     // If the temp bloom filter is not found to be NULL,
475     // it means that compaction is running on the particular
476     // vbucket. Therefore add the key to the temp filter as
477     // well, as once compaction completes the temp filter
478     // will replace the main bloom filter.
479     if (tempFilter) {
480         tempFilter->addKey(key);
481     }
482 }
483
484 bool VBucket::maybeKeyExistsInFilter(const DocKey& key) {
485     LockHolder lh(bfMutex);
486     if (bFilter) {
487         return bFilter->maybeKeyExists(key);
488     } else {
489         // If filter doesn't exist, allow the BgFetch to go through.
490         return true;
491     }
492 }
493
494 bool VBucket::isTempFilterAvailable() {
495     LockHolder lh(bfMutex);
496     if (tempFilter &&
497         (tempFilter->getStatus() == BFILTER_COMPACTING ||
498          tempFilter->getStatus() == BFILTER_ENABLED)) {
499         return true;
500     } else {
501         return false;
502     }
503 }
504
505 void VBucket::addToTempFilter(const DocKey& key) {
506     // Keys will be added to only the temp filter during
507     // compaction.
508     LockHolder lh(bfMutex);
509     if (tempFilter) {
510         tempFilter->addKey(key);
511     }
512 }
513
514 void VBucket::swapFilter() {
515     // Delete the main bloom filter and replace it with
516     // the temp filter that was populated during compaction,
517     // only if the temp filter's state is found to be either at
518     // COMPACTING or ENABLED (if in the case the user enables
519     // bloomfilters for some reason while compaction was running).
520     // Otherwise, it indicates that the filter's state was
521     // possibly disabled during compaction, therefore clear out
522     // the temp filter. If it gets enabled at some point, a new
523     // bloom filter will be made available after the next
524     // compaction.
525
526     LockHolder lh(bfMutex);
527     if (tempFilter) {
528         bFilter.reset();
529
530         if (tempFilter->getStatus() == BFILTER_COMPACTING ||
531              tempFilter->getStatus() == BFILTER_ENABLED) {
532             bFilter = std::move(tempFilter);
533             bFilter->setStatus(BFILTER_ENABLED);
534         }
535         tempFilter.reset();
536     }
537 }
538
539 void VBucket::clearFilter() {
540     LockHolder lh(bfMutex);
541     bFilter.reset();
542     tempFilter.reset();
543 }
544
545 void VBucket::setFilterStatus(bfilter_status_t to) {
546     LockHolder lh(bfMutex);
547     if (bFilter) {
548         bFilter->setStatus(to);
549     }
550     if (tempFilter) {
551         tempFilter->setStatus(to);
552     }
553 }
554
555 std::string VBucket::getFilterStatusString() {
556     LockHolder lh(bfMutex);
557     if (bFilter) {
558         return bFilter->getStatusString();
559     } else if (tempFilter) {
560         return tempFilter->getStatusString();
561     } else {
562         return "DOESN'T EXIST";
563     }
564 }
565
566 size_t VBucket::getFilterSize() {
567     LockHolder lh(bfMutex);
568     if (bFilter) {
569         return bFilter->getFilterSize();
570     } else {
571         return 0;
572     }
573 }
574
575 size_t VBucket::getNumOfKeysInFilter() {
576     LockHolder lh(bfMutex);
577     if (bFilter) {
578         return bFilter->getNumOfKeysInFilter();
579     } else {
580         return 0;
581     }
582 }
583
584 VBNotifyCtx VBucket::queueDirty(
585         StoredValue& v,
586         const GenerateBySeqno generateBySeqno,
587         const GenerateCas generateCas,
588         const bool isBackfillItem,
589         PreLinkDocumentContext* preLinkDocumentContext) {
590     VBNotifyCtx notifyCtx;
591
592     queued_item qi(v.toItem(false, getId()));
593
594     if (isBackfillItem) {
595         queueBackfillItem(qi, generateBySeqno);
596         notifyCtx.notifyFlusher = true;
597         /* During backfill on a TAP receiver we need to update the snapshot
598          range in the checkpoint. Has to be done here because in case of TAP
599          backfill, above, we use vb.queueBackfillItem() instead of
600          vb.checkpointManager.queueDirty() */
601         if (generateBySeqno == GenerateBySeqno::Yes) {
602             checkpointManager.resetSnapshotRange();
603         }
604     } else {
605         notifyCtx.notifyFlusher =
606                 checkpointManager.queueDirty(*this,
607                                              qi,
608                                              generateBySeqno,
609                                              generateCas,
610                                              preLinkDocumentContext);
611         notifyCtx.notifyReplication = true;
612         if (GenerateCas::Yes == generateCas) {
613             v.setCas(qi->getCas());
614         }
615     }
616
617     v.setBySeqno(qi->getBySeqno());
618     notifyCtx.bySeqno = qi->getBySeqno();
619
620     return notifyCtx;
621 }
622
623 StoredValue* VBucket::fetchValidValue(HashTable::HashBucketLock& hbl,
624                                       const DocKey& key,
625                                       WantsDeleted wantsDeleted,
626                                       TrackReference trackReference,
627                                       QueueExpired queueExpired) {
628     if (!hbl.getHTLock()) {
629         throw std::logic_error(
630                 "Hash bucket lock not held in "
631                 "VBucket::fetchValidValue() for hash bucket: " +
632                 std::to_string(hbl.getBucketNum()) + "for key: " +
633                 std::string(reinterpret_cast<const char*>(key.data()),
634                             key.size()));
635     }
636     StoredValue* v = ht.unlocked_find(
637             key, hbl.getBucketNum(), wantsDeleted, trackReference);
638     if (v && !v->isDeleted() && !v->isTempItem()) {
639         // In the deleted case, we ignore expiration time.
640         if (v->isExpired(ep_real_time())) {
641             if (getState() != vbucket_state_active) {
642                 return wantsDeleted == WantsDeleted::Yes ? v : NULL;
643             }
644
645             // queueDirty only allowed on active VB
646             if (queueExpired == QueueExpired::Yes &&
647                 getState() == vbucket_state_active) {
648                 incExpirationStat(ExpireBy::Access);
649                 handlePreExpiry(*v);
650                 VBNotifyCtx notifyCtx;
651                 std::tie(std::ignore, v, notifyCtx) =
652                         processExpiredItem(hbl, *v);
653                 notifyNewSeqno(notifyCtx);
654             }
655             return wantsDeleted == WantsDeleted::Yes ? v : NULL;
656         }
657     }
658     return v;
659 }
660
661 void VBucket::incExpirationStat(const ExpireBy source) {
662     switch (source) {
663     case ExpireBy::Pager:
664         ++stats.expired_pager;
665         break;
666     case ExpireBy::Compactor:
667         ++stats.expired_compactor;
668         break;
669     case ExpireBy::Access:
670         ++stats.expired_access;
671         break;
672     }
673     ++numExpiredItems;
674 }
675
676 MutationStatus VBucket::setFromInternal(Item& itm) {
677     return ht.set(itm);
678 }
679
680 ENGINE_ERROR_CODE VBucket::set(Item& itm,
681                                const void* cookie,
682                                EventuallyPersistentEngine& engine,
683                                const int bgFetchDelay) {
684     bool cas_op = (itm.getCas() != 0);
685     auto hbl = ht.getLockedBucket(itm.getKey());
686     StoredValue* v = ht.unlocked_find(itm.getKey(),
687                                       hbl.getBucketNum(),
688                                       WantsDeleted::Yes,
689                                       TrackReference::No);
690     if (v && v->isLocked(ep_current_time()) &&
691         (getState() == vbucket_state_replica ||
692          getState() == vbucket_state_pending)) {
693         v->unlock();
694     }
695
696     bool maybeKeyExists = true;
697     // If we didn't find a valid item, check Bloomfilter's prediction if in
698     // full eviction policy and for a CAS operation.
699     if ((v == nullptr || v->isTempInitialItem()) &&
700         (eviction == FULL_EVICTION) && (itm.getCas() != 0)) {
701         // Check Bloomfilter's prediction
702         if (!maybeKeyExistsInFilter(itm.getKey())) {
703             maybeKeyExists = false;
704         }
705     }
706
707     PreLinkDocumentContext preLinkDocumentContext(engine, cookie, &itm);
708     VBQueueItemCtx queueItmCtx(GenerateBySeqno::Yes,
709                                GenerateCas::Yes,
710                                TrackCasDrift::No,
711                                /*isBackfillItem*/ false,
712                                &preLinkDocumentContext);
713
714     MutationStatus status;
715     VBNotifyCtx notifyCtx;
716     std::tie(status, notifyCtx) = processSet(hbl,
717                                              v,
718                                              itm,
719                                              itm.getCas(),
720                                              /*allowExisting*/ true,
721                                              /*hashMetaData*/ false,
722                                              &queueItmCtx,
723                                              maybeKeyExists);
724
725     ENGINE_ERROR_CODE ret = ENGINE_SUCCESS;
726     switch (status) {
727     case MutationStatus::NoMem:
728         ret = ENGINE_ENOMEM;
729         break;
730     case MutationStatus::InvalidCas:
731         ret = ENGINE_KEY_EEXISTS;
732         break;
733     case MutationStatus::IsLocked:
734         ret = ENGINE_LOCKED;
735         break;
736     case MutationStatus::NotFound:
737         if (cas_op) {
738             ret = ENGINE_KEY_ENOENT;
739             break;
740         }
741     // FALLTHROUGH
742     case MutationStatus::WasDirty:
743     // Even if the item was dirty, push it into the vbucket's open
744     // checkpoint.
745     case MutationStatus::WasClean:
746         notifyNewSeqno(notifyCtx);
747
748         itm.setBySeqno(v->getBySeqno());
749         itm.setCas(v->getCas());
750         break;
751     case MutationStatus::NeedBgFetch: { // CAS operation with non-resident item
752         // +
753         // full eviction.
754         if (v) {
755             // temp item is already created. Simply schedule a bg fetch job
756             hbl.getHTLock().unlock();
757             bgFetch(itm.getKey(), cookie, engine, bgFetchDelay, true);
758             return ENGINE_EWOULDBLOCK;
759         }
760         ret = addTempItemAndBGFetch(
761                 hbl, itm.getKey(), cookie, engine, bgFetchDelay, true);
762         break;
763     }
764     }
765
766     return ret;
767 }
768
769 ENGINE_ERROR_CODE VBucket::replace(Item& itm,
770                                    const void* cookie,
771                                    EventuallyPersistentEngine& engine,
772                                    const int bgFetchDelay) {
773     auto hbl = ht.getLockedBucket(itm.getKey());
774     StoredValue* v = ht.unlocked_find(itm.getKey(),
775                                       hbl.getBucketNum(),
776                                       WantsDeleted::Yes,
777                                       TrackReference::No);
778     if (v) {
779         if (v->isDeleted() || v->isTempDeletedItem() ||
780             v->isTempNonExistentItem()) {
781             return ENGINE_KEY_ENOENT;
782         }
783
784         MutationStatus mtype;
785         VBNotifyCtx notifyCtx;
786         if (eviction == FULL_EVICTION && v->isTempInitialItem()) {
787             mtype = MutationStatus::NeedBgFetch;
788         } else {
789             PreLinkDocumentContext preLinkDocumentContext(engine, cookie, &itm);
790             VBQueueItemCtx queueItmCtx(GenerateBySeqno::Yes,
791                                        GenerateCas::Yes,
792                                        TrackCasDrift::No,
793                                        /*isBackfillItem*/ false,
794                                        &preLinkDocumentContext);
795             std::tie(mtype, notifyCtx) = processSet(hbl,
796                                                     v,
797                                                     itm,
798                                                     0,
799                                                     /*allowExisting*/ true,
800                                                     /*hasMetaData*/ false,
801                                                     &queueItmCtx);
802         }
803
804         ENGINE_ERROR_CODE ret = ENGINE_SUCCESS;
805         switch (mtype) {
806         case MutationStatus::NoMem:
807             ret = ENGINE_ENOMEM;
808             break;
809         case MutationStatus::IsLocked:
810             ret = ENGINE_LOCKED;
811             break;
812         case MutationStatus::InvalidCas:
813         case MutationStatus::NotFound:
814             ret = ENGINE_NOT_STORED;
815             break;
816         // FALLTHROUGH
817         case MutationStatus::WasDirty:
818         // Even if the item was dirty, push it into the vbucket's open
819         // checkpoint.
820         case MutationStatus::WasClean:
821             notifyNewSeqno(notifyCtx);
822
823             itm.setBySeqno(v->getBySeqno());
824             itm.setCas(v->getCas());
825             break;
826         case MutationStatus::NeedBgFetch: {
827             // temp item is already created. Simply schedule a bg fetch job
828             hbl.getHTLock().unlock();
829             bgFetch(itm.getKey(), cookie, engine, bgFetchDelay, true);
830             ret = ENGINE_EWOULDBLOCK;
831             break;
832         }
833         }
834
835         return ret;
836     } else {
837         if (eviction == VALUE_ONLY) {
838             return ENGINE_KEY_ENOENT;
839         }
840
841         if (maybeKeyExistsInFilter(itm.getKey())) {
842             return addTempItemAndBGFetch(
843                     hbl, itm.getKey(), cookie, engine, bgFetchDelay, false);
844         } else {
845             // As bloomfilter predicted that item surely doesn't exist
846             // on disk, return ENOENT for replace().
847             return ENGINE_KEY_ENOENT;
848         }
849     }
850 }
851
852 ENGINE_ERROR_CODE VBucket::addBackfillItem(Item& itm,
853                                            const GenerateBySeqno genBySeqno) {
854     auto hbl = ht.getLockedBucket(itm.getKey());
855     StoredValue* v = ht.unlocked_find(itm.getKey(),
856                                       hbl.getBucketNum(),
857                                       WantsDeleted::Yes,
858                                       TrackReference::No);
859
860     // Note that this function is only called on replica or pending vbuckets.
861     if (v && v->isLocked(ep_current_time())) {
862         v->unlock();
863     }
864
865     VBQueueItemCtx queueItmCtx(genBySeqno,
866                                GenerateCas::No,
867                                TrackCasDrift::No,
868                                /*isBackfillItem*/ true,
869                                nullptr /* No pre link should happen */);
870     MutationStatus status;
871     VBNotifyCtx notifyCtx;
872     std::tie(status, notifyCtx) = processSet(hbl,
873                                              v,
874                                              itm,
875                                              0,
876                                              /*allowExisting*/ true,
877                                              /*hasMetaData*/ true,
878                                              &queueItmCtx);
879
880     ENGINE_ERROR_CODE ret = ENGINE_SUCCESS;
881     switch (status) {
882     case MutationStatus::NoMem:
883         ret = ENGINE_ENOMEM;
884         break;
885     case MutationStatus::InvalidCas:
886     case MutationStatus::IsLocked:
887         ret = ENGINE_KEY_EEXISTS;
888         break;
889     case MutationStatus::WasDirty:
890     // FALLTHROUGH, to ensure the bySeqno for the hashTable item is
891     // set correctly, and also the sequence numbers are ordered correctly.
892     // (MB-14003)
893     case MutationStatus::NotFound:
894     // FALLTHROUGH
895     case MutationStatus::WasClean: {
896         setMaxCas(v->getCas());
897         // we unlock ht lock here because we want to avoid potential lock
898         // inversions arising from notifyNewSeqno() call
899         hbl.getHTLock().unlock();
900         notifyNewSeqno(notifyCtx);
901     } break;
902     case MutationStatus::NeedBgFetch:
903         throw std::logic_error(
904                 "VBucket::addBackfillItem: "
905                 "SET on a non-active vbucket should not require a "
906                 "bg_metadata_fetch.");
907     }
908
909     return ret;
910 }
911
912 ENGINE_ERROR_CODE VBucket::setWithMeta(Item& itm,
913                                        const uint64_t cas,
914                                        uint64_t* seqno,
915                                        const void* cookie,
916                                        EventuallyPersistentEngine& engine,
917                                        const int bgFetchDelay,
918                                        const bool force,
919                                        const bool allowExisting,
920                                        const GenerateBySeqno genBySeqno,
921                                        const GenerateCas genCas,
922                                        const bool isReplication) {
923     auto hbl = ht.getLockedBucket(itm.getKey());
924     StoredValue* v = ht.unlocked_find(itm.getKey(),
925                                       hbl.getBucketNum(),
926                                       WantsDeleted::Yes,
927                                       TrackReference::No);
928
929     bool maybeKeyExists = true;
930     if (!force) {
931         if (v) {
932             if (v->isTempInitialItem()) {
933                 bgFetch(itm.getKey(), cookie, engine, bgFetchDelay, true);
934                 return ENGINE_EWOULDBLOCK;
935             }
936
937             if (!(conflictResolver->resolve(*v,
938                                             itm.getMetaData(),
939                                             itm.getDataType(),
940                                             itm.isDeleted()))) {
941                 ++stats.numOpsSetMetaResolutionFailed;
942                 return ENGINE_KEY_EEXISTS;
943             }
944         } else {
945             if (maybeKeyExistsInFilter(itm.getKey())) {
946                 return addTempItemAndBGFetch(hbl,
947                                              itm.getKey(),
948                                              cookie,
949                                              engine,
950                                              bgFetchDelay,
951                                              true,
952                                              isReplication);
953             } else {
954                 maybeKeyExists = false;
955             }
956         }
957     } else {
958         if (eviction == FULL_EVICTION) {
959             // Check Bloomfilter's prediction
960             if (!maybeKeyExistsInFilter(itm.getKey())) {
961                 maybeKeyExists = false;
962             }
963         }
964     }
965
966     if (v && v->isLocked(ep_current_time()) &&
967         (getState() == vbucket_state_replica ||
968          getState() == vbucket_state_pending)) {
969         v->unlock();
970     }
971
972     VBQueueItemCtx queueItmCtx(genBySeqno,
973                                genCas,
974                                TrackCasDrift::Yes,
975                                /*isBackfillItem*/ false,
976                                nullptr /* No pre link step needed */);
977     MutationStatus status;
978     VBNotifyCtx notifyCtx;
979     std::tie(status, notifyCtx) = processSet(hbl,
980                                              v,
981                                              itm,
982                                              cas,
983                                              allowExisting,
984                                              true,
985                                              &queueItmCtx,
986                                              maybeKeyExists,
987                                              isReplication);
988
989     ENGINE_ERROR_CODE ret = ENGINE_SUCCESS;
990     switch (status) {
991     case MutationStatus::NoMem:
992         ret = ENGINE_ENOMEM;
993         break;
994     case MutationStatus::InvalidCas:
995         ret = ENGINE_KEY_EEXISTS;
996         break;
997     case MutationStatus::IsLocked:
998         ret = ENGINE_LOCKED;
999         break;
1000     case MutationStatus::WasDirty:
1001     case MutationStatus::WasClean: {
1002         if (seqno) {
1003             *seqno = static_cast<uint64_t>(v->getBySeqno());
1004         }
1005         // we unlock ht lock here because we want to avoid potential lock
1006         // inversions arising from notifyNewSeqno() call
1007         hbl.getHTLock().unlock();
1008         notifyNewSeqno(notifyCtx);
1009     } break;
1010     case MutationStatus::NotFound:
1011         ret = ENGINE_KEY_ENOENT;
1012         break;
1013     case MutationStatus::NeedBgFetch: { // CAS operation with non-resident item
1014         // + full eviction.
1015         if (v) { // temp item is already created. Simply schedule a
1016             hbl.getHTLock().unlock(); // bg fetch job.
1017             bgFetch(itm.getKey(), cookie, engine, bgFetchDelay, true);
1018             return ENGINE_EWOULDBLOCK;
1019         }
1020         ret = addTempItemAndBGFetch(hbl,
1021                                     itm.getKey(),
1022                                     cookie,
1023                                     engine,
1024                                     bgFetchDelay,
1025                                     true,
1026                                     isReplication);
1027     }
1028     }
1029
1030     return ret;
1031 }
1032
/**
 * Delete @p key from this vbucket (a plain delete, without caller-supplied
 * metadata).
 *
 * If the in-memory value is already expired it is expired instead of
 * soft-deleted; in that case ENGINE_KEY_ENOENT is returned but the deletion
 * is still queued.
 *
 * @param key          key to delete
 * @param cas          in: CAS handed to processSoftDelete() (a mismatch there
 *                     yields ENGINE_KEY_EEXISTS); out: the value's CAS after a
 *                     WasClean / WasDirty / NotFound result
 * @param cookie       connection cookie used for any background fetch
 * @param engine       engine used for any background fetch
 * @param bgFetchDelay delay passed to any background fetch
 * @param itemMeta     optional; receives cas, revSeqno, flags and exptime
 * @param mutInfo      optional; receives seqno and vbucket uuid, but only on
 *                     WasClean / WasDirty (not for an expired item)
 * @return ENGINE_SUCCESS, ENGINE_KEY_ENOENT, ENGINE_EWOULDBLOCK,
 *         ENGINE_ENOMEM, ENGINE_KEY_EEXISTS, ENGINE_LOCKED_TMPFAIL, or the
 *         result of addTempItemAndBGFetch().
 * @throws std::logic_error if processSoftDelete() unexpectedly reports
 *         NeedBgFetch.
 */
ENGINE_ERROR_CODE VBucket::deleteItem(const DocKey& key,
                                      uint64_t& cas,
                                      const void* cookie,
                                      EventuallyPersistentEngine& engine,
                                      const int bgFetchDelay,
                                      ItemMetaData* itemMeta,
                                      mutation_descr_t* mutInfo) {
    auto hbl = ht.getLockedBucket(key);
    StoredValue* v = ht.unlocked_find(
            key, hbl.getBucketNum(), WantsDeleted::Yes, TrackReference::No);

    // No live value in memory (absent, already deleted, or a temp item).
    if (!v || v->isDeleted() || v->isTempItem()) {
        if (eviction == VALUE_ONLY) {
            return ENGINE_KEY_ENOENT;
        } else { // Full eviction.
            if (!v) { // Item might be evicted from cache.
                if (maybeKeyExistsInFilter(key)) {
                    return addTempItemAndBGFetch(
                            hbl, key, cookie, engine, bgFetchDelay, true);
                } else {
                    // As bloomfilter predicted that item surely doesn't
                    // exist on disk, return ENOENT for deleteItem().
                    return ENGINE_KEY_ENOENT;
                }
            } else if (v->isTempInitialItem()) {
                // A fetch for this key is already pending; wait for it.
                hbl.getHTLock().unlock();
                bgFetch(key, cookie, engine, bgFetchDelay, true);
                return ENGINE_EWOULDBLOCK;
            } else { // Non-existent or deleted key.
                if (v->isTempNonExistentItem() || v->isTempDeletedItem()) {
                    // Delete a temp non-existent item to ensure that
                    // if a delete were issued over an item that doesn't
                    // exist, then we don't preserve a temp item.
                    deleteStoredValue(hbl, *v);
                }
                return ENGINE_KEY_ENOENT;
            }
        }
    }

    // On replica / pending vbuckets any lock on the value is released first.
    if (v->isLocked(ep_current_time()) &&
        (getState() == vbucket_state_replica ||
         getState() == vbucket_state_pending)) {
        v->unlock();
    }

    // Report the pre-delete CAS; overwritten below if the delete succeeds.
    if (itemMeta != nullptr) {
        itemMeta->cas = v->getCas();
    }

    MutationStatus delrv;
    VBNotifyCtx notifyCtx;
    if (v->isExpired(ep_real_time())) {
        // Already expired: expire it rather than soft-delete (the result is
        // expected to be NotFound - see the fallthrough below).
        std::tie(delrv, v, notifyCtx) = processExpiredItem(hbl, *v);
    } else {
        // A plain delete bumps the revision seqno; the rest of the metadata
        // is left default-constructed.
        ItemMetaData metadata;
        metadata.revSeqno = v->getRevSeqno() + 1;
        std::tie(delrv, v, notifyCtx) =
                processSoftDelete(hbl,
                                  *v,
                                  cas,
                                  metadata,
                                  VBQueueItemCtx(GenerateBySeqno::Yes,
                                                 GenerateCas::Yes,
                                                 TrackCasDrift::No,
                                                 /*isBackfillItem*/ false,
                                                 nullptr /* no pre link */),
                                  /*use_meta*/ false,
                                  /*bySeqno*/ v->getBySeqno());
    }

    uint64_t seqno = 0;
    ENGINE_ERROR_CODE ret = ENGINE_SUCCESS;
    switch (delrv) {
    case MutationStatus::NoMem:
        ret = ENGINE_ENOMEM;
        break;
    case MutationStatus::InvalidCas:
        ret = ENGINE_KEY_EEXISTS;
        break;
    case MutationStatus::IsLocked:
        ret = ENGINE_LOCKED_TMPFAIL;
        break;
    case MutationStatus::NotFound:
        ret = ENGINE_KEY_ENOENT;
    /* Fallthrough:
     * A NotFound return value at this point indicates that the
     * item has expired. But, a deletion still needs to be queued
     * for the item in order to persist it.
     */
    case MutationStatus::WasClean:
    case MutationStatus::WasDirty:
        if (itemMeta != nullptr) {
            itemMeta->revSeqno = v->getRevSeqno();
            itemMeta->flags = v->getFlags();
            itemMeta->exptime = v->getExptime();
        }

        // NOTE(review): unlike setWithMeta()/deleteWithMeta(), the hash-bucket
        // lock is still held when notifyNewSeqno() runs here (v is read
        // afterwards). Confirm this cannot cause the lock inversion those
        // functions avoid.
        notifyNewSeqno(notifyCtx);
        seqno = static_cast<uint64_t>(v->getBySeqno());
        cas = v->getCas();

        // Mutation info / final CAS are only reported for a real delete,
        // not for an expiry.
        if (delrv != MutationStatus::NotFound) {
            if (mutInfo) {
                mutInfo->seqno = seqno;
                mutInfo->vbucket_uuid = failovers->getLatestUUID();
            }
            if (itemMeta != nullptr) {
                itemMeta->cas = v->getCas();
            }
        }
        break;
    case MutationStatus::NeedBgFetch:
        // We already figured out if a bg fetch is requred for a full-evicted
        // item above.
        throw std::logic_error(
                "VBucket::deleteItem: "
                "Unexpected NEEDS_BG_FETCH from processSoftDelete");
    }
    return ret;
}
1154
/**
 * Delete @p key using caller-supplied metadata (delete-with-meta).
 *
 * Unless @p force is set, @p itemMeta is conflict-resolved against the
 * in-memory value. When no in-memory value exists, either a background fetch
 * is scheduled (the bloom filter says the key may be on disk) or a deleted
 * temp item is created so the delete can still be persisted.
 *
 * @param key           key to delete
 * @param cas           in: CAS passed to processSoftDelete() (and stamped on
 *                      a freshly created temp item when @p force is set);
 *                      out: the resulting value's CAS, or 0 if none
 * @param seqno         optional out-param; receives the by-seqno on success
 * @param cookie        connection cookie used for any background fetch
 * @param engine        engine used for any background fetch
 * @param bgFetchDelay  delay passed to any background fetch
 * @param force         when true, conflict resolution is skipped
 * @param itemMeta      metadata of the incoming delete
 * @param backfill      forwarded to VBQueueItemCtx as the backfill flag
 * @param genBySeqno    whether a by-seqno is generated for the queued item
 * @param generateCas   whether a CAS is generated for the queued item
 * @param bySeqno       by-seqno passed to processSoftDelete()
 * @param isReplication passed to addTempStoredValue() /
 *                      addTempItemAndBGFetch()
 * @return ENGINE_SUCCESS, ENGINE_EWOULDBLOCK, ENGINE_KEY_EEXISTS,
 *         ENGINE_LOCKED_TMPFAIL, ENGINE_KEY_ENOENT, ENGINE_ENOMEM, or the
 *         result of addTempItemAndBGFetch().
 */
ENGINE_ERROR_CODE VBucket::deleteWithMeta(const DocKey& key,
                                          uint64_t& cas,
                                          uint64_t* seqno,
                                          const void* cookie,
                                          EventuallyPersistentEngine& engine,
                                          const int bgFetchDelay,
                                          const bool force,
                                          const ItemMetaData& itemMeta,
                                          const bool backfill,
                                          const GenerateBySeqno genBySeqno,
                                          const GenerateCas generateCas,
                                          const uint64_t bySeqno,
                                          const bool isReplication) {
    auto hbl = ht.getLockedBucket(key);
    StoredValue* v = ht.unlocked_find(
            key, hbl.getBucketNum(), WantsDeleted::Yes, TrackReference::No);
    if (!force) { // Need conflict resolution.
        if (v) {
            // A fetch for this key is already pending; wait for it.
            if (v->isTempInitialItem()) {
                bgFetch(key, cookie, engine, bgFetchDelay, true);
                return ENGINE_EWOULDBLOCK;
            }

            if (!(conflictResolver->resolve(*v,
                                            itemMeta,
                                            PROTOCOL_BINARY_RAW_BYTES,
                                            true))) {
                ++stats.numOpsDelMetaResolutionFailed;
                return ENGINE_KEY_EEXISTS;
            }
        } else {
            // Item is 1) deleted or not existent in the value eviction case OR
            // 2) deleted or evicted in the full eviction.
            if (maybeKeyExistsInFilter(key)) {
                return addTempItemAndBGFetch(hbl,
                                             key,
                                             cookie,
                                             engine,
                                             bgFetchDelay,
                                             true,
                                             isReplication);
            } else {
                // Even though bloomfilter predicted that item doesn't exist
                // on disk, we must put this delete on disk if the cas is valid.
                AddStatus rv = addTempStoredValue(hbl, key, isReplication);
                if (rv == AddStatus::NoMem) {
                    return ENGINE_ENOMEM;
                }
                v = ht.unlocked_find(key,
                                     hbl.getBucketNum(),
                                     WantsDeleted::Yes,
                                     TrackReference::No);
                v->setDeleted();
            }
        }
    } else {
        // Forced: no conflict resolution, but make sure there is a (deleted)
        // value carrying the caller's CAS for the delete to be applied to.
        if (!v) {
            // We should always try to persist a delete here.
            AddStatus rv = addTempStoredValue(hbl, key, isReplication);
            if (rv == AddStatus::NoMem) {
                return ENGINE_ENOMEM;
            }
            v = ht.unlocked_find(key,
                                 hbl.getBucketNum(),
                                 WantsDeleted::Yes,
                                 TrackReference::No);
            v->setDeleted();
            v->setCas(cas);
        } else if (v->isTempInitialItem()) {
            v->setDeleted();
            v->setCas(cas);
        }
    }

    // On replica / pending vbuckets any lock on the value is released first.
    if (v && v->isLocked(ep_current_time()) &&
        (getState() == vbucket_state_replica ||
         getState() == vbucket_state_pending)) {
        v->unlock();
    }

    MutationStatus delrv;
    VBNotifyCtx notifyCtx;
    // NOTE(review): every path above either returns early or leaves v
    // non-null, so this branch looks purely defensive.
    if (!v) {
        if (eviction == FULL_EVICTION) {
            delrv = MutationStatus::NeedBgFetch;
        } else {
            delrv = MutationStatus::NotFound;
        }
    } else {
        VBQueueItemCtx queueItmCtx(genBySeqno,
                                   generateCas,
                                   TrackCasDrift::Yes,
                                   backfill,
                                   nullptr /* No pre link step needed */);
        std::tie(delrv, v, notifyCtx) = processSoftDelete(hbl,
                                                          *v,
                                                          cas,
                                                          itemMeta,
                                                          queueItmCtx,
                                                          /*use_meta*/ true,
                                                          bySeqno);
    }
    // Hand the resulting CAS back to the caller (0 when there is no value).
    cas = v ? v->getCas() : 0;

    switch (delrv) {
    case MutationStatus::NoMem:
        return ENGINE_ENOMEM;
    case MutationStatus::InvalidCas:
        return ENGINE_KEY_EEXISTS;
    case MutationStatus::IsLocked:
        return ENGINE_LOCKED_TMPFAIL;
    case MutationStatus::NotFound:
        return ENGINE_KEY_ENOENT;
    case MutationStatus::WasDirty:
    case MutationStatus::WasClean: {
        if (seqno) {
            *seqno = static_cast<uint64_t>(v->getBySeqno());
        }
        // we unlock ht lock here because we want to avoid potential lock
        // inversions arising from notifyNewSeqno() call
        hbl.getHTLock().unlock();
        notifyNewSeqno(notifyCtx);
        break;
    }
    case MutationStatus::NeedBgFetch:
        hbl.getHTLock().unlock();
        bgFetch(key, cookie, engine, bgFetchDelay, true);
        return ENGINE_EWOULDBLOCK;
    }
    return ENGINE_SUCCESS;
}
1286
1287 void VBucket::deleteExpiredItem(const DocKey& key,
1288                                 time_t startTime,
1289                                 uint64_t revSeqno,
1290                                 ExpireBy source) {
1291     auto hbl = ht.getLockedBucket(key);
1292     StoredValue* v = ht.unlocked_find(
1293             key, hbl.getBucketNum(), WantsDeleted::Yes, TrackReference::No);
1294     if (v) {
1295         if (v->isTempNonExistentItem() || v->isTempDeletedItem()) {
1296             // This is a temporary item whose background fetch for metadata
1297             // has completed.
1298             bool deleted = deleteStoredValue(hbl, *v);
1299             if (!deleted) {
1300                 throw std::logic_error(
1301                         "VBucket::deleteExpiredItem: "
1302                         "Failed to delete seqno:" +
1303                         std::to_string(v->getBySeqno()) + " from bucket " +
1304                         std::to_string(hbl.getBucketNum()));
1305             }
1306         } else if (v->isExpired(startTime) && !v->isDeleted()) {
1307             handlePreExpiry(*v);
1308             VBNotifyCtx notifyCtx;
1309             std::tie(std::ignore, std::ignore, notifyCtx) =
1310                     processExpiredItem(hbl, *v);
1311             // we unlock ht lock here because we want to avoid potential lock
1312             // inversions arising from notifyNewSeqno() call
1313             hbl.getHTLock().unlock();
1314             notifyNewSeqno(notifyCtx);
1315         }
1316     } else {
1317         if (eviction == FULL_EVICTION) {
1318             // Create a temp item and delete and push it
1319             // into the checkpoint queue, only if the bloomfilter
1320             // predicts that the item may exist on disk.
1321             if (maybeKeyExistsInFilter(key)) {
1322                 AddStatus rv = addTempStoredValue(hbl, key);
1323                 if (rv == AddStatus::NoMem) {
1324                     return;
1325                 }
1326                 v = ht.unlocked_find(key,
1327                                      hbl.getBucketNum(),
1328                                      WantsDeleted::Yes,
1329                                      TrackReference::No);
1330                 v->setDeleted();
1331                 v->setRevSeqno(revSeqno);
1332                 VBNotifyCtx notifyCtx;
1333                 std::tie(std::ignore, std::ignore, notifyCtx) =
1334                         processExpiredItem(hbl, *v);
1335                 // we unlock ht lock here because we want to avoid potential
1336                 // lock inversions arising from notifyNewSeqno() call
1337                 hbl.getHTLock().unlock();
1338                 notifyNewSeqno(notifyCtx);
1339             }
1340         }
1341     }
1342     incExpirationStat(source);
1343 }
1344
1345 ENGINE_ERROR_CODE VBucket::add(Item& itm,
1346                                const void* cookie,
1347                                EventuallyPersistentEngine& engine,
1348                                const int bgFetchDelay) {
1349     auto hbl = ht.getLockedBucket(itm.getKey());
1350     StoredValue* v = ht.unlocked_find(itm.getKey(),
1351                                       hbl.getBucketNum(),
1352                                       WantsDeleted::Yes,
1353                                       TrackReference::No);
1354
1355     bool maybeKeyExists = true;
1356     if ((v == nullptr || v->isTempInitialItem()) &&
1357         (eviction == FULL_EVICTION)) {
1358         // Check bloomfilter's prediction
1359         if (!maybeKeyExistsInFilter(itm.getKey())) {
1360             maybeKeyExists = false;
1361         }
1362     }
1363
1364     PreLinkDocumentContext preLinkDocumentContext(engine, cookie, &itm);
1365     VBQueueItemCtx queueItmCtx(GenerateBySeqno::Yes,
1366                                GenerateCas::Yes,
1367                                TrackCasDrift::No,
1368                                /*isBackfillItem*/ false,
1369                                &preLinkDocumentContext);
1370     AddStatus status;
1371     VBNotifyCtx notifyCtx;
1372     std::tie(status, notifyCtx) =
1373             processAdd(hbl, v, itm, maybeKeyExists, false, &queueItmCtx);
1374
1375     switch (status) {
1376     case AddStatus::NoMem:
1377         return ENGINE_ENOMEM;
1378     case AddStatus::Exists:
1379         return ENGINE_NOT_STORED;
1380     case AddStatus::AddTmpAndBgFetch:
1381         return addTempItemAndBGFetch(
1382                 hbl, itm.getKey(), cookie, engine, bgFetchDelay, true);
1383     case AddStatus::BgFetch:
1384         hbl.getHTLock().unlock();
1385         bgFetch(itm.getKey(), cookie, engine, bgFetchDelay, true);
1386         return ENGINE_EWOULDBLOCK;
1387     case AddStatus::Success:
1388     case AddStatus::UnDel:
1389         notifyNewSeqno(notifyCtx);
1390         itm.setBySeqno(v->getBySeqno());
1391         itm.setCas(v->getCas());
1392         break;
1393     }
1394     return ENGINE_SUCCESS;
1395 }
1396
1397 GetValue VBucket::getAndUpdateTtl(const DocKey& key,
1398                                   const void* cookie,
1399                                   EventuallyPersistentEngine& engine,
1400                                   int bgFetchDelay,
1401                                   time_t exptime) {
1402     auto hbl = ht.getLockedBucket(key);
1403     StoredValue* v = fetchValidValue(hbl,
1404                                      key,
1405                                      WantsDeleted::Yes,
1406                                      TrackReference::Yes,
1407                                      QueueExpired::Yes);
1408
1409     if (v) {
1410         if (v->isDeleted() || v->isTempDeletedItem() ||
1411             v->isTempNonExistentItem()) {
1412             return {};
1413         }
1414
1415         if (!v->isResident()) {
1416             bgFetch(key, cookie, engine, bgFetchDelay);
1417             return GetValue(nullptr, ENGINE_EWOULDBLOCK, v->getBySeqno());
1418         }
1419         if (v->isLocked(ep_current_time())) {
1420             return GetValue(nullptr, ENGINE_KEY_EEXISTS, 0);
1421         }
1422
1423         const bool exptime_mutated = exptime != v->getExptime();
1424         if (exptime_mutated) {
1425             v->markDirty();
1426             v->setExptime(exptime);
1427             v->setRevSeqno(v->getRevSeqno() + 1);
1428         }
1429
1430         GetValue rv(
1431                 v->toItem(v->isLocked(ep_current_time()), getId()).release(),
1432                 ENGINE_SUCCESS,
1433                 v->getBySeqno());
1434
1435         if (exptime_mutated) {
1436             VBNotifyCtx notifyCtx = queueDirty(*v);
1437             // we unlock ht lock here because we want to avoid potential lock
1438             // inversions arising from notifyNewSeqno() call
1439             hbl.getHTLock().unlock();
1440             notifyNewSeqno(notifyCtx);
1441         }
1442
1443         return rv;
1444     } else {
1445         if (eviction == VALUE_ONLY) {
1446             return {};
1447         } else {
1448             if (maybeKeyExistsInFilter(key)) {
1449                 ENGINE_ERROR_CODE ec = addTempItemAndBGFetch(
1450                         hbl, key, cookie, engine, bgFetchDelay, false);
1451                 return GetValue(NULL, ec, -1, true);
1452             } else {
1453                 // As bloomfilter predicted that item surely doesn't exist
1454                 // on disk, return ENOENT for getAndUpdateTtl().
1455                 return {};
1456             }
1457         }
1458     }
1459 }
1460
1461 MutationStatus VBucket::insertFromWarmup(Item& itm,
1462                                          bool eject,
1463                                          bool keyMetaDataOnly) {
1464     if (!StoredValue::hasAvailableSpace(stats, itm)) {
1465         return MutationStatus::NoMem;
1466     }
1467
1468     auto hbl = ht.getLockedBucket(itm.getKey());
1469     StoredValue* v = ht.unlocked_find(itm.getKey(),
1470                                       hbl.getBucketNum(),
1471                                       WantsDeleted::Yes,
1472                                       TrackReference::No);
1473
1474     if (v == NULL) {
1475         v = addNewStoredValue(hbl, itm, /*queueItmCtx*/ nullptr).first;
1476         if (keyMetaDataOnly) {
1477             v->markNotResident();
1478             /* For now ht stats are updated from outside ht. This seems to be
1479                a better option for now than passing a flag to
1480                addNewStoredValue() just for this func */
1481             ++(ht.numNonResidentItems);
1482         }
1483         /* For now ht stats are updated from outside ht. This seems to be
1484            a better option for now than passing a flag to
1485            addNewStoredValue() just for this func.
1486            We need to decrNumTotalItems because ht.numTotalItems is already
1487            set by warmup when it estimated the item count from disk */
1488         ht.decrNumTotalItems();
1489         v->setNewCacheItem(false);
1490     } else {
1491         if (keyMetaDataOnly) {
1492             // We don't have a better error code ;)
1493             return MutationStatus::InvalidCas;
1494         }
1495
1496         // Verify that the CAS isn't changed
1497         if (v->getCas() != itm.getCas()) {
1498             if (v->getCas() == 0) {
1499                 v->setCas(itm.getCas());
1500                 v->setFlags(itm.getFlags());
1501                 v->setExptime(itm.getExptime());
1502                 v->setRevSeqno(itm.getRevSeqno());
1503             } else {
1504                 return MutationStatus::InvalidCas;
1505             }
1506         }
1507         updateStoredValue(hbl, *v, itm, /*queueItmCtx*/ nullptr);
1508     }
1509
1510     v->markClean();
1511
1512     if (eject && !keyMetaDataOnly) {
1513         ht.unlocked_ejectItem(v, eviction);
1514     }
1515
1516     return MutationStatus::NotFound;
1517 }
1518
1519 GetValue VBucket::getInternal(const DocKey& key,
1520                               const void* cookie,
1521                               EventuallyPersistentEngine& engine,
1522                               int bgFetchDelay,
1523                               get_options_t options,
1524                               bool diskFlushAll) {
1525     const TrackReference trackReference = (options & TRACK_REFERENCE)
1526                                                   ? TrackReference::Yes
1527                                                   : TrackReference::No;
1528     const bool getDeletedValue = (options & GET_DELETED_VALUE);
1529     auto hbl = ht.getLockedBucket(key);
1530     StoredValue* v = fetchValidValue(
1531             hbl, key, WantsDeleted::Yes, trackReference, QueueExpired::Yes);
1532     if (v) {
1533         if (v->isDeleted() && !getDeletedValue) {
1534             return GetValue();
1535         }
1536         if (v->isTempDeletedItem() || v->isTempNonExistentItem()) {
1537             // Delete a temp non-existent item to ensure that
1538             // if the get were issued over an item that doesn't
1539             // exist, then we dont preserve a temp item.
1540             if (options & DELETE_TEMP) {
1541                 deleteStoredValue(hbl, *v);
1542             }
1543             return GetValue();
1544         }
1545
1546         // If the value is not resident, wait for it...
1547         if (!v->isResident()) {
1548             return getInternalNonResident(
1549                     key, cookie, engine, bgFetchDelay, options, *v);
1550         }
1551
1552         // Should we hide (return -1) for the items' CAS?
1553         const bool hide_cas =
1554                 (options & HIDE_LOCKED_CAS) && v->isLocked(ep_current_time());
1555         return GetValue(v->toItem(hide_cas, getId()).release(),
1556                         ENGINE_SUCCESS,
1557                         v->getBySeqno(),
1558                         false,
1559                         v->getNRUValue());
1560     } else {
1561         if (!getDeletedValue && (eviction == VALUE_ONLY || diskFlushAll)) {
1562             return GetValue();
1563         }
1564
1565         if (maybeKeyExistsInFilter(key)) {
1566             ENGINE_ERROR_CODE ec = ENGINE_EWOULDBLOCK;
1567             if (options &
1568                 QUEUE_BG_FETCH) { // Full eviction and need a bg fetch.
1569                 ec = addTempItemAndBGFetch(
1570                         hbl, key, cookie, engine, bgFetchDelay, false);
1571             }
1572             return GetValue(NULL, ec, -1, true);
1573         } else {
1574             // As bloomfilter predicted that item surely doesn't exist
1575             // on disk, return ENOENT, for getInternal().
1576             return GetValue();
1577         }
1578     }
1579 }
1580
1581 ENGINE_ERROR_CODE VBucket::getMetaData(const DocKey& key,
1582                                        const void* cookie,
1583                                        EventuallyPersistentEngine& engine,
1584                                        int bgFetchDelay,
1585                                        bool fetchDatatype,
1586                                        ItemMetaData& metadata,
1587                                        uint32_t& deleted,
1588                                        uint8_t& datatype) {
1589     deleted = 0;
1590     auto hbl = ht.getLockedBucket(key);
1591     StoredValue* v = ht.unlocked_find(
1592             key, hbl.getBucketNum(), WantsDeleted::Yes, TrackReference::No);
1593
1594     if (v) {
1595         stats.numOpsGetMeta++;
1596         if (v->isTempInitialItem() || (fetchDatatype && !v->isResident())) {
1597             // Need bg meta fetch.
1598             bgFetch(key, cookie, engine, bgFetchDelay, !fetchDatatype);
1599             return ENGINE_EWOULDBLOCK;
1600         } else if (v->isTempNonExistentItem()) {
1601             metadata.cas = v->getCas();
1602             return ENGINE_KEY_ENOENT;
1603         } else {
1604             if (v->isTempDeletedItem() || v->isDeleted() ||
1605                 v->isExpired(ep_real_time())) {
1606                 deleted |= GET_META_ITEM_DELETED_FLAG;
1607             }
1608
1609             if (v->isLocked(ep_current_time())) {
1610                 metadata.cas = static_cast<uint64_t>(-1);
1611             } else {
1612                 metadata.cas = v->getCas();
1613             }
1614             metadata.flags = v->getFlags();
1615             metadata.exptime = v->getExptime();
1616             metadata.revSeqno = v->getRevSeqno();
1617
1618             if (fetchDatatype) {
1619                 value_t value = v->getValue();
1620                 if (value) {
1621                     datatype = value->getDataType();
1622                 }
1623             }
1624
1625             return ENGINE_SUCCESS;
1626         }
1627     } else {
1628         // The key wasn't found. However, this may be because it was previously
1629         // deleted or evicted with the full eviction strategy.
1630         // So, add a temporary item corresponding to the key to the hash table
1631         // and schedule a background fetch for its metadata from the persistent
1632         // store. The item's state will be updated after the fetch completes.
1633         //
1634         // Schedule this bgFetch only if the key is predicted to be may-be
1635         // existent on disk by the bloomfilter.
1636
1637         if (maybeKeyExistsInFilter(key)) {
1638             return addTempItemAndBGFetch(
1639                     hbl, key, cookie, engine, bgFetchDelay, !fetchDatatype);
1640         } else {
1641             stats.numOpsGetMeta++;
1642             return ENGINE_KEY_ENOENT;
1643         }
1644     }
1645 }
1646
1647 ENGINE_ERROR_CODE VBucket::getKeyStats(const DocKey& key,
1648                                        const void* cookie,
1649                                        EventuallyPersistentEngine& engine,
1650                                        int bgFetchDelay,
1651                                        struct key_stats& kstats,
1652                                        WantsDeleted wantsDeleted) {
1653     auto hbl = ht.getLockedBucket(key);
1654     StoredValue* v = fetchValidValue(hbl,
1655                                      key,
1656                                      WantsDeleted::Yes,
1657                                      TrackReference::Yes,
1658                                      QueueExpired::Yes);
1659
1660     if (v) {
1661         if ((v->isDeleted() && wantsDeleted == WantsDeleted::No) ||
1662             v->isTempNonExistentItem() || v->isTempDeletedItem()) {
1663             return ENGINE_KEY_ENOENT;
1664         }
1665         if (eviction == FULL_EVICTION && v->isTempInitialItem()) {
1666             hbl.getHTLock().unlock();
1667             bgFetch(key, cookie, engine, bgFetchDelay, true);
1668             return ENGINE_EWOULDBLOCK;
1669         }
1670         kstats.logically_deleted = v->isDeleted();
1671         kstats.dirty = v->isDirty();
1672         kstats.exptime = v->getExptime();
1673         kstats.flags = v->getFlags();
1674         kstats.cas = v->getCas();
1675         kstats.vb_state = getState();
1676         return ENGINE_SUCCESS;
1677     } else {
1678         if (eviction == VALUE_ONLY) {
1679             return ENGINE_KEY_ENOENT;
1680         } else {
1681             if (maybeKeyExistsInFilter(key)) {
1682                 return addTempItemAndBGFetch(
1683                         hbl, key, cookie, engine, bgFetchDelay, true);
1684             } else {
1685                 // If bgFetch were false, or bloomfilter predicted that
1686                 // item surely doesn't exist on disk, return ENOENT for
1687                 // getKeyStats().
1688                 return ENGINE_KEY_ENOENT;
1689             }
1690         }
1691     }
1692 }
1693
1694 GetValue VBucket::getLocked(const DocKey& key,
1695                             rel_time_t currentTime,
1696                             uint32_t lockTimeout,
1697                             const void* cookie,
1698                             EventuallyPersistentEngine& engine,
1699                             int bgFetchDelay) {
1700     auto hbl = ht.getLockedBucket(key);
1701     StoredValue* v = fetchValidValue(hbl,
1702                                      key,
1703                                      WantsDeleted::Yes,
1704                                      TrackReference::Yes,
1705                                      QueueExpired::Yes);
1706
1707     if (v) {
1708         if (v->isDeleted() || v->isTempNonExistentItem() ||
1709             v->isTempDeletedItem()) {
1710             return GetValue(NULL, ENGINE_KEY_ENOENT);
1711         }
1712
1713         // if v is locked return error
1714         if (v->isLocked(currentTime)) {
1715             return GetValue(NULL, ENGINE_TMPFAIL);
1716         }
1717
1718         // If the value is not resident, wait for it...
1719         if (!v->isResident()) {
1720             if (cookie) {
1721                 bgFetch(key, cookie, engine, bgFetchDelay);
1722             }
1723             return GetValue(NULL, ENGINE_EWOULDBLOCK, -1, true);
1724         }
1725
1726         // acquire lock and increment cas value
1727         v->lock(currentTime + lockTimeout);
1728
1729         auto it = v->toItem(false, getId());
1730         it->setCas(nextHLCCas());
1731         v->setCas(it->getCas());
1732
1733         return GetValue(it.release());
1734
1735     } else {
1736         // No value found in the hashtable.
1737         switch (eviction) {
1738         case VALUE_ONLY:
1739             return GetValue(NULL, ENGINE_KEY_ENOENT);
1740
1741         case FULL_EVICTION:
1742             if (maybeKeyExistsInFilter(key)) {
1743                 ENGINE_ERROR_CODE ec = addTempItemAndBGFetch(
1744                         hbl, key, cookie, engine, bgFetchDelay, false);
1745                 return GetValue(NULL, ec, -1, true);
1746             } else {
1747                 // As bloomfilter predicted that item surely doesn't exist
1748                 // on disk, return ENOENT for getLocked().
1749                 return GetValue(NULL, ENGINE_KEY_ENOENT);
1750             }
1751         }
1752         return GetValue(); // just to prevent compiler warning
1753     }
1754 }
1755
1756 void VBucket::deletedOnDiskCbk(const Item& queuedItem, bool deleted) {
1757     auto hbl = ht.getLockedBucket(queuedItem.getKey());
1758     StoredValue* v = fetchValidValue(hbl,
1759                                      queuedItem.getKey(),
1760                                      WantsDeleted::Yes,
1761                                      TrackReference::No,
1762                                      QueueExpired::Yes);
1763     // Delete the item in the hash table iff:
1764     //  1. Item is existent in hashtable, and deleted flag is true
1765     //  2. rev seqno of queued item matches rev seqno of hash table item
1766     if (v && v->isDeleted() && (queuedItem.getRevSeqno() == v->getRevSeqno())) {
1767         bool isDeleted = deleteStoredValue(hbl, *v);
1768         if (!isDeleted) {
1769             throw std::logic_error(
1770                     "deletedOnDiskCbk:callback: "
1771                     "Failed to delete key with seqno:" +
1772                     std::to_string(v->getBySeqno()) + "' from bucket " +
1773                     std::to_string(hbl.getBucketNum()));
1774         }
1775
1776         /**
1777          * Deleted items are to be added to the bloomfilter,
1778          * in either eviction policy.
1779          */
1780         addToFilter(queuedItem.getKey());
1781     }
1782
1783     if (deleted) {
1784         ++stats.totalPersisted;
1785         ++opsDelete;
1786     }
1787     doStatsForFlushing(queuedItem, queuedItem.size());
1788     --stats.diskQueueSize;
1789     decrMetaDataDisk(queuedItem);
1790 }
1791
1792 bool VBucket::deleteKey(const DocKey& key) {
1793     auto hbl = ht.getLockedBucket(key);
1794     StoredValue* v = ht.unlocked_find(
1795             key, hbl.getBucketNum(), WantsDeleted::Yes, TrackReference::No);
1796     if (!v) {
1797         return false;
1798     }
1799     return deleteStoredValue(hbl, *v);
1800 }
1801
1802 void VBucket::postProcessRollback(const RollbackResult& rollbackResult,
1803                                   uint64_t prevHighSeqno) {
1804     failovers->pruneEntries(rollbackResult.highSeqno);
1805     checkpointManager.clear(*this, rollbackResult.highSeqno);
1806     setPersistedSnapshot(rollbackResult.snapStartSeqno,
1807                          rollbackResult.snapEndSeqno);
1808     incrRollbackItemCount(prevHighSeqno - rollbackResult.highSeqno);
1809     setBackfillPhase(false);
1810 }
1811
1812 void VBucket::dump() const {
1813     std::cerr << "VBucket[" << this << "] with state: " << toString(getState())
1814               << " numItems:" << getNumItems()
1815               << " numNonResident:" << getNumNonResidentItems() << std::endl;
1816 }
1817
1818 void VBucket::_addStats(bool details, ADD_STAT add_stat, const void* c) {
1819     addStat(NULL, toString(state), add_stat, c);
1820     if (details) {
1821         size_t numItems = getNumItems();
1822         size_t tempItems = getNumTempItems();
1823         addStat("num_items", numItems, add_stat, c);
1824         addStat("num_temp_items", tempItems, add_stat, c);
1825         addStat("num_non_resident", getNumNonResidentItems(), add_stat, c);
1826         addStat("ht_memory", ht.memorySize(), add_stat, c);
1827         addStat("ht_item_memory", ht.getItemMemory(), add_stat, c);
1828         addStat("ht_cache_size", ht.cacheSize.load(), add_stat, c);
1829         addStat("num_ejects", ht.getNumEjects(), add_stat, c);
1830         addStat("ops_create", opsCreate.load(), add_stat, c);
1831         addStat("ops_update", opsUpdate.load(), add_stat, c);
1832         addStat("ops_delete", opsDelete.load(), add_stat, c);
1833         addStat("ops_reject", opsReject.load(), add_stat, c);
1834         addStat("queue_size", dirtyQueueSize.load(), add_stat, c);
1835         addStat("backfill_queue_size", getBackfillSize(), add_stat, c);
1836         addStat("queue_memory", dirtyQueueMem.load(), add_stat, c);
1837         addStat("queue_fill", dirtyQueueFill.load(), add_stat, c);
1838         addStat("queue_drain", dirtyQueueDrain.load(), add_stat, c);
1839         addStat("queue_age", getQueueAge(), add_stat, c);
1840         addStat("pending_writes", dirtyQueuePendingWrites.load(), add_stat, c);
1841
1842         addStat("high_seqno", getHighSeqno(), add_stat, c);
1843         addStat("uuid", failovers->getLatestUUID(), add_stat, c);
1844         addStat("purge_seqno", getPurgeSeqno(), add_stat, c);
1845         addStat("bloom_filter", getFilterStatusString().data(),
1846                 add_stat, c);
1847         addStat("bloom_filter_size", getFilterSize(), add_stat, c);
1848         addStat("bloom_filter_key_count", getNumOfKeysInFilter(), add_stat, c);
1849         addStat("rollback_item_count", getRollbackItemCount(), add_stat, c);
1850         addStat("hp_vb_req_size", getHighPriorityChkSize(), add_stat, c);
1851         hlc.addStats(statPrefix, add_stat, c);
1852     }
1853 }
1854
1855 void VBucket::decrDirtyQueueMem(size_t decrementBy)
1856 {
1857     size_t oldVal, newVal;
1858     do {
1859         oldVal = dirtyQueueMem.load(std::memory_order_relaxed);
1860         if (oldVal < decrementBy) {
1861             newVal = 0;
1862         } else {
1863             newVal = oldVal - decrementBy;
1864         }
1865     } while (!dirtyQueueMem.compare_exchange_strong(oldVal, newVal));
1866 }
1867
1868 void VBucket::decrDirtyQueueAge(uint32_t decrementBy)
1869 {
1870     uint64_t oldVal, newVal;
1871     do {
1872         oldVal = dirtyQueueAge.load(std::memory_order_relaxed);
1873         if (oldVal < decrementBy) {
1874             newVal = 0;
1875         } else {
1876             newVal = oldVal - decrementBy;
1877         }
1878     } while (!dirtyQueueAge.compare_exchange_strong(oldVal, newVal));
1879 }
1880
1881 void VBucket::decrDirtyQueuePendingWrites(size_t decrementBy)
1882 {
1883     size_t oldVal, newVal;
1884     do {
1885         oldVal = dirtyQueuePendingWrites.load(std::memory_order_relaxed);
1886         if (oldVal < decrementBy) {
1887             newVal = 0;
1888         } else {
1889             newVal = oldVal - decrementBy;
1890         }
1891     } while (!dirtyQueuePendingWrites.compare_exchange_strong(oldVal, newVal));
1892 }
1893
/*
 * Core of set / replace, and of a CAS'd delete-with-value when `itm` is
 * itself deleted. Decides whether `itm` may be written over the hash-table
 * entry `v` and, if so, writes it.
 *
 * @param hbl            Hash bucket lock; must be held, otherwise
 *                       std::invalid_argument is thrown.
 * @param v              In/out. The existing StoredValue for the key, or
 *                       nullptr. After a write it points at the StoredValue
 *                       now holding `itm`.
 * @param itm            Item to store. When hasMetaData is false its rev
 *                       seqno is assigned here.
 * @param cas            Expected CAS; 0 means no CAS check.
 * @param allowExisting  If false, an existing non-temp value makes the
 *                       operation fail with InvalidCas.
 * @param hasMetaData    When true, `itm`'s rev seqno is kept as-is and the
 *                       expired-item check below is skipped (presumably the
 *                       caller supplied full metadata, e.g. setWithMeta).
 * @param queueItmCtx    Forwarded to updateStoredValue() / addNewStoredValue().
 * @param maybeKeyExists Under full eviction with a CAS, whether the key may
 *                       exist on disk; if so and `v` is not usable, a bg
 *                       fetch is requested.
 * @param isReplication  Forwarded to StoredValue::hasAvailableSpace().
 * @return The MutationStatus and the notification context of any queued item.
 */
std::pair<MutationStatus, VBNotifyCtx> VBucket::processSet(
        const HashTable::HashBucketLock& hbl,
        StoredValue*& v,
        Item& itm,
        uint64_t cas,
        bool allowExisting,
        bool hasMetaData,
        const VBQueueItemCtx* queueItmCtx,
        bool maybeKeyExists,
        bool isReplication) {
    if (!hbl.getHTLock()) {
        throw std::invalid_argument(
                "VBucket::processSet: htLock not held for "
                "VBucket " +
                std::to_string(getId()));
    }

    // Refuse the write up-front if there is no memory available for it.
    if (!StoredValue::hasAvailableSpace(stats, itm, isReplication)) {
        return {MutationStatus::NoMem, VBNotifyCtx()};
    }

    // Full eviction + CAS: the CAS can only be checked against a real value.
    // If the value is absent or only a temp-initial placeholder, and the key
    // may exist on disk, ask the caller to bg-fetch first.
    if (cas && eviction == FULL_EVICTION && maybeKeyExists) {
        if (!v || v->isTempInitialItem()) {
            return {MutationStatus::NeedBgFetch, VBNotifyCtx()};
        }
    }

    /*
     * prior to checking for the lock, we should check if this object
     * has expired. If so, then check if CAS value has been provided
     * for this set op. In this case the operation should be denied since
     * a cas operation for a key that doesn't exist is not a very cool
     * thing to do. See MB 3252
     */
    if (v && v->isExpired(ep_real_time()) && !hasMetaData) {
        if (v->isLocked(ep_current_time())) {
            v->unlock();
        }
        if (cas) {
            /* item has expired and cas value provided. Deny ! */
            return {MutationStatus::NotFound, VBNotifyCtx()};
        }
    }

    if (v) {
        // Temp items never block the write, even when allowExisting is false.
        if (!allowExisting && !v->isTempItem()) {
            return {MutationStatus::InvalidCas, VBNotifyCtx()};
        }
        if (v->isLocked(ep_current_time())) {
            /*
             * item is locked, deny if there is cas value mismatch
             * or no cas value is provided by the user
             */
            if (cas != v->getCas()) {
                return {MutationStatus::IsLocked, VBNotifyCtx()};
            }
            /* allow operation*/
            v->unlock();
        } else if (cas && cas != v->getCas()) {
            if (v->isTempNonExistentItem()) {
                // This is a temporary item which marks a key as non-existent;
                // therefore specifying a non-matching CAS should be exposed
                // as item not existing.
                return {MutationStatus::NotFound, VBNotifyCtx()};
            }
            if ((v->isTempDeletedItem() || v->isDeleted()) && !itm.isDeleted()) {
                // Existing item is deleted, and we are not replacing it with
                // a (different) deleted value - return not existing.
                return {MutationStatus::NotFound, VBNotifyCtx()};
            }
            // None of the above special cases; the existing item cannot be
            // modified with the specified CAS.
            return {MutationStatus::InvalidCas, VBNotifyCtx()};
        }
        if (!hasMetaData) {
            // The new revision follows on from the existing one.
            itm.setRevSeqno(v->getRevSeqno() + 1);
            /* MB-23530: We must ensure that a replace operation (i.e.
             * set with a CAS) /fails/ if the old document is deleted; it
             * logically "doesn't exist". However, if the new value is deleted
             * this op is a /delete/ with a CAS and we must permit a
             * deleted -> deleted transition for Deleted Bodies.
             */
            if (cas && (v->isDeleted() || v->isTempDeletedItem()) &&
                !itm.isDeleted()) {
                return {MutationStatus::NotFound, VBNotifyCtx()};
            }
        }

        // All checks passed: overwrite the existing StoredValue (v may be
        // re-pointed by updateStoredValue).
        MutationStatus status;
        VBNotifyCtx notifyCtx;
        std::tie(v, status, notifyCtx) =
                updateStoredValue(hbl, *v, itm, queueItmCtx);
        return {status, notifyCtx};
    } else if (cas != 0) {
        // No value in memory: a CAS against a missing key cannot succeed.
        return {MutationStatus::NotFound, VBNotifyCtx()};
    } else {
        // Brand-new key: insert it and, unless the caller supplied metadata,
        // derive its rev seqno via updateRevSeqNoOfNewStoredValue().
        VBNotifyCtx notifyCtx;
        std::tie(v, notifyCtx) = addNewStoredValue(hbl, itm, queueItmCtx);
        if (!hasMetaData) {
            updateRevSeqNoOfNewStoredValue(*v);
            itm.setRevSeqno(v->getRevSeqno());
        }
        return {MutationStatus::WasClean, notifyCtx};
    }
}
1999
/*
 * Core of an add: store `itm` only if there is no live value for its key.
 *
 * @param hbl            Hash bucket lock; must be held, otherwise
 *                       std::invalid_argument is thrown.
 * @param v              In/out. The existing StoredValue for the key, or
 *                       nullptr. After a write it points at the StoredValue
 *                       now holding `itm`.
 * @param itm            Item to add; its rev seqno is assigned here.
 * @param maybeKeyExists Whether the key may exist on disk; consulted only
 *                       under full eviction.
 * @param isReplication  Forwarded to StoredValue::hasAvailableSpace().
 * @param queueItmCtx    Forwarded to updateStoredValue() / addNewStoredValue()
 *                       (addTempStoredValue() passes nullptr).
 * @return One of Exists, NoMem, BgFetch, AddTmpAndBgFetch, UnDel or Success,
 *         plus the notification context of any queued item.
 */
std::pair<AddStatus, VBNotifyCtx> VBucket::processAdd(
        const HashTable::HashBucketLock& hbl,
        StoredValue*& v,
        Item& itm,
        bool maybeKeyExists,
        bool isReplication,
        const VBQueueItemCtx* queueItmCtx) {
    if (!hbl.getHTLock()) {
        throw std::invalid_argument(
                "VBucket::processAdd: htLock not held for "
                "VBucket " +
                std::to_string(getId()));
    }

    // A live value (not deleted, not expired, not temp) already exists.
    if (v && !v->isDeleted() && !v->isExpired(ep_real_time()) &&
        !v->isTempItem()) {
        return {AddStatus::Exists, VBNotifyCtx()};
    }
    if (!StoredValue::hasAvailableSpace(stats, itm, isReplication)) {
        return {AddStatus::NoMem, VBNotifyCtx()};
    }

    std::pair<AddStatus, VBNotifyCtx> rv = {AddStatus::Success, VBNotifyCtx()};

    if (v) {
        if (v->isTempInitialItem() && eviction == FULL_EVICTION &&
            maybeKeyExists) {
            // Need to figure out if an item exists on disk
            return {AddStatus::BgFetch, VBNotifyCtx()};
        }

        // Adding over a deleted or expired value is reported as UnDel.
        rv.first = (v->isDeleted() || v->isExpired(ep_real_time()))
                           ? AddStatus::UnDel
                           : AddStatus::Success;

        // Choose the new rev seqno. A temp-deleted item continues from its
        // own rev seqno; any other item starts above the highest deleted rev
        // seqno seen so far. A non-temp existing value then overrides both
        // and continues from its own rev seqno.
        if (v->isTempDeletedItem()) {
            itm.setRevSeqno(v->getRevSeqno() + 1);
        } else {
            itm.setRevSeqno(ht.getMaxDeletedRevSeqno() + 1);
        }

        if (!v->isTempItem()) {
            itm.setRevSeqno(v->getRevSeqno() + 1);
        }

        std::tie(v, std::ignore, rv.second) =
                updateStoredValue(hbl, *v, itm, queueItmCtx);
    } else {
        // Nothing in memory. A regular (non temp-init) item under full
        // eviction might still exist on disk, so the caller must add a temp
        // item and bg-fetch before the add can be decided.
        if (itm.getBySeqno() != StoredValue::state_temp_init) {
            if (eviction == FULL_EVICTION && maybeKeyExists) {
                return {AddStatus::AddTmpAndBgFetch, VBNotifyCtx()};
            }
        }
        std::tie(v, rv.second) = addNewStoredValue(hbl, itm, queueItmCtx);
        updateRevSeqNoOfNewStoredValue(*v);
        itm.setRevSeqno(v->getRevSeqno());
        // A temp item was just inserted (e.g. by addTempStoredValue()); its
        // real state still has to be fetched.
        if (v->isTempItem()) {
            rv.first = AddStatus::BgFetch;
        }
    }

    // Temp items are given the maximum NRU value.
    if (v->isTempItem()) {
        v->setNRUValue(MAX_NRU_VALUE);
    }
    return rv;
}
2066
/*
 * Delete the in-memory value `v` via softDeleteStoredValue() (with
 * onlyMarkDeleted = false), after the lock and CAS checks pass.
 *
 * @param hbl         Hash bucket lock, forwarded to softDeleteStoredValue().
 *                    NOTE(review): unlike processSet / processAdd /
 *                    processExpiredItem, this does not check that the lock is
 *                    held; callers presumably guarantee it.
 * @param v           The existing StoredValue to delete.
 * @param cas         Expected CAS; 0 means no CAS check. A locked value can
 *                    only be deleted with its exact CAS.
 * @param metadata    revSeqno is always applied; cas, flags and exptime are
 *                    applied only when use_meta is true.
 * @param queueItmCtx Forwarded to softDeleteStoredValue().
 * @param use_meta    Whether to copy cas / flags / exptime from `metadata`.
 * @param bySeqno     Forwarded to softDeleteStoredValue().
 * @return (status, StoredValue now representing the key, notify ctx). status
 *         is NeedBgFetch, IsLocked or InvalidCas on rejection. Otherwise it is
 *         WasDirty / WasClean, reflecting v's dirtiness *before* the delete.
 *         On success the returned pointer is the one produced by
 *         softDeleteStoredValue(), which may differ from &v.
 */
std::tuple<MutationStatus, StoredValue*, VBNotifyCtx>
VBucket::processSoftDelete(const HashTable::HashBucketLock& hbl,
                           StoredValue& v,
                           uint64_t cas,
                           const ItemMetaData& metadata,
                           const VBQueueItemCtx& queueItmCtx,
                           bool use_meta,
                           uint64_t bySeqno) {
    // Under full eviction the real state of a temp-initial item is unknown.
    if (v.isTempInitialItem() && eviction == FULL_EVICTION) {
        return std::make_tuple(MutationStatus::NeedBgFetch, &v, VBNotifyCtx());
    }

    if (v.isLocked(ep_current_time())) {
        if (cas != v.getCas()) {
            return std::make_tuple(MutationStatus::IsLocked, &v, VBNotifyCtx());
        }
        v.unlock();
    }

    if (cas != 0 && cas != v.getCas()) {
        return std::make_tuple(MutationStatus::InvalidCas, &v, VBNotifyCtx());
    }

    /* allow operation */
    // Also reached when isLocked() returned false above, so any remaining
    // lock state on v is cleared here as well.
    v.unlock();

    MutationStatus rv =
            v.isDirty() ? MutationStatus::WasDirty : MutationStatus::WasClean;

    if (use_meta) {
        v.setCas(metadata.cas);
        v.setFlags(metadata.flags);
        v.setExptime(metadata.exptime);
    }

    v.setRevSeqno(metadata.revSeqno);
    VBNotifyCtx notifyCtx;
    StoredValue* newSv;
    std::tie(newSv, notifyCtx) =
            softDeleteStoredValue(hbl,
                                  v,
                                  /*onlyMarkDeleted*/ false,
                                  queueItmCtx,
                                  bySeqno);
    ht.updateMaxDeletedRevSeqno(metadata.revSeqno);
    return std::make_tuple(rv, newSv, notifyCtx);
}
2114
/*
 * Expire the in-memory value `v`. Its rev seqno is bumped and it is
 * soft-deleted with generated seqno and CAS. XATTR values are only marked
 * deleted, so their value is retained.
 *
 * @param hbl Hash bucket lock; must be held, otherwise std::invalid_argument
 *            is thrown.
 * @param v   The expired StoredValue.
 * @return (NotFound, StoredValue now representing the key, notify ctx).
 *         Under full eviction a temp-initial `v` instead yields NeedBgFetch
 *         together with the context returned by queueDirty(v, ...).
 *         NOTE(review): that branch queues the temp-initial value itself
 *         into the checkpoint; confirm this is intended.
 */
std::tuple<MutationStatus, StoredValue*, VBNotifyCtx>
VBucket::processExpiredItem(const HashTable::HashBucketLock& hbl,
                            StoredValue& v) {
    if (!hbl.getHTLock()) {
        throw std::invalid_argument(
                "VBucket::processExpiredItem: htLock not held for VBucket " +
                std::to_string(getId()));
    }

    if (v.isTempInitialItem() && eviction == FULL_EVICTION) {
        return std::make_tuple(MutationStatus::NeedBgFetch,
                               &v,
                               queueDirty(v,
                                          GenerateBySeqno::Yes,
                                          GenerateCas::Yes,
                                          /*isBackfillItem*/ false));
    }

    /* If the datatype is XATTR, mark the item as deleted
     * but don't delete the value as system xattrs can
     * still be queried by mobile clients even after
     * deletion.
     * TODO: The current implementation is inefficient
     * but functionally correct and for performance reasons
     * only the system xattrs need to be stored.
     */
    value_t value = v.getValue();
    bool onlyMarkDeleted =
            value && mcbp::datatype::is_xattr(value->getDataType());
    // The expiry is a new revision of the document.
    v.setRevSeqno(v.getRevSeqno() + 1);
    VBNotifyCtx notifyCtx;
    StoredValue* newSv;
    std::tie(newSv, notifyCtx) =
            softDeleteStoredValue(hbl,
                                  v,
                                  onlyMarkDeleted,
                                  VBQueueItemCtx(GenerateBySeqno::Yes,
                                                 GenerateCas::Yes,
                                                 TrackCasDrift::No,
                                                 /*isBackfillItem*/ false,
                                                 nullptr /* no pre link */),
                                  v.getBySeqno());
    // NOTE(review): processSoftDelete() records the new rev seqno itself,
    // whereas this records the new rev seqno + 1. Confirm the extra
    // increment is intended.
    ht.updateMaxDeletedRevSeqno(newSv->getRevSeqno() + 1);
    return std::make_tuple(MutationStatus::NotFound, newSv, notifyCtx);
}
2160
2161 bool VBucket::deleteStoredValue(const HashTable::HashBucketLock& hbl,
2162                                 StoredValue& v) {
2163     if (!v.isDeleted() && v.isLocked(ep_current_time())) {
2164         return false;
2165     }
2166
2167     /* StoredValue deleted here. If any other in-memory data structures are
2168        using the StoredValue intrusively then they must have handled the delete
2169        by this point */
2170     ht.unlocked_del(hbl, v.getKey());
2171     return true;
2172 }
2173
2174 AddStatus VBucket::addTempStoredValue(const HashTable::HashBucketLock& hbl,
2175                                       const DocKey& key,
2176                                       bool isReplication) {
2177     uint8_t ext_meta[EXT_META_LEN] = {PROTOCOL_BINARY_RAW_BYTES};
2178     static_assert(sizeof(ext_meta) == 1,
2179                   "VBucket::addTempStoredValue(): expected "
2180                   "EXT_META_LEN to be 1");
2181     Item itm(key,
2182              /*flags*/ 0,
2183              /*exp*/ 0,
2184              /*data*/ NULL,
2185              /*size*/ 0,
2186              ext_meta,
2187              sizeof(ext_meta),
2188              0,
2189              StoredValue::state_temp_init);
2190
2191     /* if a temp item for a possibly deleted, set it non-resident by resetting
2192        the value cuz normally a new item added is considered resident which
2193        does not apply for temp item. */
2194     StoredValue* v = nullptr;
2195     return processAdd(hbl, v, itm, true, isReplication, nullptr).first;
2196 }
2197
2198 void VBucket::notifyNewSeqno(const VBNotifyCtx& notifyCtx) {
2199     if (newSeqnoCb) {
2200         newSeqnoCb->callback(getId(), notifyCtx);
2201     }
2202 }
2203
2204 /*
2205  * Queue the item to the checkpoint and return the seqno the item was
2206  * allocated.
2207  */
2208 int64_t VBucket::queueItem(Item* item, OptionalSeqno seqno) {
2209     item->setVBucketId(id);
2210     queued_item qi(item);
2211     checkpointManager.queueDirty(
2212             *this,
2213             qi,
2214             seqno ? GenerateBySeqno::No : GenerateBySeqno::Yes,
2215             GenerateCas::Yes,
2216             nullptr /* No pre link step as this is for system events */);
2217     VBNotifyCtx notifyCtx;
2218     // If the seqno is initialized, skip replication notification
2219     notifyCtx.notifyReplication = !seqno.is_initialized();
2220     notifyCtx.notifyFlusher = true;
2221     notifyCtx.bySeqno = qi->getBySeqno();
2222     notifyNewSeqno(notifyCtx);
2223     return qi->getBySeqno();
2224 }
2225
2226 VBNotifyCtx VBucket::queueDirty(StoredValue& v,
2227                                 const VBQueueItemCtx& queueItmCtx) {
2228     if (queueItmCtx.trackCasDrift == TrackCasDrift::Yes) {
2229         setMaxCasAndTrackDrift(v.getCas());
2230     }
2231     return queueDirty(v,
2232                       queueItmCtx.genBySeqno,
2233                       queueItmCtx.genCas,
2234                       queueItmCtx.isBackfillItem,
2235                       queueItmCtx.preLinkDocumentContext);
2236 }
2237
2238 void VBucket::updateRevSeqNoOfNewStoredValue(StoredValue& v) {
2239     /**
2240      * Possibly, this item is being recreated. Conservatively assign it
2241      * a seqno that is greater than the greatest seqno of all deleted
2242      * items seen so far.
2243      */
2244     uint64_t seqno = ht.getMaxDeletedRevSeqno();
2245     if (!v.isTempItem()) {
2246         ++seqno;
2247     }
2248     v.setRevSeqno(seqno);
2249 }
2250
2251 void VBucket::addHighPriorityVBEntry(uint64_t seqnoOrChkId,
2252                                      const void* cookie,
2253                                      HighPriorityVBNotify reqType) {
2254     std::unique_lock<std::mutex> lh(hpVBReqsMutex);
2255     hpVBReqs.push_back(HighPriorityVBEntry(cookie, seqnoOrChkId, reqType));
2256     numHpVBReqs.store(hpVBReqs.size());
2257
2258     LOG(EXTENSION_LOG_NOTICE,
2259         "Added high priority async request %s "
2260         "for vb:%" PRIu16 ", Check for:%" PRIu64 ", "
2261         "Persisted upto:%" PRIu64 ", cookie:%p",
2262         to_string(reqType).c_str(),
2263         getId(),
2264         seqnoOrChkId,
2265         getPersistenceSeqno(),
2266         cookie);
2267 }
2268
2269 std::map<const void*, ENGINE_ERROR_CODE> VBucket::getHighPriorityNotifications(
2270         EventuallyPersistentEngine& engine,
2271         uint64_t idNum,
2272         HighPriorityVBNotify notifyType) {
2273     std::unique_lock<std::mutex> lh(hpVBReqsMutex);
2274     std::map<const void*, ENGINE_ERROR_CODE> toNotify;
2275
2276     auto entry = hpVBReqs.begin();
2277
2278     while (entry != hpVBReqs.end()) {
2279         if (notifyType != entry->reqType) {
2280             ++entry;
2281             continue;
2282         }
2283
2284         std::string logStr(to_string(notifyType));
2285
2286         hrtime_t wall_time(gethrtime() - entry->start);
2287         size_t spent = wall_time / 1000000000;
2288         if (entry->id <= idNum) {
2289             toNotify[entry->cookie] = ENGINE_SUCCESS;
2290             stats.chkPersistenceHisto.add(wall_time / 1000);
2291             adjustCheckpointFlushTimeout(wall_time / 1000000000);
2292             LOG(EXTENSION_LOG_NOTICE,
2293                 "Notified the completion of %s "
2294                 "for vbucket %" PRIu16 ", Check for: %" PRIu64
2295                 ", "
2296                 "Persisted upto: %" PRIu64 ", cookie %p",
2297                 logStr.c_str(),
2298                 getId(),
2299                 entry->id,
2300                 idNum,
2301                 entry->cookie);
2302             entry = hpVBReqs.erase(entry);
2303         } else if (spent > getCheckpointFlushTimeout()) {
2304             adjustCheckpointFlushTimeout(spent);
2305             engine.storeEngineSpecific(entry->cookie, NULL);
2306             toNotify[entry->cookie] = ENGINE_TMPFAIL;
2307             LOG(EXTENSION_LOG_WARNING,
2308                 "Notified the timeout on %s "
2309                 "for vbucket %" PRIu16 ", Check for: %" PRIu64
2310                 ", "
2311                 "Persisted upto: %" PRIu64 ", cookie %p",
2312                 logStr.c_str(),
2313                 getId(),
2314                 entry->id,
2315                 idNum,
2316                 entry->cookie);
2317             entry = hpVBReqs.erase(entry);
2318         } else {
2319             ++entry;
2320         }
2321     }
2322     numHpVBReqs.store(hpVBReqs.size());
2323     return toNotify;
2324 }
2325
2326 std::map<const void*, ENGINE_ERROR_CODE> VBucket::tmpFailAndGetAllHpNotifies(
2327         EventuallyPersistentEngine& engine) {
2328     std::map<const void*, ENGINE_ERROR_CODE> toNotify;
2329
2330     LockHolder lh(hpVBReqsMutex);
2331
2332     for (auto& entry : hpVBReqs) {
2333         toNotify[entry.cookie] = ENGINE_TMPFAIL;
2334         engine.storeEngineSpecific(entry.cookie, NULL);
2335     }
2336     hpVBReqs.clear();
2337
2338     return toNotify;
2339 }
2340
2341 void VBucket::adjustCheckpointFlushTimeout(size_t wall_time) {
2342     size_t middle = (MIN_CHK_FLUSH_TIMEOUT + MAX_CHK_FLUSH_TIMEOUT) / 2;
2343
2344     if (wall_time <= MIN_CHK_FLUSH_TIMEOUT) {
2345         chkFlushTimeout = MIN_CHK_FLUSH_TIMEOUT;
2346     } else if (wall_time <= middle) {
2347         chkFlushTimeout = middle;
2348     } else {
2349         chkFlushTimeout = MAX_CHK_FLUSH_TIMEOUT;
2350     }
2351 }
2352
2353 size_t VBucket::getCheckpointFlushTimeout() {
2354     return chkFlushTimeout;
2355 }