Decouple StoredValue and HashTable class
[ep-engine.git] / src / vbucket.cc
1 /* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2 /*
3  *     Copyright 2015 Couchbase, Inc
4  *
5  *   Licensed under the Apache License, Version 2.0 (the "License");
6  *   you may not use this file except in compliance with the License.
7  *   You may obtain a copy of the License at
8  *
9  *       http://www.apache.org/licenses/LICENSE-2.0
10  *
11  *   Unless required by applicable law or agreed to in writing, software
12  *   distributed under the License is distributed on an "AS IS" BASIS,
13  *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  *   See the License for the specific language governing permissions and
15  *   limitations under the License.
16  */
17
18 #include "config.h"
19
20 #include <functional>
21 #include <list>
22 #include <set>
23 #include <string>
24 #include <vector>
25
26 #include "atomic.h"
27 #include "bgfetcher.h"
28 #include "conflict_resolution.h"
29 #include "ep_engine.h"
30 #include "ep_types.h"
31 #include "failover-table.h"
32 #include "flusher.h"
33 #include "pre_link_document_context.h"
34 #include "vbucketdeletiontask.h"
35
36 #define STATWRITER_NAMESPACE vbucket
37 #include "statwriter.h"
38 #undef STATWRITER_NAMESPACE
39 #include "stored_value_factories.h"
40
41 #include "vbucket.h"
42
43 #include <xattr/blob.h>
44 #include <xattr/utils.h>
45
46 /* Macros */
47 const size_t MIN_CHK_FLUSH_TIMEOUT = 10; // 10 sec.
48 const size_t MAX_CHK_FLUSH_TIMEOUT = 30; // 30 sec.
49
50 /* Statics definitions */
51 std::atomic<size_t> VBucket::chkFlushTimeout(MIN_CHK_FLUSH_TIMEOUT);
52
53 VBucketFilter VBucketFilter::filter_diff(const VBucketFilter &other) const {
54     std::vector<uint16_t> tmp(acceptable.size() + other.size());
55     std::vector<uint16_t>::iterator end;
56     end = std::set_symmetric_difference(acceptable.begin(),
57                                         acceptable.end(),
58                                         other.acceptable.begin(),
59                                         other.acceptable.end(),
60                                         tmp.begin());
61     return VBucketFilter(std::vector<uint16_t>(tmp.begin(), end));
62 }
63
64 VBucketFilter VBucketFilter::filter_intersection(const VBucketFilter &other)
65                                                                         const {
66     std::vector<uint16_t> tmp(acceptable.size() + other.size());
67     std::vector<uint16_t>::iterator end;
68
69     end = std::set_intersection(acceptable.begin(), acceptable.end(),
70                                 other.acceptable.begin(),
71                                 other.acceptable.end(),
72                                 tmp.begin());
73     return VBucketFilter(std::vector<uint16_t>(tmp.begin(), end));
74 }
75
/**
 * Determines whether the elements starting at `it` form a run of at least
 * three consecutive values (e.g. 4,5,6).
 *
 * @param it     first element of the candidate run
 * @param end    end of the containing set
 * @param length out: number of steps from the first to the last element of
 *               the consecutive run (run size - 1); 0 if `it == end`
 * @return true if the run contains three or more consecutive values
 */
static bool isRange(std::set<uint16_t>::const_iterator it,
                    const std::set<uint16_t>::const_iterator &end,
                    size_t &length)
{
    length = 0;
    // Guard against dereferencing `end` when given an empty range.
    if (it == end) {
        return false;
    }

    const uint16_t first = *it;
    size_t count = 0;
    while (it != end && (first + count) == *it) {
        ++it;
        ++count;
    }

    // count >= 1 here because the first element always matches itself.
    length = count - 1;
    return length > 1;
}
91
92 std::ostream& operator <<(std::ostream &out, const VBucketFilter &filter)
93 {
94     std::set<uint16_t>::const_iterator it;
95
96     if (filter.acceptable.empty()) {
97         out << "{ empty }";
98     } else {
99         bool needcomma = false;
100         out << "{ ";
101         for (it = filter.acceptable.begin();
102              it != filter.acceptable.end();
103              ++it) {
104             if (needcomma) {
105                 out << ", ";
106             }
107
108             size_t length;
109             if (isRange(it, filter.acceptable.end(), length)) {
110                 std::set<uint16_t>::iterator last = it;
111                 for (size_t i = 0; i < length; ++i) {
112                     ++last;
113                 }
114                 out << "[" << *it << "," << *last << "]";
115                 it = last;
116             } else {
117                 out << *it;
118             }
119             needcomma = true;
120         }
121         out << " }";
122     }
123
124     return out;
125 }
126
127 const vbucket_state_t VBucket::ACTIVE =
128                      static_cast<vbucket_state_t>(htonl(vbucket_state_active));
129 const vbucket_state_t VBucket::REPLICA =
130                     static_cast<vbucket_state_t>(htonl(vbucket_state_replica));
131 const vbucket_state_t VBucket::PENDING =
132                     static_cast<vbucket_state_t>(htonl(vbucket_state_pending));
133 const vbucket_state_t VBucket::DEAD =
134                     static_cast<vbucket_state_t>(htonl(vbucket_state_dead));
135
136 VBucket::VBucket(id_type i,
137                  vbucket_state_t newState,
138                  EPStats& st,
139                  CheckpointConfig& chkConfig,
140                  int64_t lastSeqno,
141                  uint64_t lastSnapStart,
142                  uint64_t lastSnapEnd,
143                  std::unique_ptr<FailoverTable> table,
144                  std::shared_ptr<Callback<id_type> > flusherCb,
145                  std::unique_ptr<AbstractStoredValueFactory> valFact,
146                  NewSeqnoCallback newSeqnoCb,
147                  Configuration& config,
148                  item_eviction_policy_t evictionPolicy,
149                  vbucket_state_t initState,
150                  uint64_t purgeSeqno,
151                  uint64_t maxCas,
152                  const std::string& collectionsManifest)
153     : ht(st, std::move(valFact), config.getHtSize(), config.getHtLocks()),
154       checkpointManager(st,
155                         i,
156                         chkConfig,
157                         lastSeqno,
158                         lastSnapStart,
159                         lastSnapEnd,
160                         flusherCb),
161       failovers(std::move(table)),
162       opsCreate(0),
163       opsUpdate(0),
164       opsDelete(0),
165       opsReject(0),
166       dirtyQueueSize(0),
167       dirtyQueueMem(0),
168       dirtyQueueFill(0),
169       dirtyQueueDrain(0),
170       dirtyQueueAge(0),
171       dirtyQueuePendingWrites(0),
172       metaDataDisk(0),
173       numExpiredItems(0),
174       eviction(evictionPolicy),
175       stats(st),
176       persistenceSeqno(0),
177       numHpVBReqs(0),
178       id(i),
179       state(newState),
180       initialState(initState),
181       purge_seqno(purgeSeqno),
182       takeover_backed_up(false),
183       persisted_snapshot_start(lastSnapStart),
184       persisted_snapshot_end(lastSnapEnd),
185       rollbackItemCount(0),
186       hlc(maxCas,
187           std::chrono::microseconds(config.getHlcDriftAheadThresholdUs()),
188           std::chrono::microseconds(config.getHlcDriftBehindThresholdUs())),
189       statPrefix("vb_" + std::to_string(i)),
190       persistenceCheckpointId(0),
191       bucketCreation(false),
192       deferredDeletion(false),
193       deferredDeletionCookie(nullptr),
194       newSeqnoCb(std::move(newSeqnoCb)),
195       manifest(collectionsManifest) {
196     if (config.getConflictResolutionType().compare("lww") == 0) {
197         conflictResolver.reset(new LastWriteWinsResolution());
198     } else {
199         conflictResolver.reset(new RevisionSeqnoResolution());
200     }
201
202     backfill.isBackfillPhase = false;
203     pendingOpsStart = 0;
204     stats.memOverhead->fetch_add(sizeof(VBucket)
205                                 + ht.memorySize() + sizeof(CheckpointManager));
206     LOG(EXTENSION_LOG_NOTICE,
207         "VBucket: created vbucket:%" PRIu16 " with state:%s "
208                 "initialState:%s "
209                 "lastSeqno:%" PRIu64 " "
210                 "lastSnapshot:{%" PRIu64 ",%" PRIu64 "} "
211                 "persisted_snapshot:{%" PRIu64 ",%" PRIu64 "} "
212                 "max_cas:%" PRIu64,
213         id, VBucket::toString(state), VBucket::toString(initialState),
214         lastSeqno, lastSnapStart, lastSnapEnd,
215         persisted_snapshot_start, persisted_snapshot_end,
216         getMaxCas());
217 }
218
219 VBucket::~VBucket() {
220     if (!pendingOps.empty()) {
221         LOG(EXTENSION_LOG_WARNING,
222             "~Vbucket(): vbucket:%" PRIu16 " has %ld pending ops",
223             id,
224             pendingOps.size());
225     }
226
227     stats.diskQueueSize.fetch_sub(dirtyQueueSize.load());
228     stats.vbBackfillQueueSize.fetch_sub(getBackfillSize());
229
230     // Clear out the bloomfilter(s)
231     clearFilter();
232
233     stats.memOverhead->fetch_sub(sizeof(VBucket) + ht.memorySize() +
234                                 sizeof(CheckpointManager));
235
236     LOG(EXTENSION_LOG_INFO, "Destroying vbucket %d\n", id);
237 }
238
239 void VBucket::fireAllOps(EventuallyPersistentEngine &engine,
240                          ENGINE_ERROR_CODE code) {
241     std::unique_lock<std::mutex> lh(pendingOpLock);
242
243     if (pendingOpsStart > 0) {
244         hrtime_t now = gethrtime();
245         if (now > pendingOpsStart) {
246             hrtime_t d = (now - pendingOpsStart) / 1000;
247             stats.pendingOpsHisto.add(d);
248             atomic_setIfBigger(stats.pendingOpsMaxDuration, d);
249         }
250     } else {
251         return;
252     }
253
254     pendingOpsStart = 0;
255     stats.pendingOps.fetch_sub(pendingOps.size());
256     atomic_setIfBigger(stats.pendingOpsMax, pendingOps.size());
257
258     while (!pendingOps.empty()) {
259         const void *pendingOperation = pendingOps.back();
260         pendingOps.pop_back();
261         // We don't want to hold the pendingOpLock when
262         // calling notifyIOComplete.
263         lh.unlock();
264         engine.notifyIOComplete(pendingOperation, code);
265         lh.lock();
266     }
267
268     LOG(EXTENSION_LOG_INFO,
269         "Fired pendings ops for vbucket %" PRIu16 " in state %s\n",
270         id, VBucket::toString(state));
271 }
272
273 void VBucket::fireAllOps(EventuallyPersistentEngine &engine) {
274
275     if (state == vbucket_state_active) {
276         fireAllOps(engine, ENGINE_SUCCESS);
277     } else if (state == vbucket_state_pending) {
278         // Nothing
279     } else {
280         fireAllOps(engine, ENGINE_NOT_MY_VBUCKET);
281     }
282 }
283
284 void VBucket::setState(vbucket_state_t to) {
285     vbucket_state_t oldstate;
286     {
287         WriterLockHolder wlh(stateLock);
288         oldstate = state;
289
290         if (to == vbucket_state_active &&
291             checkpointManager.getOpenCheckpointId() < 2) {
292             checkpointManager.setOpenCheckpointId(2);
293         }
294
295         LOG(EXTENSION_LOG_NOTICE,
296             "VBucket::setState: transitioning vbucket:%" PRIu16 " from:%s to:%s",
297             id, VBucket::toString(oldstate), VBucket::toString(to));
298
299         state = to;
300     }
301 }
302
303 vbucket_state VBucket::getVBucketState() const {
304      auto persisted_range = getPersistedSnapshot();
305
306      return vbucket_state{getState(),
307                           getPersistenceCheckpointId(), 0, getHighSeqno(),
308                           getPurgeSeqno(),
309                           persisted_range.start, persisted_range.end,
310                           getMaxCas(), failovers->toJSON()};
311 }
312
313
314
315 void VBucket::doStatsForQueueing(const Item& qi, size_t itemBytes)
316 {
317     ++dirtyQueueSize;
318     dirtyQueueMem.fetch_add(sizeof(Item));
319     ++dirtyQueueFill;
320     dirtyQueueAge.fetch_add(qi.getQueuedTime());
321     dirtyQueuePendingWrites.fetch_add(itemBytes);
322 }
323
324 void VBucket::doStatsForFlushing(const Item& qi, size_t itemBytes) {
325     --dirtyQueueSize;
326     decrDirtyQueueMem(sizeof(Item));
327     ++dirtyQueueDrain;
328     decrDirtyQueueAge(qi.getQueuedTime());
329     decrDirtyQueuePendingWrites(itemBytes);
330 }
331
332 void VBucket::incrMetaDataDisk(const Item& qi) {
333     metaDataDisk.fetch_add(qi.getKey().size() + sizeof(ItemMetaData));
334 }
335
336 void VBucket::decrMetaDataDisk(const Item& qi) {
337     // assume couchstore remove approx this much data from disk
338     metaDataDisk.fetch_sub((qi.getKey().size() + sizeof(ItemMetaData)));
339 }
340
341 void VBucket::resetStats() {
342     opsCreate.store(0);
343     opsUpdate.store(0);
344     opsDelete.store(0);
345     opsReject.store(0);
346
347     stats.diskQueueSize.fetch_sub(dirtyQueueSize.exchange(0));
348     dirtyQueueMem.store(0);
349     dirtyQueueFill.store(0);
350     dirtyQueueAge.store(0);
351     dirtyQueuePendingWrites.store(0);
352     dirtyQueueDrain.store(0);
353
354     hlc.resetStats();
355 }
356
357 template <typename T>
358 void VBucket::addStat(const char *nm, const T &val, ADD_STAT add_stat,
359                       const void *c) {
360     std::string stat = statPrefix;
361     if (nm != NULL) {
362         add_prefixed_stat(statPrefix, nm, val, add_stat, c);
363     } else {
364         add_casted_stat(statPrefix.data(), val, add_stat, c);
365     }
366 }
367
368 void VBucket::handlePreExpiry(StoredValue& v) {
369     value_t value = v.getValue();
370     if (value) {
371         std::unique_ptr<Item> itm(v.toItem(false, id));
372         item_info itm_info;
373         EventuallyPersistentEngine* engine = ObjectRegistry::getCurrentEngine();
374         itm_info = itm->toItemInfo(failovers->getLatestUUID());
375         value_t new_val(Blob::Copy(*value));
376         itm->setValue(new_val);
377
378         SERVER_HANDLE_V1* sapi = engine->getServerApi();
379         /* TODO: In order to minimize allocations, the callback needs to
380          * allocate an item whose value size will be exactly the size of the
381          * value after pre-expiry is performed.
382          */
383         if (sapi->document->pre_expiry(itm_info)) {
384             char* extMeta = const_cast<char *>(v.getValue()->getExtMeta());
385             Item new_item(v.getKey(), v.getFlags(), v.getExptime(),
386                           itm_info.value[0].iov_base, itm_info.value[0].iov_len,
387                           reinterpret_cast<uint8_t*>(extMeta),
388                           v.getValue()->getExtLen(), v.getCas(),
389                           v.getBySeqno(), id, v.getRevSeqno(),
390                           v.getNRUValue());
391
392             new_item.setDeleted();
393             ht.setValue(new_item, v);
394         }
395     }
396 }
397
398 size_t VBucket::getNumNonResidentItems() const {
399     if (eviction == VALUE_ONLY) {
400         return ht.getNumInMemoryNonResItems();
401     } else {
402         size_t num_items = ht.getNumItems();
403         size_t num_res_items = ht.getNumInMemoryItems() -
404                                ht.getNumInMemoryNonResItems();
405         return num_items > num_res_items ? (num_items - num_res_items) : 0;
406     }
407 }
408
409
410 uint64_t VBucket::getPersistenceCheckpointId() const {
411     return persistenceCheckpointId.load();
412 }
413
414 void VBucket::setPersistenceCheckpointId(uint64_t checkpointId) {
415     persistenceCheckpointId.store(checkpointId);
416 }
417
418 void VBucket::markDirty(const DocKey& key) {
419     auto hbl = ht.getLockedBucket(key);
420     StoredValue* v = ht.unlocked_find(
421             key, hbl.getBucketNum(), WantsDeleted::Yes, TrackReference::Yes);
422     if (v) {
423         v->markDirty();
424     } else {
425         LOG(EXTENSION_LOG_WARNING, "markDirty: Error marking dirty, a key "
426             "is missing from vb:%" PRIu16, id);
427     }
428 }
429
430 bool VBucket::isResidentRatioUnderThreshold(float threshold) {
431     if (eviction != FULL_EVICTION) {
432         throw std::invalid_argument("VBucket::isResidentRatioUnderThreshold: "
433                 "policy (which is " + std::to_string(eviction) +
434                 ") must be FULL_EVICTION");
435     }
436     size_t num_items = getNumItems();
437     size_t num_non_resident_items = getNumNonResidentItems();
438     if (threshold >= ((float)(num_items - num_non_resident_items) /
439                                                                 num_items)) {
440         return true;
441     } else {
442         return false;
443     }
444 }
445
446 void VBucket::createFilter(size_t key_count, double probability) {
447     // Create the actual bloom filter upon vbucket creation during
448     // scenarios:
449     //      - Bucket creation
450     //      - Rebalance
451     LockHolder lh(bfMutex);
452     if (bFilter == nullptr && tempFilter == nullptr) {
453         bFilter = std::make_unique<BloomFilter>(key_count, probability,
454                                         BFILTER_ENABLED);
455     } else {
456         LOG(EXTENSION_LOG_WARNING, "(vb %" PRIu16 ") Bloom filter / Temp filter"
457             " already exist!", id);
458     }
459 }
460
461 void VBucket::initTempFilter(size_t key_count, double probability) {
462     // Create a temp bloom filter with status as COMPACTING,
463     // if the main filter is found to exist, set its state to
464     // COMPACTING as well.
465     LockHolder lh(bfMutex);
466     tempFilter = std::make_unique<BloomFilter>(key_count, probability,
467                                      BFILTER_COMPACTING);
468     if (bFilter) {
469         bFilter->setStatus(BFILTER_COMPACTING);
470     }
471 }
472
473 void VBucket::addToFilter(const DocKey& key) {
474     LockHolder lh(bfMutex);
475     if (bFilter) {
476         bFilter->addKey(key);
477     }
478
479     // If the temp bloom filter is not found to be NULL,
480     // it means that compaction is running on the particular
481     // vbucket. Therefore add the key to the temp filter as
482     // well, as once compaction completes the temp filter
483     // will replace the main bloom filter.
484     if (tempFilter) {
485         tempFilter->addKey(key);
486     }
487 }
488
489 bool VBucket::maybeKeyExistsInFilter(const DocKey& key) {
490     LockHolder lh(bfMutex);
491     if (bFilter) {
492         return bFilter->maybeKeyExists(key);
493     } else {
494         // If filter doesn't exist, allow the BgFetch to go through.
495         return true;
496     }
497 }
498
499 bool VBucket::isTempFilterAvailable() {
500     LockHolder lh(bfMutex);
501     if (tempFilter &&
502         (tempFilter->getStatus() == BFILTER_COMPACTING ||
503          tempFilter->getStatus() == BFILTER_ENABLED)) {
504         return true;
505     } else {
506         return false;
507     }
508 }
509
510 void VBucket::addToTempFilter(const DocKey& key) {
511     // Keys will be added to only the temp filter during
512     // compaction.
513     LockHolder lh(bfMutex);
514     if (tempFilter) {
515         tempFilter->addKey(key);
516     }
517 }
518
519 void VBucket::swapFilter() {
520     // Delete the main bloom filter and replace it with
521     // the temp filter that was populated during compaction,
522     // only if the temp filter's state is found to be either at
523     // COMPACTING or ENABLED (if in the case the user enables
524     // bloomfilters for some reason while compaction was running).
525     // Otherwise, it indicates that the filter's state was
526     // possibly disabled during compaction, therefore clear out
527     // the temp filter. If it gets enabled at some point, a new
528     // bloom filter will be made available after the next
529     // compaction.
530
531     LockHolder lh(bfMutex);
532     if (tempFilter) {
533         bFilter.reset();
534
535         if (tempFilter->getStatus() == BFILTER_COMPACTING ||
536              tempFilter->getStatus() == BFILTER_ENABLED) {
537             bFilter = std::move(tempFilter);
538             bFilter->setStatus(BFILTER_ENABLED);
539         }
540         tempFilter.reset();
541     }
542 }
543
544 void VBucket::clearFilter() {
545     LockHolder lh(bfMutex);
546     bFilter.reset();
547     tempFilter.reset();
548 }
549
550 void VBucket::setFilterStatus(bfilter_status_t to) {
551     LockHolder lh(bfMutex);
552     if (bFilter) {
553         bFilter->setStatus(to);
554     }
555     if (tempFilter) {
556         tempFilter->setStatus(to);
557     }
558 }
559
560 std::string VBucket::getFilterStatusString() {
561     LockHolder lh(bfMutex);
562     if (bFilter) {
563         return bFilter->getStatusString();
564     } else if (tempFilter) {
565         return tempFilter->getStatusString();
566     } else {
567         return "DOESN'T EXIST";
568     }
569 }
570
571 size_t VBucket::getFilterSize() {
572     LockHolder lh(bfMutex);
573     if (bFilter) {
574         return bFilter->getFilterSize();
575     } else {
576         return 0;
577     }
578 }
579
580 size_t VBucket::getNumOfKeysInFilter() {
581     LockHolder lh(bfMutex);
582     if (bFilter) {
583         return bFilter->getNumOfKeysInFilter();
584     } else {
585         return 0;
586     }
587 }
588
589 VBNotifyCtx VBucket::queueDirty(
590         StoredValue& v,
591         const GenerateBySeqno generateBySeqno,
592         const GenerateCas generateCas,
593         const bool isBackfillItem,
594         PreLinkDocumentContext* preLinkDocumentContext) {
595     VBNotifyCtx notifyCtx;
596
597     queued_item qi(v.toItem(false, getId()));
598
599     if (isBackfillItem) {
600         queueBackfillItem(qi, generateBySeqno);
601         notifyCtx.notifyFlusher = true;
602         /* During backfill on a TAP receiver we need to update the snapshot
603          range in the checkpoint. Has to be done here because in case of TAP
604          backfill, above, we use vb.queueBackfillItem() instead of
605          vb.checkpointManager.queueDirty() */
606         if (generateBySeqno == GenerateBySeqno::Yes) {
607             checkpointManager.resetSnapshotRange();
608         }
609     } else {
610         notifyCtx.notifyFlusher =
611                 checkpointManager.queueDirty(*this,
612                                              qi,
613                                              generateBySeqno,
614                                              generateCas,
615                                              preLinkDocumentContext);
616         notifyCtx.notifyReplication = true;
617         if (GenerateCas::Yes == generateCas) {
618             v.setCas(qi->getCas());
619         }
620     }
621
622     v.setBySeqno(qi->getBySeqno());
623     notifyCtx.bySeqno = qi->getBySeqno();
624
625     return notifyCtx;
626 }
627
628 StoredValue* VBucket::fetchValidValue(HashTable::HashBucketLock& hbl,
629                                       const DocKey& key,
630                                       WantsDeleted wantsDeleted,
631                                       TrackReference trackReference,
632                                       QueueExpired queueExpired) {
633     if (!hbl.getHTLock()) {
634         throw std::logic_error(
635                 "Hash bucket lock not held in "
636                 "VBucket::fetchValidValue() for hash bucket: " +
637                 std::to_string(hbl.getBucketNum()) + "for key: " +
638                 std::string(reinterpret_cast<const char*>(key.data()),
639                             key.size()));
640     }
641     StoredValue* v = ht.unlocked_find(
642             key, hbl.getBucketNum(), wantsDeleted, trackReference);
643     if (v && !v->isDeleted() && !v->isTempItem()) {
644         // In the deleted case, we ignore expiration time.
645         if (v->isExpired(ep_real_time())) {
646             if (getState() != vbucket_state_active) {
647                 return wantsDeleted == WantsDeleted::Yes ? v : NULL;
648             }
649
650             // queueDirty only allowed on active VB
651             if (queueExpired == QueueExpired::Yes &&
652                 getState() == vbucket_state_active) {
653                 incExpirationStat(ExpireBy::Access);
654                 handlePreExpiry(*v);
655                 VBNotifyCtx notifyCtx;
656                 std::tie(std::ignore, v, notifyCtx) =
657                         processExpiredItem(hbl, *v);
658                 notifyNewSeqno(notifyCtx);
659             }
660             return wantsDeleted == WantsDeleted::Yes ? v : NULL;
661         }
662     }
663     return v;
664 }
665
666 void VBucket::incExpirationStat(const ExpireBy source) {
667     switch (source) {
668     case ExpireBy::Pager:
669         ++stats.expired_pager;
670         break;
671     case ExpireBy::Compactor:
672         ++stats.expired_compactor;
673         break;
674     case ExpireBy::Access:
675         ++stats.expired_access;
676         break;
677     }
678     ++numExpiredItems;
679 }
680
681 MutationStatus VBucket::setFromInternal(Item& itm) {
682     return ht.set(itm);
683 }
684
685 ENGINE_ERROR_CODE VBucket::set(Item& itm,
686                                const void* cookie,
687                                EventuallyPersistentEngine& engine,
688                                const int bgFetchDelay) {
689     bool cas_op = (itm.getCas() != 0);
690     auto hbl = ht.getLockedBucket(itm.getKey());
691     StoredValue* v = ht.unlocked_find(itm.getKey(),
692                                       hbl.getBucketNum(),
693                                       WantsDeleted::Yes,
694                                       TrackReference::No);
695     if (v && v->isLocked(ep_current_time()) &&
696         (getState() == vbucket_state_replica ||
697          getState() == vbucket_state_pending)) {
698         v->unlock();
699     }
700
701     bool maybeKeyExists = true;
702     // If we didn't find a valid item, check Bloomfilter's prediction if in
703     // full eviction policy and for a CAS operation.
704     if ((v == nullptr || v->isTempInitialItem()) &&
705         (eviction == FULL_EVICTION) && (itm.getCas() != 0)) {
706         // Check Bloomfilter's prediction
707         if (!maybeKeyExistsInFilter(itm.getKey())) {
708             maybeKeyExists = false;
709         }
710     }
711
712     PreLinkDocumentContext preLinkDocumentContext(engine, cookie, &itm);
713     VBQueueItemCtx queueItmCtx(GenerateBySeqno::Yes,
714                                GenerateCas::Yes,
715                                TrackCasDrift::No,
716                                /*isBackfillItem*/ false,
717                                &preLinkDocumentContext);
718
719     MutationStatus status;
720     VBNotifyCtx notifyCtx;
721     std::tie(status, notifyCtx) = processSet(hbl,
722                                              v,
723                                              itm,
724                                              itm.getCas(),
725                                              /*allowExisting*/ true,
726                                              /*hashMetaData*/ false,
727                                              &queueItmCtx,
728                                              maybeKeyExists);
729
730     ENGINE_ERROR_CODE ret = ENGINE_SUCCESS;
731     switch (status) {
732     case MutationStatus::NoMem:
733         ret = ENGINE_ENOMEM;
734         break;
735     case MutationStatus::InvalidCas:
736         ret = ENGINE_KEY_EEXISTS;
737         break;
738     case MutationStatus::IsLocked:
739         ret = ENGINE_LOCKED;
740         break;
741     case MutationStatus::NotFound:
742         if (cas_op) {
743             ret = ENGINE_KEY_ENOENT;
744             break;
745         }
746     // FALLTHROUGH
747     case MutationStatus::WasDirty:
748     // Even if the item was dirty, push it into the vbucket's open
749     // checkpoint.
750     case MutationStatus::WasClean:
751         notifyNewSeqno(notifyCtx);
752
753         itm.setBySeqno(v->getBySeqno());
754         itm.setCas(v->getCas());
755         break;
756     case MutationStatus::NeedBgFetch: { // CAS operation with non-resident item
757         // +
758         // full eviction.
759         if (v) {
760             // temp item is already created. Simply schedule a bg fetch job
761             hbl.getHTLock().unlock();
762             bgFetch(itm.getKey(), cookie, engine, bgFetchDelay, true);
763             return ENGINE_EWOULDBLOCK;
764         }
765         ret = addTempItemAndBGFetch(
766                 hbl, itm.getKey(), cookie, engine, bgFetchDelay, true);
767         break;
768     }
769     }
770
771     return ret;
772 }
773
774 ENGINE_ERROR_CODE VBucket::replace(Item& itm,
775                                    const void* cookie,
776                                    EventuallyPersistentEngine& engine,
777                                    const int bgFetchDelay) {
778     auto hbl = ht.getLockedBucket(itm.getKey());
779     StoredValue* v = ht.unlocked_find(itm.getKey(),
780                                       hbl.getBucketNum(),
781                                       WantsDeleted::Yes,
782                                       TrackReference::No);
783     if (v) {
784         if (v->isDeleted() || v->isTempDeletedItem() ||
785             v->isTempNonExistentItem()) {
786             return ENGINE_KEY_ENOENT;
787         }
788
789         MutationStatus mtype;
790         VBNotifyCtx notifyCtx;
791         if (eviction == FULL_EVICTION && v->isTempInitialItem()) {
792             mtype = MutationStatus::NeedBgFetch;
793         } else {
794             PreLinkDocumentContext preLinkDocumentContext(engine, cookie, &itm);
795             VBQueueItemCtx queueItmCtx(GenerateBySeqno::Yes,
796                                        GenerateCas::Yes,
797                                        TrackCasDrift::No,
798                                        /*isBackfillItem*/ false,
799                                        &preLinkDocumentContext);
800             std::tie(mtype, notifyCtx) = processSet(hbl,
801                                                     v,
802                                                     itm,
803                                                     0,
804                                                     /*allowExisting*/ true,
805                                                     /*hasMetaData*/ false,
806                                                     &queueItmCtx);
807         }
808
809         ENGINE_ERROR_CODE ret = ENGINE_SUCCESS;
810         switch (mtype) {
811         case MutationStatus::NoMem:
812             ret = ENGINE_ENOMEM;
813             break;
814         case MutationStatus::IsLocked:
815             ret = ENGINE_LOCKED;
816             break;
817         case MutationStatus::InvalidCas:
818         case MutationStatus::NotFound:
819             ret = ENGINE_NOT_STORED;
820             break;
821         // FALLTHROUGH
822         case MutationStatus::WasDirty:
823         // Even if the item was dirty, push it into the vbucket's open
824         // checkpoint.
825         case MutationStatus::WasClean:
826             notifyNewSeqno(notifyCtx);
827
828             itm.setBySeqno(v->getBySeqno());
829             itm.setCas(v->getCas());
830             break;
831         case MutationStatus::NeedBgFetch: {
832             // temp item is already created. Simply schedule a bg fetch job
833             hbl.getHTLock().unlock();
834             bgFetch(itm.getKey(), cookie, engine, bgFetchDelay, true);
835             ret = ENGINE_EWOULDBLOCK;
836             break;
837         }
838         }
839
840         return ret;
841     } else {
842         if (eviction == VALUE_ONLY) {
843             return ENGINE_KEY_ENOENT;
844         }
845
846         if (maybeKeyExistsInFilter(itm.getKey())) {
847             return addTempItemAndBGFetch(
848                     hbl, itm.getKey(), cookie, engine, bgFetchDelay, false);
849         } else {
850             // As bloomfilter predicted that item surely doesn't exist
851             // on disk, return ENOENT for replace().
852             return ENGINE_KEY_ENOENT;
853         }
854     }
855 }
856
857 ENGINE_ERROR_CODE VBucket::addBackfillItem(Item& itm,
858                                            const GenerateBySeqno genBySeqno) {
859     auto hbl = ht.getLockedBucket(itm.getKey());
860     StoredValue* v = ht.unlocked_find(itm.getKey(),
861                                       hbl.getBucketNum(),
862                                       WantsDeleted::Yes,
863                                       TrackReference::No);
864
865     // Note that this function is only called on replica or pending vbuckets.
866     if (v && v->isLocked(ep_current_time())) {
867         v->unlock();
868     }
869
870     VBQueueItemCtx queueItmCtx(genBySeqno,
871                                GenerateCas::No,
872                                TrackCasDrift::No,
873                                /*isBackfillItem*/ true,
874                                nullptr /* No pre link should happen */);
875     MutationStatus status;
876     VBNotifyCtx notifyCtx;
877     std::tie(status, notifyCtx) = processSet(hbl,
878                                              v,
879                                              itm,
880                                              0,
881                                              /*allowExisting*/ true,
882                                              /*hasMetaData*/ true,
883                                              &queueItmCtx);
884
885     ENGINE_ERROR_CODE ret = ENGINE_SUCCESS;
886     switch (status) {
887     case MutationStatus::NoMem:
888         ret = ENGINE_ENOMEM;
889         break;
890     case MutationStatus::InvalidCas:
891     case MutationStatus::IsLocked:
892         ret = ENGINE_KEY_EEXISTS;
893         break;
894     case MutationStatus::WasDirty:
895     // FALLTHROUGH, to ensure the bySeqno for the hashTable item is
896     // set correctly, and also the sequence numbers are ordered correctly.
897     // (MB-14003)
898     case MutationStatus::NotFound:
899     // FALLTHROUGH
900     case MutationStatus::WasClean: {
901         setMaxCas(v->getCas());
902         // we unlock ht lock here because we want to avoid potential lock
903         // inversions arising from notifyNewSeqno() call
904         hbl.getHTLock().unlock();
905         notifyNewSeqno(notifyCtx);
906     } break;
907     case MutationStatus::NeedBgFetch:
908         throw std::logic_error(
909                 "VBucket::addBackfillItem: "
910                 "SET on a non-active vbucket should not require a "
911                 "bg_metadata_fetch.");
912     }
913
914     return ret;
915 }
916
917 ENGINE_ERROR_CODE VBucket::setWithMeta(Item& itm,
918                                        const uint64_t cas,
919                                        uint64_t* seqno,
920                                        const void* cookie,
921                                        EventuallyPersistentEngine& engine,
922                                        const int bgFetchDelay,
923                                        const bool force,
924                                        const bool allowExisting,
925                                        const GenerateBySeqno genBySeqno,
926                                        const GenerateCas genCas,
927                                        const bool isReplication) {
928     auto hbl = ht.getLockedBucket(itm.getKey());
929     StoredValue* v = ht.unlocked_find(itm.getKey(),
930                                       hbl.getBucketNum(),
931                                       WantsDeleted::Yes,
932                                       TrackReference::No);
933
934     bool maybeKeyExists = true;
935     if (!force) {
936         if (v) {
937             if (v->isTempInitialItem()) {
938                 bgFetch(itm.getKey(), cookie, engine, bgFetchDelay, true);
939                 return ENGINE_EWOULDBLOCK;
940             }
941
942             if (!(conflictResolver->resolve(*v,
943                                             itm.getMetaData(),
944                                             itm.getDataType(),
945                                             itm.isDeleted()))) {
946                 ++stats.numOpsSetMetaResolutionFailed;
947                 return ENGINE_KEY_EEXISTS;
948             }
949         } else {
950             if (maybeKeyExistsInFilter(itm.getKey())) {
951                 return addTempItemAndBGFetch(hbl,
952                                              itm.getKey(),
953                                              cookie,
954                                              engine,
955                                              bgFetchDelay,
956                                              true,
957                                              isReplication);
958             } else {
959                 maybeKeyExists = false;
960             }
961         }
962     } else {
963         if (eviction == FULL_EVICTION) {
964             // Check Bloomfilter's prediction
965             if (!maybeKeyExistsInFilter(itm.getKey())) {
966                 maybeKeyExists = false;
967             }
968         }
969     }
970
971     if (v && v->isLocked(ep_current_time()) &&
972         (getState() == vbucket_state_replica ||
973          getState() == vbucket_state_pending)) {
974         v->unlock();
975     }
976
977     VBQueueItemCtx queueItmCtx(genBySeqno,
978                                genCas,
979                                TrackCasDrift::Yes,
980                                /*isBackfillItem*/ false,
981                                nullptr /* No pre link step needed */);
982     MutationStatus status;
983     VBNotifyCtx notifyCtx;
984     std::tie(status, notifyCtx) = processSet(hbl,
985                                              v,
986                                              itm,
987                                              cas,
988                                              allowExisting,
989                                              true,
990                                              &queueItmCtx,
991                                              maybeKeyExists,
992                                              isReplication);
993
994     ENGINE_ERROR_CODE ret = ENGINE_SUCCESS;
995     switch (status) {
996     case MutationStatus::NoMem:
997         ret = ENGINE_ENOMEM;
998         break;
999     case MutationStatus::InvalidCas:
1000         ret = ENGINE_KEY_EEXISTS;
1001         break;
1002     case MutationStatus::IsLocked:
1003         ret = ENGINE_LOCKED;
1004         break;
1005     case MutationStatus::WasDirty:
1006     case MutationStatus::WasClean: {
1007         if (seqno) {
1008             *seqno = static_cast<uint64_t>(v->getBySeqno());
1009         }
1010         // we unlock ht lock here because we want to avoid potential lock
1011         // inversions arising from notifyNewSeqno() call
1012         hbl.getHTLock().unlock();
1013         notifyNewSeqno(notifyCtx);
1014     } break;
1015     case MutationStatus::NotFound:
1016         ret = ENGINE_KEY_ENOENT;
1017         break;
1018     case MutationStatus::NeedBgFetch: { // CAS operation with non-resident item
1019         // + full eviction.
1020         if (v) { // temp item is already created. Simply schedule a
1021             hbl.getHTLock().unlock(); // bg fetch job.
1022             bgFetch(itm.getKey(), cookie, engine, bgFetchDelay, true);
1023             return ENGINE_EWOULDBLOCK;
1024         }
1025         ret = addTempItemAndBGFetch(hbl,
1026                                     itm.getKey(),
1027                                     cookie,
1028                                     engine,
1029                                     bgFetchDelay,
1030                                     true,
1031                                     isReplication);
1032     }
1033     }
1034
1035     return ret;
1036 }
1037
/**
 * Delete a key from this vbucket (front-end delete, no caller metadata).
 *
 * Missing, deleted or temp values end the operation early. Under value-only
 * eviction the result is ENOENT. Under full eviction the function
 * schedules a bg fetch when the key may exist on disk, or removes
 * temp non-existent/deleted placeholders. Otherwise an expired value is
 * handled by processExpiredItem() and a live one by processSoftDelete().
 *
 * @param key the key to delete
 * @param cas [in/out] CAS forwarded to processSoftDelete(); on the
 *            WasClean/WasDirty/NotFound paths it is overwritten with the
 *            CAS of the resulting value
 * @param cookie connection cookie used if a bg fetch must be scheduled
 * @param engine engine instance used if a bg fetch must be scheduled
 * @param bgFetchDelay delay forwarded to bgFetch()/addTempItemAndBGFetch()
 * @param itemMeta if non-null, receives the CAS of the value being deleted
 *                 and, once the delete is processed, its rev seqno, flags
 *                 and expiry
 * @param mutInfo if non-null, receives the seqno and current failover UUID
 *                of a WasClean/WasDirty delete
 * @return ENGINE_SUCCESS, ENGINE_KEY_ENOENT, ENGINE_EWOULDBLOCK (or the
 *         result of addTempItemAndBGFetch()), ENGINE_KEY_EEXISTS on
 *         InvalidCas, ENGINE_LOCKED_TMPFAIL, or ENGINE_ENOMEM.
 * @throws std::logic_error if processSoftDelete() unexpectedly returns
 *         NeedBgFetch.
 */
ENGINE_ERROR_CODE VBucket::deleteItem(const DocKey& key,
                                      uint64_t& cas,
                                      const void* cookie,
                                      EventuallyPersistentEngine& engine,
                                      const int bgFetchDelay,
                                      ItemMetaData* itemMeta,
                                      mutation_descr_t* mutInfo) {
    auto hbl = ht.getLockedBucket(key);
    StoredValue* v = ht.unlocked_find(
            key, hbl.getBucketNum(), WantsDeleted::Yes, TrackReference::No);

    // Nothing live in memory: decide between ENOENT and a bg fetch.
    if (!v || v->isDeleted() || v->isTempItem()) {
        if (eviction == VALUE_ONLY) {
            return ENGINE_KEY_ENOENT;
        } else { // Full eviction.
            if (!v) { // Item might be evicted from cache.
                if (maybeKeyExistsInFilter(key)) {
                    return addTempItemAndBGFetch(
                            hbl, key, cookie, engine, bgFetchDelay, true);
                } else {
                    // As bloomfilter predicted that item surely doesn't
                    // exist on disk, return ENOENT for deleteItem().
                    return ENGINE_KEY_ENOENT;
                }
            } else if (v->isTempInitialItem()) {
                // Release the bucket lock before queueing the bg fetch.
                hbl.getHTLock().unlock();
                bgFetch(key, cookie, engine, bgFetchDelay, true);
                return ENGINE_EWOULDBLOCK;
            } else { // Non-existent or deleted key.
                if (v->isTempNonExistentItem() || v->isTempDeletedItem()) {
                    // Delete a temp non-existent item to ensure that
                    // if a delete were issued over an item that doesn't
                    // exist, then we don't preserve a temp item.
                    deleteStoredValue(hbl, *v);
                }
                return ENGINE_KEY_ENOENT;
            }
        }
    }

    // On replica / pending vbuckets an existing lock is cleared before the
    // delete is applied.
    if (v->isLocked(ep_current_time()) &&
        (getState() == vbucket_state_replica ||
         getState() == vbucket_state_pending)) {
        v->unlock();
    }

    // Report the CAS of the value as it was before the delete.
    if (itemMeta != nullptr) {
        itemMeta->cas = v->getCas();
    }

    MutationStatus delrv;
    VBNotifyCtx notifyCtx;
    if (v->isExpired(ep_real_time())) {
        std::tie(delrv, v, notifyCtx) = processExpiredItem(hbl, *v);
    } else {
        // A front-end delete bumps the revision seqno by one.
        ItemMetaData metadata;
        metadata.revSeqno = v->getRevSeqno() + 1;
        std::tie(delrv, v, notifyCtx) =
                processSoftDelete(hbl,
                                  *v,
                                  cas,
                                  metadata,
                                  VBQueueItemCtx(GenerateBySeqno::Yes,
                                                 GenerateCas::Yes,
                                                 TrackCasDrift::No,
                                                 /*isBackfillItem*/ false,
                                                 nullptr /* no pre link */),
                                  /*use_meta*/ false,
                                  /*bySeqno*/ v->getBySeqno());
    }

    uint64_t seqno = 0;
    ENGINE_ERROR_CODE ret = ENGINE_SUCCESS;
    switch (delrv) {
    case MutationStatus::NoMem:
        ret = ENGINE_ENOMEM;
        break;
    case MutationStatus::InvalidCas:
        ret = ENGINE_KEY_EEXISTS;
        break;
    case MutationStatus::IsLocked:
        ret = ENGINE_LOCKED_TMPFAIL;
        break;
    case MutationStatus::NotFound:
        ret = ENGINE_KEY_ENOENT;
    /* Fallthrough:
     * A NotFound return value at this point indicates that the
     * item has expired. But, a deletion still needs to be queued
     * for the item in order to persist it.
     */
    case MutationStatus::WasClean:
    case MutationStatus::WasDirty:
        if (itemMeta != nullptr) {
            itemMeta->revSeqno = v->getRevSeqno();
            itemMeta->flags = v->getFlags();
            itemMeta->exptime = v->getExptime();
        }

        // NOTE(review): unlike the other mutation paths in this file, the
        // hash-bucket lock is still held when notifyNewSeqno() runs here.
        notifyNewSeqno(notifyCtx);
        seqno = static_cast<uint64_t>(v->getBySeqno());
        cas = v->getCas();

        // Mutation info / post-delete CAS are only reported for a real
        // (non-expiry) delete.
        if (delrv != MutationStatus::NotFound) {
            if (mutInfo) {
                mutInfo->seqno = seqno;
                mutInfo->vbucket_uuid = failovers->getLatestUUID();
            }
            if (itemMeta != nullptr) {
                itemMeta->cas = v->getCas();
            }
        }
        break;
    case MutationStatus::NeedBgFetch:
        // We already figured out if a bg fetch is required for a full-evicted
        // item above.
        throw std::logic_error(
                "VBucket::deleteItem: "
                "Unexpected NEEDS_BG_FETCH from processSoftDelete");
    }
    return ret;
}
1159
1160 ENGINE_ERROR_CODE VBucket::deleteWithMeta(const DocKey& key,
1161                                           uint64_t& cas,
1162                                           uint64_t* seqno,
1163                                           const void* cookie,
1164                                           EventuallyPersistentEngine& engine,
1165                                           const int bgFetchDelay,
1166                                           const bool force,
1167                                           const ItemMetaData& itemMeta,
1168                                           const bool backfill,
1169                                           const GenerateBySeqno genBySeqno,
1170                                           const GenerateCas generateCas,
1171                                           const uint64_t bySeqno,
1172                                           const bool isReplication) {
1173     auto hbl = ht.getLockedBucket(key);
1174     StoredValue* v = ht.unlocked_find(
1175             key, hbl.getBucketNum(), WantsDeleted::Yes, TrackReference::No);
1176     if (!force) { // Need conflict resolution.
1177         if (v) {
1178             if (v->isTempInitialItem()) {
1179                 bgFetch(key, cookie, engine, bgFetchDelay, true);
1180                 return ENGINE_EWOULDBLOCK;
1181             }
1182
1183             if (!(conflictResolver->resolve(*v,
1184                                             itemMeta,
1185                                             PROTOCOL_BINARY_RAW_BYTES,
1186                                             true))) {
1187                 ++stats.numOpsDelMetaResolutionFailed;
1188                 return ENGINE_KEY_EEXISTS;
1189             }
1190         } else {
1191             // Item is 1) deleted or not existent in the value eviction case OR
1192             // 2) deleted or evicted in the full eviction.
1193             if (maybeKeyExistsInFilter(key)) {
1194                 return addTempItemAndBGFetch(hbl,
1195                                              key,
1196                                              cookie,
1197                                              engine,
1198                                              bgFetchDelay,
1199                                              true,
1200                                              isReplication);
1201             } else {
1202                 // Even though bloomfilter predicted that item doesn't exist
1203                 // on disk, we must put this delete on disk if the cas is valid.
1204                 AddStatus rv = addTempStoredValue(hbl, key, isReplication);
1205                 if (rv == AddStatus::NoMem) {
1206                     return ENGINE_ENOMEM;
1207                 }
1208                 v = ht.unlocked_find(key,
1209                                      hbl.getBucketNum(),
1210                                      WantsDeleted::Yes,
1211                                      TrackReference::No);
1212                 v->setDeleted();
1213             }
1214         }
1215     } else {
1216         if (!v) {
1217             // We should always try to persist a delete here.
1218             AddStatus rv = addTempStoredValue(hbl, key, isReplication);
1219             if (rv == AddStatus::NoMem) {
1220                 return ENGINE_ENOMEM;
1221             }
1222             v = ht.unlocked_find(key,
1223                                  hbl.getBucketNum(),
1224                                  WantsDeleted::Yes,
1225                                  TrackReference::No);
1226             v->setDeleted();
1227             v->setCas(cas);
1228         } else if (v->isTempInitialItem()) {
1229             v->setDeleted();
1230             v->setCas(cas);
1231         }
1232     }
1233
1234     if (v && v->isLocked(ep_current_time()) &&
1235         (getState() == vbucket_state_replica ||
1236          getState() == vbucket_state_pending)) {
1237         v->unlock();
1238     }
1239
1240     MutationStatus delrv;
1241     VBNotifyCtx notifyCtx;
1242     if (!v) {
1243         if (eviction == FULL_EVICTION) {
1244             delrv = MutationStatus::NeedBgFetch;
1245         } else {
1246             delrv = MutationStatus::NotFound;
1247         }
1248     } else {
1249         VBQueueItemCtx queueItmCtx(genBySeqno,
1250                                    generateCas,
1251                                    TrackCasDrift::Yes,
1252                                    backfill,
1253                                    nullptr /* No pre link step needed */);
1254
1255         // system xattrs must remain
1256         std::unique_ptr<Item> itm;
1257         if (mcbp::datatype::is_xattr(v->getDatatype()) &&
1258             (itm = pruneXattrDocument(*v, itemMeta))) {
1259             std::tie(v, delrv, notifyCtx) =
1260                     updateStoredValue(hbl, *v, *itm, &queueItmCtx);
1261         } else {
1262             std::tie(delrv, v, notifyCtx) = processSoftDelete(hbl,
1263                                                               *v,
1264                                                               cas,
1265                                                               itemMeta,
1266                                                               queueItmCtx,
1267                                                               /*use_meta*/ true,
1268                                                               bySeqno);
1269         }
1270     }
1271     cas = v ? v->getCas() : 0;
1272
1273     switch (delrv) {
1274     case MutationStatus::NoMem:
1275         return ENGINE_ENOMEM;
1276     case MutationStatus::InvalidCas:
1277         return ENGINE_KEY_EEXISTS;
1278     case MutationStatus::IsLocked:
1279         return ENGINE_LOCKED_TMPFAIL;
1280     case MutationStatus::NotFound:
1281         return ENGINE_KEY_ENOENT;
1282     case MutationStatus::WasDirty:
1283     case MutationStatus::WasClean: {
1284         if (seqno) {
1285             *seqno = static_cast<uint64_t>(v->getBySeqno());
1286         }
1287         // we unlock ht lock here because we want to avoid potential lock
1288         // inversions arising from notifyNewSeqno() call
1289         hbl.getHTLock().unlock();
1290         notifyNewSeqno(notifyCtx);
1291         break;
1292     }
1293     case MutationStatus::NeedBgFetch:
1294         hbl.getHTLock().unlock();
1295         bgFetch(key, cookie, engine, bgFetchDelay, true);
1296         return ENGINE_EWOULDBLOCK;
1297     }
1298     return ENGINE_SUCCESS;
1299 }
1300
/**
 * Expire an item that the caller has determined to be expired.
 *
 * The in-memory value is only acted on if its CAS equals @p it's CAS.
 * Temp non-existent/deleted placeholders are simply removed. A live value
 * that is expired relative to @p startTime (and not already deleted) is
 * processed by processExpiredItem(). Under full eviction, a key that is not
 * in memory but may exist on disk (per the bloom filter) gets a temp
 * deleted item populated from @p it, which is then processed as expired.
 *
 * incExpirationStat(@p source) is called on every path except a CAS
 * mismatch and an out-of-memory failure while adding the temp item.
 *
 * @param it the expired item (already trimmed by the caller)
 * @param startTime time used for the isExpired() check
 * @param source what triggered the expiry; selects the expiration stat
 * @throws std::logic_error if a temp placeholder cannot be deleted.
 */
void VBucket::deleteExpiredItem(const Item& it,
                                time_t startTime,
                                ExpireBy source) {

    // The item is correctly trimmed (by the caller). Fetch the one in the
    // hashtable and replace it if the CAS match (same item; no race).
    // If not found in the hashtable we should add it as a deleted item
    const DocKey& key = it.getKey();
    auto hbl = ht.getLockedBucket(key);
    StoredValue* v = ht.unlocked_find(
            key, hbl.getBucketNum(), WantsDeleted::Yes, TrackReference::No);
    if (v) {
        // A different CAS means the value changed since the caller saw it.
        if (v->getCas() != it.getCas()) {
            return;
        }

        if (v->isTempNonExistentItem() || v->isTempDeletedItem()) {
            bool deleted = deleteStoredValue(hbl, *v);
            if (!deleted) {
                // 'v' was not removed, so it is still safe to read here.
                throw std::logic_error(
                        "VBucket::deleteExpiredItem: "
                        "Failed to delete seqno:" +
                        std::to_string(v->getBySeqno()) + " from bucket " +
                        std::to_string(hbl.getBucketNum()));
            }
        } else if (v->isExpired(startTime) && !v->isDeleted()) {
            VBNotifyCtx notifyCtx;
            std::tie(std::ignore, std::ignore, notifyCtx) =
                    processExpiredItem(hbl, *v);
            // we unlock ht lock here because we want to avoid potential lock
            // inversions arising from notifyNewSeqno() call
            hbl.getHTLock().unlock();
            notifyNewSeqno(notifyCtx);
        }
    } else {
        if (eviction == FULL_EVICTION) {
            // Create a temp item and delete and push it
            // into the checkpoint queue, only if the bloomfilter
            // predicts that the item may exist on disk.
            if (maybeKeyExistsInFilter(key)) {
                AddStatus rv = addTempStoredValue(hbl, key);
                if (rv == AddStatus::NoMem) {
                    return;
                }
                v = ht.unlocked_find(key,
                                     hbl.getBucketNum(),
                                     WantsDeleted::Yes,
                                     TrackReference::No);
                // Turn the temp item into a deleted copy of 'it'.
                v->setDeleted();
                v->setRevSeqno(it.getRevSeqno());
                ht.setValue(it, *v);
                VBNotifyCtx notifyCtx;
                std::tie(std::ignore, std::ignore, notifyCtx) =
                        processExpiredItem(hbl, *v);
                // we unlock ht lock here because we want to avoid potential
                // lock inversions arising from notifyNewSeqno() call
                hbl.getHTLock().unlock();
                notifyNewSeqno(notifyCtx);
            }
        }
    }
    incExpirationStat(source);
}
1364
1365 ENGINE_ERROR_CODE VBucket::add(Item& itm,
1366                                const void* cookie,
1367                                EventuallyPersistentEngine& engine,
1368                                const int bgFetchDelay) {
1369     auto hbl = ht.getLockedBucket(itm.getKey());
1370     StoredValue* v = ht.unlocked_find(itm.getKey(),
1371                                       hbl.getBucketNum(),
1372                                       WantsDeleted::Yes,
1373                                       TrackReference::No);
1374
1375     bool maybeKeyExists = true;
1376     if ((v == nullptr || v->isTempInitialItem()) &&
1377         (eviction == FULL_EVICTION)) {
1378         // Check bloomfilter's prediction
1379         if (!maybeKeyExistsInFilter(itm.getKey())) {
1380             maybeKeyExists = false;
1381         }
1382     }
1383
1384     PreLinkDocumentContext preLinkDocumentContext(engine, cookie, &itm);
1385     VBQueueItemCtx queueItmCtx(GenerateBySeqno::Yes,
1386                                GenerateCas::Yes,
1387                                TrackCasDrift::No,
1388                                /*isBackfillItem*/ false,
1389                                &preLinkDocumentContext);
1390     AddStatus status;
1391     VBNotifyCtx notifyCtx;
1392     std::tie(status, notifyCtx) =
1393             processAdd(hbl, v, itm, maybeKeyExists, false, &queueItmCtx);
1394
1395     switch (status) {
1396     case AddStatus::NoMem:
1397         return ENGINE_ENOMEM;
1398     case AddStatus::Exists:
1399         return ENGINE_NOT_STORED;
1400     case AddStatus::AddTmpAndBgFetch:
1401         return addTempItemAndBGFetch(
1402                 hbl, itm.getKey(), cookie, engine, bgFetchDelay, true);
1403     case AddStatus::BgFetch:
1404         hbl.getHTLock().unlock();
1405         bgFetch(itm.getKey(), cookie, engine, bgFetchDelay, true);
1406         return ENGINE_EWOULDBLOCK;
1407     case AddStatus::Success:
1408     case AddStatus::UnDel:
1409         notifyNewSeqno(notifyCtx);
1410         itm.setBySeqno(v->getBySeqno());
1411         itm.setCas(v->getCas());
1412         break;
1413     }
1414     return ENGINE_SUCCESS;
1415 }
1416
1417 std::pair<MutationStatus, GetValue> VBucket::processGetAndUpdateTtl(
1418         HashTable::HashBucketLock& hbl,
1419         const DocKey& key,
1420         StoredValue* v,
1421         time_t exptime) {
1422     if (v) {
1423         if (v->isDeleted() || v->isTempDeletedItem() ||
1424             v->isTempNonExistentItem()) {
1425             return {MutationStatus::NotFound, {}};
1426         }
1427
1428         if (!v->isResident()) {
1429             return {MutationStatus::NeedBgFetch, {}};
1430         }
1431
1432         if (v->isLocked(ep_current_time())) {
1433             return {MutationStatus::IsLocked,
1434                     GetValue(nullptr, ENGINE_KEY_EEXISTS, 0)};
1435         }
1436
1437         const bool exptime_mutated = exptime != v->getExptime();
1438         auto bySeqNo = v->getBySeqno();
1439         if (exptime_mutated) {
1440             v->markDirty();
1441             v->setExptime(exptime);
1442             v->setRevSeqno(v->getRevSeqno() + 1);
1443         }
1444
1445         Item* item =
1446                 v->toItem(v->isLocked(ep_current_time()), getId()).release();
1447
1448         GetValue rv(item, ENGINE_SUCCESS, bySeqNo);
1449
1450         if (exptime_mutated) {
1451             VBQueueItemCtx qItemCtx(GenerateBySeqno::Yes,
1452                                     GenerateCas::Yes,
1453                                     TrackCasDrift::No,
1454                                     false,
1455                                     nullptr);
1456             VBNotifyCtx notifyCtx;
1457             std::tie(v, std::ignore, notifyCtx) =
1458                     updateStoredValue(hbl, *v, *item, &qItemCtx, true);
1459             rv.getValue()->setCas(v->getCas());
1460             // we unlock ht lock here because we want to avoid potential lock
1461             // inversions arising from notifyNewSeqno() call
1462             hbl.getHTLock().unlock();
1463             notifyNewSeqno(notifyCtx);
1464         }
1465
1466         return {MutationStatus::WasClean, rv};
1467     } else {
1468         if (eviction == VALUE_ONLY) {
1469             return {MutationStatus::NotFound, {}};
1470         } else {
1471             if (maybeKeyExistsInFilter(key)) {
1472                 return {MutationStatus::NeedBgFetch, {}};
1473             } else {
1474                 // As bloomfilter predicted that item surely doesn't exist
1475                 // on disk, return ENOENT for getAndUpdateTtl().
1476                 return {MutationStatus::NotFound, {}};
1477             }
1478         }
1479     }
1480 }
1481
1482 GetValue VBucket::getAndUpdateTtl(const DocKey& key,
1483                                   const void* cookie,
1484                                   EventuallyPersistentEngine& engine,
1485                                   int bgFetchDelay,
1486                                   time_t exptime) {
1487     auto hbl = ht.getLockedBucket(key);
1488     StoredValue* v = fetchValidValue(hbl,
1489                                      key,
1490                                      WantsDeleted::Yes,
1491                                      TrackReference::Yes,
1492                                      QueueExpired::Yes);
1493     GetValue gv;
1494     MutationStatus status;
1495     std::tie(status, gv) = processGetAndUpdateTtl(hbl, key, v, exptime);
1496
1497     if (status == MutationStatus::NeedBgFetch) {
1498         if (v) {
1499             bgFetch(key, cookie, engine, bgFetchDelay);
1500             return GetValue(nullptr, ENGINE_EWOULDBLOCK, v->getBySeqno());
1501         } else {
1502             ENGINE_ERROR_CODE ec = addTempItemAndBGFetch(
1503                     hbl, key, cookie, engine, bgFetchDelay, false);
1504             return GetValue(NULL, ec, -1, true);
1505         }
1506     }
1507
1508     return gv;
1509 }
1510
1511 MutationStatus VBucket::insertFromWarmup(Item& itm,
1512                                          bool eject,
1513                                          bool keyMetaDataOnly) {
1514     if (!StoredValue::hasAvailableSpace(stats, itm)) {
1515         return MutationStatus::NoMem;
1516     }
1517
1518     auto hbl = ht.getLockedBucket(itm.getKey());
1519     StoredValue* v = ht.unlocked_find(itm.getKey(),
1520                                       hbl.getBucketNum(),
1521                                       WantsDeleted::Yes,
1522                                       TrackReference::No);
1523
1524     if (v == NULL) {
1525         v = addNewStoredValue(hbl, itm, /*queueItmCtx*/ nullptr).first;
1526         if (keyMetaDataOnly) {
1527             v->markNotResident();
1528             /* For now ht stats are updated from outside ht. This seems to be
1529                a better option for now than passing a flag to
1530                addNewStoredValue() just for this func */
1531             ++(ht.numNonResidentItems);
1532         }
1533         /* For now ht stats are updated from outside ht. This seems to be
1534            a better option for now than passing a flag to
1535            addNewStoredValue() just for this func.
1536            We need to decrNumTotalItems because ht.numTotalItems is already
1537            set by warmup when it estimated the item count from disk */
1538         ht.decrNumTotalItems();
1539         v->setNewCacheItem(false);
1540     } else {
1541         if (keyMetaDataOnly) {
1542             // We don't have a better error code ;)
1543             return MutationStatus::InvalidCas;
1544         }
1545
1546         // Verify that the CAS isn't changed
1547         if (v->getCas() != itm.getCas()) {
1548             if (v->getCas() == 0) {
1549                 v->setCas(itm.getCas());
1550                 v->setFlags(itm.getFlags());
1551                 v->setExptime(itm.getExptime());
1552                 v->setRevSeqno(itm.getRevSeqno());
1553             } else {
1554                 return MutationStatus::InvalidCas;
1555             }
1556         }
1557         updateStoredValue(hbl, *v, itm, /*queueItmCtx*/ nullptr);
1558     }
1559
1560     v->markClean();
1561
1562     if (eject && !keyMetaDataOnly) {
1563         ht.unlocked_ejectItem(v, eviction);
1564     }
1565
1566     return MutationStatus::NotFound;
1567 }
1568
1569 GetValue VBucket::getInternal(const DocKey& key,
1570                               const void* cookie,
1571                               EventuallyPersistentEngine& engine,
1572                               int bgFetchDelay,
1573                               get_options_t options,
1574                               bool diskFlushAll) {
1575     const TrackReference trackReference = (options & TRACK_REFERENCE)
1576                                                   ? TrackReference::Yes
1577                                                   : TrackReference::No;
1578     const bool getDeletedValue = (options & GET_DELETED_VALUE);
1579     auto hbl = ht.getLockedBucket(key);
1580     StoredValue* v = fetchValidValue(
1581             hbl, key, WantsDeleted::Yes, trackReference, QueueExpired::Yes);
1582     if (v) {
1583         if (v->isDeleted() && !getDeletedValue) {
1584             return GetValue();
1585         }
1586         if (v->isTempDeletedItem() || v->isTempNonExistentItem()) {
1587             // Delete a temp non-existent item to ensure that
1588             // if the get were issued over an item that doesn't
1589             // exist, then we dont preserve a temp item.
1590             if (options & DELETE_TEMP) {
1591                 deleteStoredValue(hbl, *v);
1592             }
1593             return GetValue();
1594         }
1595
1596         // If the value is not resident, wait for it...
1597         if (!v->isResident()) {
1598             return getInternalNonResident(
1599                     key, cookie, engine, bgFetchDelay, options, *v);
1600         }
1601
1602         // Should we hide (return -1) for the items' CAS?
1603         const bool hide_cas =
1604                 (options & HIDE_LOCKED_CAS) && v->isLocked(ep_current_time());
1605         return GetValue(v->toItem(hide_cas, getId()).release(),
1606                         ENGINE_SUCCESS,
1607                         v->getBySeqno(),
1608                         false,
1609                         v->getNRUValue());
1610     } else {
1611         if (!getDeletedValue && (eviction == VALUE_ONLY || diskFlushAll)) {
1612             return GetValue();
1613         }
1614
1615         if (maybeKeyExistsInFilter(key)) {
1616             ENGINE_ERROR_CODE ec = ENGINE_EWOULDBLOCK;
1617             if (options &
1618                 QUEUE_BG_FETCH) { // Full eviction and need a bg fetch.
1619                 ec = addTempItemAndBGFetch(
1620                         hbl, key, cookie, engine, bgFetchDelay, false);
1621             }
1622             return GetValue(NULL, ec, -1, true);
1623         } else {
1624             // As bloomfilter predicted that item surely doesn't exist
1625             // on disk, return ENOENT, for getInternal().
1626             return GetValue();
1627         }
1628     }
1629 }
1630
1631 ENGINE_ERROR_CODE VBucket::getMetaData(const DocKey& key,
1632                                        const void* cookie,
1633                                        EventuallyPersistentEngine& engine,
1634                                        int bgFetchDelay,
1635                                        ItemMetaData& metadata,
1636                                        uint32_t& deleted,
1637                                        uint8_t& datatype) {
1638     deleted = 0;
1639     auto hbl = ht.getLockedBucket(key);
1640     StoredValue* v = ht.unlocked_find(
1641             key, hbl.getBucketNum(), WantsDeleted::Yes, TrackReference::No);
1642
1643     if (v) {
1644         stats.numOpsGetMeta++;
1645         if (v->isTempInitialItem()) {
1646             // Need bg meta fetch.
1647             bgFetch(key, cookie, engine, bgFetchDelay, true);
1648             return ENGINE_EWOULDBLOCK;
1649         } else if (v->isTempNonExistentItem()) {
1650             metadata.cas = v->getCas();
1651             return ENGINE_KEY_ENOENT;
1652         } else {
1653             if (v->isTempDeletedItem() || v->isDeleted() ||
1654                 v->isExpired(ep_real_time())) {
1655                 deleted |= GET_META_ITEM_DELETED_FLAG;
1656             }
1657
1658             if (v->isLocked(ep_current_time())) {
1659                 metadata.cas = static_cast<uint64_t>(-1);
1660             } else {
1661                 metadata.cas = v->getCas();
1662             }
1663             metadata.flags = v->getFlags();
1664             metadata.exptime = v->getExptime();
1665             metadata.revSeqno = v->getRevSeqno();
1666             datatype = v->getDatatype();
1667
1668             return ENGINE_SUCCESS;
1669         }
1670     } else {
1671         // The key wasn't found. However, this may be because it was previously
1672         // deleted or evicted with the full eviction strategy.
1673         // So, add a temporary item corresponding to the key to the hash table
1674         // and schedule a background fetch for its metadata from the persistent
1675         // store. The item's state will be updated after the fetch completes.
1676         //
1677         // Schedule this bgFetch only if the key is predicted to be may-be
1678         // existent on disk by the bloomfilter.
1679
1680         if (maybeKeyExistsInFilter(key)) {
1681             return addTempItemAndBGFetch(
1682                     hbl, key, cookie, engine, bgFetchDelay, true);
1683         } else {
1684             stats.numOpsGetMeta++;
1685             return ENGINE_KEY_ENOENT;
1686         }
1687     }
1688 }
1689
1690 ENGINE_ERROR_CODE VBucket::getKeyStats(const DocKey& key,
1691                                        const void* cookie,
1692                                        EventuallyPersistentEngine& engine,
1693                                        int bgFetchDelay,
1694                                        struct key_stats& kstats,
1695                                        WantsDeleted wantsDeleted) {
1696     auto hbl = ht.getLockedBucket(key);
1697     StoredValue* v = fetchValidValue(hbl,
1698                                      key,
1699                                      WantsDeleted::Yes,
1700                                      TrackReference::Yes,
1701                                      QueueExpired::Yes);
1702
1703     if (v) {
1704         if ((v->isDeleted() && wantsDeleted == WantsDeleted::No) ||
1705             v->isTempNonExistentItem() || v->isTempDeletedItem()) {
1706             return ENGINE_KEY_ENOENT;
1707         }
1708         if (eviction == FULL_EVICTION && v->isTempInitialItem()) {
1709             hbl.getHTLock().unlock();
1710             bgFetch(key, cookie, engine, bgFetchDelay, true);
1711             return ENGINE_EWOULDBLOCK;
1712         }
1713         kstats.logically_deleted = v->isDeleted();
1714         kstats.dirty = v->isDirty();
1715         kstats.exptime = v->getExptime();
1716         kstats.flags = v->getFlags();
1717         kstats.cas = v->getCas();
1718         kstats.vb_state = getState();
1719         return ENGINE_SUCCESS;
1720     } else {
1721         if (eviction == VALUE_ONLY) {
1722             return ENGINE_KEY_ENOENT;
1723         } else {
1724             if (maybeKeyExistsInFilter(key)) {
1725                 return addTempItemAndBGFetch(
1726                         hbl, key, cookie, engine, bgFetchDelay, true);
1727             } else {
1728                 // If bgFetch were false, or bloomfilter predicted that
1729                 // item surely doesn't exist on disk, return ENOENT for
1730                 // getKeyStats().
1731                 return ENGINE_KEY_ENOENT;
1732             }
1733         }
1734     }
1735 }
1736
1737 GetValue VBucket::getLocked(const DocKey& key,
1738                             rel_time_t currentTime,
1739                             uint32_t lockTimeout,
1740                             const void* cookie,
1741                             EventuallyPersistentEngine& engine,
1742                             int bgFetchDelay) {
1743     auto hbl = ht.getLockedBucket(key);
1744     StoredValue* v = fetchValidValue(hbl,
1745                                      key,
1746                                      WantsDeleted::Yes,
1747                                      TrackReference::Yes,
1748                                      QueueExpired::Yes);
1749
1750     if (v) {
1751         if (v->isDeleted() || v->isTempNonExistentItem() ||
1752             v->isTempDeletedItem()) {
1753             return GetValue(NULL, ENGINE_KEY_ENOENT);
1754         }
1755
1756         // if v is locked return error
1757         if (v->isLocked(currentTime)) {
1758             return GetValue(NULL, ENGINE_TMPFAIL);
1759         }
1760
1761         // If the value is not resident, wait for it...
1762         if (!v->isResident()) {
1763             if (cookie) {
1764                 bgFetch(key, cookie, engine, bgFetchDelay);
1765             }
1766             return GetValue(NULL, ENGINE_EWOULDBLOCK, -1, true);
1767         }
1768
1769         // acquire lock and increment cas value
1770         v->lock(currentTime + lockTimeout);
1771
1772         auto it = v->toItem(false, getId());
1773         it->setCas(nextHLCCas());
1774         v->setCas(it->getCas());
1775
1776         return GetValue(it.release());
1777
1778     } else {
1779         // No value found in the hashtable.
1780         switch (eviction) {
1781         case VALUE_ONLY:
1782             return GetValue(NULL, ENGINE_KEY_ENOENT);
1783
1784         case FULL_EVICTION:
1785             if (maybeKeyExistsInFilter(key)) {
1786                 ENGINE_ERROR_CODE ec = addTempItemAndBGFetch(
1787                         hbl, key, cookie, engine, bgFetchDelay, false);
1788                 return GetValue(NULL, ec, -1, true);
1789             } else {
1790                 // As bloomfilter predicted that item surely doesn't exist
1791                 // on disk, return ENOENT for getLocked().
1792                 return GetValue(NULL, ENGINE_KEY_ENOENT);
1793             }
1794         }
1795         return GetValue(); // just to prevent compiler warning
1796     }
1797 }
1798
1799 void VBucket::deletedOnDiskCbk(const Item& queuedItem, bool deleted) {
1800     auto hbl = ht.getLockedBucket(queuedItem.getKey());
1801     StoredValue* v = fetchValidValue(hbl,
1802                                      queuedItem.getKey(),
1803                                      WantsDeleted::Yes,
1804                                      TrackReference::No,
1805                                      QueueExpired::Yes);
1806     // Delete the item in the hash table iff:
1807     //  1. Item is existent in hashtable, and deleted flag is true
1808     //  2. rev seqno of queued item matches rev seqno of hash table item
1809     if (v && v->isDeleted() && (queuedItem.getRevSeqno() == v->getRevSeqno())) {
1810         bool isDeleted = deleteStoredValue(hbl, *v);
1811         if (!isDeleted) {
1812             throw std::logic_error(
1813                     "deletedOnDiskCbk:callback: "
1814                     "Failed to delete key with seqno:" +
1815                     std::to_string(v->getBySeqno()) + "' from bucket " +
1816                     std::to_string(hbl.getBucketNum()));
1817         }
1818
1819         /**
1820          * Deleted items are to be added to the bloomfilter,
1821          * in either eviction policy.
1822          */
1823         addToFilter(queuedItem.getKey());
1824     }
1825
1826     if (deleted) {
1827         ++stats.totalPersisted;
1828         ++opsDelete;
1829     }
1830     doStatsForFlushing(queuedItem, queuedItem.size());
1831     --stats.diskQueueSize;
1832     decrMetaDataDisk(queuedItem);
1833 }
1834
1835 bool VBucket::deleteKey(const DocKey& key) {
1836     auto hbl = ht.getLockedBucket(key);
1837     StoredValue* v = ht.unlocked_find(
1838             key, hbl.getBucketNum(), WantsDeleted::Yes, TrackReference::No);
1839     if (!v) {
1840         return false;
1841     }
1842     return deleteStoredValue(hbl, *v);
1843 }
1844
1845 void VBucket::postProcessRollback(const RollbackResult& rollbackResult,
1846                                   uint64_t prevHighSeqno) {
1847     failovers->pruneEntries(rollbackResult.highSeqno);
1848     checkpointManager.clear(*this, rollbackResult.highSeqno);
1849     setPersistedSnapshot(rollbackResult.snapStartSeqno,
1850                          rollbackResult.snapEndSeqno);
1851     incrRollbackItemCount(prevHighSeqno - rollbackResult.highSeqno);
1852     checkpointManager.setOpenCheckpointId(1);
1853 }
1854
1855 void VBucket::dump() const {
1856     std::cerr << "VBucket[" << this << "] with state: " << toString(getState())
1857               << " numItems:" << getNumItems()
1858               << " numNonResident:" << getNumNonResidentItems()
1859               << " ht: " << std::endl << "  " << ht << std::endl
1860               << "]" << std::endl;
1861 }
1862
1863 void VBucket::_addStats(bool details, ADD_STAT add_stat, const void* c) {
1864     addStat(NULL, toString(state), add_stat, c);
1865     if (details) {
1866         size_t numItems = getNumItems();
1867         size_t tempItems = getNumTempItems();
1868         addStat("num_items", numItems, add_stat, c);
1869         addStat("num_temp_items", tempItems, add_stat, c);
1870         addStat("num_non_resident", getNumNonResidentItems(), add_stat, c);
1871         addStat("ht_memory", ht.memorySize(), add_stat, c);
1872         addStat("ht_item_memory", ht.getItemMemory(), add_stat, c);
1873         addStat("ht_cache_size", ht.cacheSize.load(), add_stat, c);
1874         addStat("ht_size", ht.getSize(), add_stat, c);
1875         addStat("num_ejects", ht.getNumEjects(), add_stat, c);
1876         addStat("ops_create", opsCreate.load(), add_stat, c);
1877         addStat("ops_update", opsUpdate.load(), add_stat, c);
1878         addStat("ops_delete", opsDelete.load(), add_stat, c);
1879         addStat("ops_reject", opsReject.load(), add_stat, c);
1880         addStat("queue_size", dirtyQueueSize.load(), add_stat, c);
1881         addStat("backfill_queue_size", getBackfillSize(), add_stat, c);
1882         addStat("queue_memory", dirtyQueueMem.load(), add_stat, c);
1883         addStat("queue_fill", dirtyQueueFill.load(), add_stat, c);
1884         addStat("queue_drain", dirtyQueueDrain.load(), add_stat, c);
1885         addStat("queue_age", getQueueAge(), add_stat, c);
1886         addStat("pending_writes", dirtyQueuePendingWrites.load(), add_stat, c);
1887
1888         addStat("high_seqno", getHighSeqno(), add_stat, c);
1889         addStat("uuid", failovers->getLatestUUID(), add_stat, c);
1890         addStat("purge_seqno", getPurgeSeqno(), add_stat, c);
1891         addStat("bloom_filter", getFilterStatusString().data(),
1892                 add_stat, c);
1893         addStat("bloom_filter_size", getFilterSize(), add_stat, c);
1894         addStat("bloom_filter_key_count", getNumOfKeysInFilter(), add_stat, c);
1895         addStat("rollback_item_count", getRollbackItemCount(), add_stat, c);
1896         addStat("hp_vb_req_size", getHighPriorityChkSize(), add_stat, c);
1897         hlc.addStats(statPrefix, add_stat, c);
1898     }
1899 }
1900
1901 void VBucket::decrDirtyQueueMem(size_t decrementBy)
1902 {
1903     size_t oldVal, newVal;
1904     do {
1905         oldVal = dirtyQueueMem.load(std::memory_order_relaxed);
1906         if (oldVal < decrementBy) {
1907             newVal = 0;
1908         } else {
1909             newVal = oldVal - decrementBy;
1910         }
1911     } while (!dirtyQueueMem.compare_exchange_strong(oldVal, newVal));
1912 }
1913
1914 void VBucket::decrDirtyQueueAge(uint32_t decrementBy)
1915 {
1916     uint64_t oldVal, newVal;
1917     do {
1918         oldVal = dirtyQueueAge.load(std::memory_order_relaxed);
1919         if (oldVal < decrementBy) {
1920             newVal = 0;
1921         } else {
1922             newVal = oldVal - decrementBy;
1923         }
1924     } while (!dirtyQueueAge.compare_exchange_strong(oldVal, newVal));
1925 }
1926
1927 void VBucket::decrDirtyQueuePendingWrites(size_t decrementBy)
1928 {
1929     size_t oldVal, newVal;
1930     do {
1931         oldVal = dirtyQueuePendingWrites.load(std::memory_order_relaxed);
1932         if (oldVal < decrementBy) {
1933             newVal = 0;
1934         } else {
1935             newVal = oldVal - decrementBy;
1936         }
1937     } while (!dirtyQueuePendingWrites.compare_exchange_strong(oldVal, newVal));
1938 }
1939
/*
 * Decide the outcome of a set of 'itm' against the existing StoredValue 'v'
 * (nullptr when the key is not in the hash table) and, when permitted,
 * update the existing StoredValue or add a new one.
 *
 * Must be called with the hash-bucket lock 'hbl' held; throws
 * std::invalid_argument otherwise.
 *
 * @param v in/out: on a successful update/add it is reassigned to the
 *        StoredValue returned by updateStoredValue()/addNewStoredValue().
 * @param cas CAS supplied by the client; 0 means "no CAS check".
 * @param allowExisting if false, an existing non-temp item is rejected with
 *        MutationStatus::InvalidCas.
 * @param hasMetaData if true, itm's revSeqno is used as-is and the
 *        expired-item check below is skipped.
 * @param queueItmCtx forwarded to updateStoredValue()/addNewStoredValue().
 * @param maybeKeyExists bloomfilter hint; with a CAS under full eviction it
 *        requests a background fetch when no resident metadata is available.
 * @param isReplication forwarded to StoredValue::hasAvailableSpace().
 * @return the MutationStatus plus a VBNotifyCtx (default-constructed when the
 *         set is rejected or deferred).
 */
std::pair<MutationStatus, VBNotifyCtx> VBucket::processSet(
        const HashTable::HashBucketLock& hbl,
        StoredValue*& v,
        Item& itm,
        uint64_t cas,
        bool allowExisting,
        bool hasMetaData,
        const VBQueueItemCtx* queueItmCtx,
        bool maybeKeyExists,
        bool isReplication) {
    if (!hbl.getHTLock()) {
        throw std::invalid_argument(
                "VBucket::processSet: htLock not held for "
                "VBucket " +
                std::to_string(getId()));
    }

    // Fail with NoMem if StoredValue::hasAvailableSpace() rejects the item.
    if (!StoredValue::hasAvailableSpace(stats, itm, isReplication)) {
        return {MutationStatus::NoMem, VBNotifyCtx()};
    }

    // A CAS operation under full eviction needs the key's metadata: if we
    // have nothing (or only a temp-initial placeholder) and the key may be on
    // disk, ask the caller to fetch it first.
    if (cas && eviction == FULL_EVICTION && maybeKeyExists) {
        if (!v || v->isTempInitialItem()) {
            return {MutationStatus::NeedBgFetch, VBNotifyCtx()};
        }
    }

    /*
     * prior to checking for the lock, we should check if this object
     * has expired. If so, then check if CAS value has been provided
     * for this set op. In this case the operation should be denied since
     * a cas operation for a key that doesn't exist is not a very cool
     * thing to do. See MB 3252
     */
    if (v && v->isExpired(ep_real_time()) && !hasMetaData && !itm.isDeleted()) {
        if (v->isLocked(ep_current_time())) {
            v->unlock();
        }
        if (cas) {
            /* item has expired and cas value provided. Deny ! */
            return {MutationStatus::NotFound, VBNotifyCtx()};
        }
    }

    if (v) {
        // Existing StoredValue: validate add-semantics, lock and CAS before
        // updating it in place.
        if (!allowExisting && !v->isTempItem()) {
            return {MutationStatus::InvalidCas, VBNotifyCtx()};
        }
        if (v->isLocked(ep_current_time())) {
            /*
             * item is locked, deny if there is cas value mismatch
             * or no cas value is provided by the user
             */
            if (cas != v->getCas()) {
                return {MutationStatus::IsLocked, VBNotifyCtx()};
            }
            /* allow operation*/
            v->unlock();
        } else if (cas && cas != v->getCas()) {
            if (v->isTempNonExistentItem()) {
                // This is a temporary item which marks a key as non-existent;
                // therefore specifying a non-matching CAS should be exposed
                // as item not existing.
                return {MutationStatus::NotFound, VBNotifyCtx()};
            }
            if ((v->isTempDeletedItem() || v->isDeleted()) && !itm.isDeleted()) {
                // Existing item is deleted, and we are not replacing it with
                // a (different) deleted value - return not existing.
                return {MutationStatus::NotFound, VBNotifyCtx()};
            }
            // None of the above special cases; the existing item cannot be
            // modified with the specified CAS.
            return {MutationStatus::InvalidCas, VBNotifyCtx()};
        }
        if (!hasMetaData) {
            // New revision of an existing document.
            itm.setRevSeqno(v->getRevSeqno() + 1);
            /* MB-23530: We must ensure that a replace operation (i.e.
             * set with a CAS) /fails/ if the old document is deleted; it
             * logically "doesn't exist". However, if the new value is deleted
             * this op is a /delete/ with a CAS and we must permit a
             * deleted -> deleted transition for Deleted Bodies.
             */
            if (cas && (v->isDeleted() || v->isTempDeletedItem()) &&
                !itm.isDeleted()) {
                return {MutationStatus::NotFound, VBNotifyCtx()};
            }
        }

        MutationStatus status;
        VBNotifyCtx notifyCtx;
        std::tie(v, status, notifyCtx) =
                updateStoredValue(hbl, *v, itm, queueItmCtx);
        return {status, notifyCtx};
    } else if (cas != 0) {
        // A CAS was given but there is no item to compare it against.
        return {MutationStatus::NotFound, VBNotifyCtx()};
    } else {
        // Brand new key: add it and, unless the caller supplied metadata,
        // derive its revSeqno from the max deleted revSeqno.
        VBNotifyCtx notifyCtx;
        std::tie(v, notifyCtx) = addNewStoredValue(hbl, itm, queueItmCtx);
        if (!hasMetaData) {
            updateRevSeqNoOfNewStoredValue(*v);
            itm.setRevSeqno(v->getRevSeqno());
        }
        return {MutationStatus::WasClean, notifyCtx};
    }
}
2045
/*
 * Decide the outcome of an add (create-only) of 'itm' against the existing
 * StoredValue 'v' (nullptr when the key is not in the hash table) and, when
 * permitted, update the existing StoredValue or add a new one. Also used to
 * insert temp items, i.e. when itm.getBySeqno() is
 * StoredValue::state_temp_init.
 *
 * Must be called with the hash-bucket lock 'hbl' held; throws
 * std::invalid_argument otherwise.
 *
 * @param v in/out: reassigned to the StoredValue returned by
 *        updateStoredValue()/addNewStoredValue().
 * @param maybeKeyExists bloomfilter hint; under full eviction it makes the
 *        function return BgFetch / AddTmpAndBgFetch when the key's on-disk
 *        state is unknown.
 * @param isReplication forwarded to StoredValue::hasAvailableSpace().
 * @param queueItmCtx forwarded to updateStoredValue()/addNewStoredValue().
 * @return the AddStatus plus a VBNotifyCtx (default-constructed when the add
 *         is rejected or deferred).
 */
std::pair<AddStatus, VBNotifyCtx> VBucket::processAdd(
        const HashTable::HashBucketLock& hbl,
        StoredValue*& v,
        Item& itm,
        bool maybeKeyExists,
        bool isReplication,
        const VBQueueItemCtx* queueItmCtx) {
    if (!hbl.getHTLock()) {
        throw std::invalid_argument(
                "VBucket::processAdd: htLock not held for "
                "VBucket " +
                std::to_string(getId()));
    }

    // A live (not deleted, not expired, not temp) item already exists.
    if (v && !v->isDeleted() && !v->isExpired(ep_real_time()) &&
        !v->isTempItem()) {
        return {AddStatus::Exists, VBNotifyCtx()};
    }
    if (!StoredValue::hasAvailableSpace(stats, itm, isReplication)) {
        return {AddStatus::NoMem, VBNotifyCtx()};
    }

    std::pair<AddStatus, VBNotifyCtx> rv = {AddStatus::Success, VBNotifyCtx()};

    if (v) {
        if (v->isTempInitialItem() && eviction == FULL_EVICTION &&
            maybeKeyExists) {
            // Need to figure out if an item exists on disk
            return {AddStatus::BgFetch, VBNotifyCtx()};
        }

        // Re-creating a deleted/expired document is reported as UnDel.
        rv.first = (v->isDeleted() || v->isExpired(ep_real_time()))
                           ? AddStatus::UnDel
                           : AddStatus::Success;

        // Pick the new revSeqno: a temp-deleted item continues from its own
        // revSeqno, any other item starts above the highest revSeqno of any
        // deleted item seen so far...
        if (v->isTempDeletedItem()) {
            itm.setRevSeqno(v->getRevSeqno() + 1);
        } else {
            itm.setRevSeqno(ht.getMaxDeletedRevSeqno() + 1);
        }

        // ...but a real (non-temp) StoredValue always continues from its own
        // revSeqno, overriding the choice above.
        if (!v->isTempItem()) {
            itm.setRevSeqno(v->getRevSeqno() + 1);
        }

        std::tie(v, std::ignore, rv.second) =
                updateStoredValue(hbl, *v, itm, queueItmCtx);
    } else {
        // No StoredValue. Unless we are adding a temp-initial item ourselves,
        // under full eviction a key that may be on disk needs a temp item and
        // a bg fetch first.
        if (itm.getBySeqno() != StoredValue::state_temp_init) {
            if (eviction == FULL_EVICTION && maybeKeyExists) {
                return {AddStatus::AddTmpAndBgFetch, VBNotifyCtx()};
            }
        }
        std::tie(v, rv.second) = addNewStoredValue(hbl, itm, queueItmCtx);
        updateRevSeqNoOfNewStoredValue(*v);
        itm.setRevSeqno(v->getRevSeqno());
        // A temp StoredValue was inserted: the caller must fetch from disk.
        if (v->isTempItem()) {
            rv.first = AddStatus::BgFetch;
        }
    }

    // Temp items are given MAX_NRU_VALUE (presumably making them preferred
    // eviction candidates - confirm against the NRU/pager code).
    if (v->isTempItem()) {
        v->setNRUValue(MAX_NRU_VALUE);
    }
    return rv;
}
2112
2113 std::tuple<MutationStatus, StoredValue*, VBNotifyCtx>
2114 VBucket::processSoftDelete(const HashTable::HashBucketLock& hbl,
2115                            StoredValue& v,
2116                            uint64_t cas,
2117                            const ItemMetaData& metadata,
2118                            const VBQueueItemCtx& queueItmCtx,
2119                            bool use_meta,
2120                            uint64_t bySeqno) {
2121     if (v.isTempInitialItem() && eviction == FULL_EVICTION) {
2122         return std::make_tuple(MutationStatus::NeedBgFetch, &v, VBNotifyCtx());
2123     }
2124
2125     if (v.isLocked(ep_current_time())) {
2126         if (cas != v.getCas()) {
2127             return std::make_tuple(MutationStatus::IsLocked, &v, VBNotifyCtx());
2128         }
2129         v.unlock();
2130     }
2131
2132     if (cas != 0 && cas != v.getCas()) {
2133         return std::make_tuple(MutationStatus::InvalidCas, &v, VBNotifyCtx());
2134     }
2135
2136     /* allow operation */
2137     v.unlock();
2138
2139     MutationStatus rv =
2140             v.isDirty() ? MutationStatus::WasDirty : MutationStatus::WasClean;
2141
2142     if (use_meta) {
2143         v.setCas(metadata.cas);
2144         v.setFlags(metadata.flags);
2145         v.setExptime(metadata.exptime);
2146     }
2147
2148     v.setRevSeqno(metadata.revSeqno);
2149     VBNotifyCtx notifyCtx;
2150     StoredValue* newSv;
2151     std::tie(newSv, notifyCtx) =
2152             softDeleteStoredValue(hbl,
2153                                   v,
2154                                   /*onlyMarkDeleted*/ false,
2155                                   queueItmCtx,
2156                                   bySeqno);
2157     ht.updateMaxDeletedRevSeqno(metadata.revSeqno);
2158     return std::make_tuple(rv, newSv, notifyCtx);
2159 }
2160
/*
 * Handle an expired StoredValue 'v' by soft-deleting it with a freshly
 * generated seqno and CAS, bumping its revSeqno first.
 *
 * Must be called with the hash-bucket lock 'hbl' held; throws
 * std::invalid_argument otherwise.
 *
 * @return MutationStatus::NeedBgFetch (with '&v') for a temp-initial item
 *         under full eviction; otherwise MutationStatus::NotFound together
 *         with the StoredValue returned by softDeleteStoredValue().
 */
std::tuple<MutationStatus, StoredValue*, VBNotifyCtx>
VBucket::processExpiredItem(const HashTable::HashBucketLock& hbl,
                            StoredValue& v) {
    if (!hbl.getHTLock()) {
        throw std::invalid_argument(
                "VBucket::processExpiredItem: htLock not held for VBucket " +
                std::to_string(getId()));
    }

    // NOTE(review): on this path the temp-initial item itself is passed to
    // queueDirty() while NeedBgFetch is reported - confirm that queuing a
    // temp item here is intended.
    if (v.isTempInitialItem() && eviction == FULL_EVICTION) {
        return std::make_tuple(MutationStatus::NeedBgFetch,
                               &v,
                               queueDirty(v,
                                          GenerateBySeqno::Yes,
                                          GenerateCas::Yes,
                                          /*isBackfillItem*/ false));
    }

    /* If the datatype is XATTR, mark the item as deleted
     * but don't delete the value as system xattrs can
     * still be queried by mobile clients even after
     * deletion.
     * TODO: The current implementation is inefficient
     * but functionally correct and for performance reasons
     * only the system xattrs need to be stored.
     */
    value_t value = v.getValue();
    bool onlyMarkDeleted =
            value && mcbp::datatype::is_xattr(value->getDataType());
    // The deletion is a new revision of the document.
    v.setRevSeqno(v.getRevSeqno() + 1);
    VBNotifyCtx notifyCtx;
    StoredValue* newSv;
    std::tie(newSv, notifyCtx) =
            softDeleteStoredValue(hbl,
                                  v,
                                  onlyMarkDeleted,
                                  VBQueueItemCtx(GenerateBySeqno::Yes,
                                                 GenerateCas::Yes,
                                                 TrackCasDrift::No,
                                                 /*isBackfillItem*/ false,
                                                 nullptr /* no pre link */),
                                  v.getBySeqno());
    // NOTE(review): this records one above the deleted item's (already
    // incremented) revSeqno, whereas processSoftDelete() records the revSeqno
    // itself - confirm the extra +1 is intended.
    ht.updateMaxDeletedRevSeqno(newSv->getRevSeqno() + 1);
    return std::make_tuple(MutationStatus::NotFound, newSv, notifyCtx);
}
2206
2207 bool VBucket::deleteStoredValue(const HashTable::HashBucketLock& hbl,
2208                                 StoredValue& v) {
2209     if (!v.isDeleted() && v.isLocked(ep_current_time())) {
2210         return false;
2211     }
2212
2213     /* StoredValue deleted here. If any other in-memory data structures are
2214        using the StoredValue intrusively then they must have handled the delete
2215        by this point */
2216     ht.unlocked_del(hbl, v.getKey());
2217     return true;
2218 }
2219
2220 AddStatus VBucket::addTempStoredValue(const HashTable::HashBucketLock& hbl,
2221                                       const DocKey& key,
2222                                       bool isReplication) {
2223     uint8_t ext_meta[EXT_META_LEN] = {PROTOCOL_BINARY_RAW_BYTES};
2224     static_assert(sizeof(ext_meta) == 1,
2225                   "VBucket::addTempStoredValue(): expected "
2226                   "EXT_META_LEN to be 1");
2227     Item itm(key,
2228              /*flags*/ 0,
2229              /*exp*/ 0,
2230              /*data*/ NULL,
2231              /*size*/ 0,
2232              ext_meta,
2233              sizeof(ext_meta),
2234              0,
2235              StoredValue::state_temp_init);
2236
2237     /* if a temp item for a possibly deleted, set it non-resident by resetting
2238        the value cuz normally a new item added is considered resident which
2239        does not apply for temp item. */
2240     StoredValue* v = nullptr;
2241     return processAdd(hbl, v, itm, true, isReplication, nullptr).first;
2242 }
2243
2244 void VBucket::notifyNewSeqno(const VBNotifyCtx& notifyCtx) {
2245     if (newSeqnoCb) {
2246         newSeqnoCb->callback(getId(), notifyCtx);
2247     }
2248 }
2249
2250 /*
2251  * Queue the item to the checkpoint and return the seqno the item was
2252  * allocated.
2253  */
2254 int64_t VBucket::queueItem(Item* item, OptionalSeqno seqno) {
2255     item->setVBucketId(id);
2256     queued_item qi(item);
2257     checkpointManager.queueDirty(
2258             *this,
2259             qi,
2260             seqno ? GenerateBySeqno::No : GenerateBySeqno::Yes,
2261             GenerateCas::Yes,
2262             nullptr /* No pre link step as this is for system events */);
2263     VBNotifyCtx notifyCtx;
2264     // If the seqno is initialized, skip replication notification
2265     notifyCtx.notifyReplication = !seqno.is_initialized();
2266     notifyCtx.notifyFlusher = true;
2267     notifyCtx.bySeqno = qi->getBySeqno();
2268     notifyNewSeqno(notifyCtx);
2269     return qi->getBySeqno();
2270 }
2271
2272 VBNotifyCtx VBucket::queueDirty(StoredValue& v,
2273                                 const VBQueueItemCtx& queueItmCtx) {
2274     if (queueItmCtx.trackCasDrift == TrackCasDrift::Yes) {
2275         setMaxCasAndTrackDrift(v.getCas());
2276     }
2277     return queueDirty(v,
2278                       queueItmCtx.genBySeqno,
2279                       queueItmCtx.genCas,
2280                       queueItmCtx.isBackfillItem,
2281                       queueItmCtx.preLinkDocumentContext);
2282 }
2283
2284 void VBucket::updateRevSeqNoOfNewStoredValue(StoredValue& v) {
2285     /**
2286      * Possibly, this item is being recreated. Conservatively assign it
2287      * a seqno that is greater than the greatest seqno of all deleted
2288      * items seen so far.
2289      */
2290     uint64_t seqno = ht.getMaxDeletedRevSeqno();
2291     if (!v.isTempItem()) {
2292         ++seqno;
2293     }
2294     v.setRevSeqno(seqno);
2295 }
2296
2297 void VBucket::addHighPriorityVBEntry(uint64_t seqnoOrChkId,
2298                                      const void* cookie,
2299                                      HighPriorityVBNotify reqType) {
2300     std::unique_lock<std::mutex> lh(hpVBReqsMutex);
2301     hpVBReqs.push_back(HighPriorityVBEntry(cookie, seqnoOrChkId, reqType));
2302     numHpVBReqs.store(hpVBReqs.size());
2303
2304     LOG(EXTENSION_LOG_NOTICE,
2305         "Added high priority async request %s "
2306         "for vb:%" PRIu16 ", Check for:%" PRIu64 ", "
2307         "Persisted upto:%" PRIu64 ", cookie:%p",
2308         to_string(reqType).c_str(),
2309         getId(),
2310         seqnoOrChkId,
2311         getPersistenceSeqno(),
2312         cookie);
2313 }
2314
2315 std::map<const void*, ENGINE_ERROR_CODE> VBucket::getHighPriorityNotifications(
2316         EventuallyPersistentEngine& engine,
2317         uint64_t idNum,
2318         HighPriorityVBNotify notifyType) {
2319     std::unique_lock<std::mutex> lh(hpVBReqsMutex);
2320     std::map<const void*, ENGINE_ERROR_CODE> toNotify;
2321
2322     auto entry = hpVBReqs.begin();
2323
2324     while (entry != hpVBReqs.end()) {
2325         if (notifyType != entry->reqType) {
2326             ++entry;
2327             continue;
2328         }
2329
2330         std::string logStr(to_string(notifyType));
2331
2332         hrtime_t wall_time(gethrtime() - entry->start);
2333         size_t spent = wall_time / 1000000000;
2334         if (entry->id <= idNum) {
2335             toNotify[entry->cookie] = ENGINE_SUCCESS;
2336             stats.chkPersistenceHisto.add(wall_time / 1000);
2337             adjustCheckpointFlushTimeout(wall_time / 1000000000);
2338             LOG(EXTENSION_LOG_NOTICE,
2339                 "Notified the completion of %s "
2340                 "for vbucket %" PRIu16 ", Check for: %" PRIu64
2341                 ", "
2342                 "Persisted upto: %" PRIu64 ", cookie %p",
2343                 logStr.c_str(),
2344                 getId(),
2345                 entry->id,
2346                 idNum,
2347                 entry->cookie);
2348             entry = hpVBReqs.erase(entry);
2349         } else if (spent > getCheckpointFlushTimeout()) {
2350             adjustCheckpointFlushTimeout(spent);
2351             engine.storeEngineSpecific(entry->cookie, NULL);
2352             toNotify[entry->cookie] = ENGINE_TMPFAIL;
2353             LOG(EXTENSION_LOG_WARNING,
2354                 "Notified the timeout on %s "
2355                 "for vbucket %" PRIu16 ", Check for: %" PRIu64
2356                 ", "
2357                 "Persisted upto: %" PRIu64 ", cookie %p",
2358                 logStr.c_str(),
2359                 getId(),
2360                 entry->id,
2361                 idNum,
2362                 entry->cookie);
2363             entry = hpVBReqs.erase(entry);
2364         } else {
2365             ++entry;
2366         }
2367     }
2368     numHpVBReqs.store(hpVBReqs.size());
2369     return toNotify;
2370 }
2371
2372 std::map<const void*, ENGINE_ERROR_CODE> VBucket::tmpFailAndGetAllHpNotifies(
2373         EventuallyPersistentEngine& engine) {
2374     std::map<const void*, ENGINE_ERROR_CODE> toNotify;
2375
2376     LockHolder lh(hpVBReqsMutex);
2377
2378     for (auto& entry : hpVBReqs) {
2379         toNotify[entry.cookie] = ENGINE_TMPFAIL;
2380         engine.storeEngineSpecific(entry.cookie, NULL);
2381     }
2382     hpVBReqs.clear();
2383
2384     return toNotify;
2385 }
2386
2387 void VBucket::adjustCheckpointFlushTimeout(size_t wall_time) {
2388     size_t middle = (MIN_CHK_FLUSH_TIMEOUT + MAX_CHK_FLUSH_TIMEOUT) / 2;
2389
2390     if (wall_time <= MIN_CHK_FLUSH_TIMEOUT) {
2391         chkFlushTimeout = MIN_CHK_FLUSH_TIMEOUT;
2392     } else if (wall_time <= middle) {
2393         chkFlushTimeout = middle;
2394     } else {
2395         chkFlushTimeout = MAX_CHK_FLUSH_TIMEOUT;
2396     }
2397 }
2398
2399 size_t VBucket::getCheckpointFlushTimeout() {
2400     return chkFlushTimeout;
2401 }
2402
2403 std::unique_ptr<Item> VBucket::pruneXattrDocument(
2404         StoredValue& v, const ItemMetaData& itemMeta) {
2405     // Need to take a copy of the value, prune it, and add it back
2406
2407     // Create work-space document
2408     std::vector<uint8_t> workspace(v.getValue()->vlength());
2409     std::copy_n(v.getValue()->getData(),
2410                 v.getValue()->vlength(),
2411                 workspace.begin());
2412
2413     // Now attach to the XATTRs in the document
2414     auto sz = cb::xattr::get_body_offset(
2415             {reinterpret_cast<char*>(workspace.data()), workspace.size()});
2416
2417     cb::xattr::Blob xattr({workspace.data(), sz});
2418     xattr.prune_user_keys();
2419
2420     auto prunedXattrs = xattr.finalize();
2421
2422     if (prunedXattrs.size()) {
2423         // Something remains - Create a Blob and copy-in just the XATTRs
2424         auto newValue =
2425                 Blob::New(reinterpret_cast<const char*>(prunedXattrs.data()),
2426                           prunedXattrs.size(),
2427                           const_cast<uint8_t*>(reinterpret_cast<const uint8_t*>(
2428                                   v.getValue()->getExtMeta())),
2429                           v.getValue()->getExtLen());
2430
2431         return std::make_unique<Item>(v.getKey(),
2432                                       itemMeta.flags,
2433                                       itemMeta.exptime,
2434                                       newValue,
2435                                       itemMeta.cas,
2436                                       v.getBySeqno(),
2437                                       getId(),
2438                                       itemMeta.revSeqno);
2439     } else {
2440         return {};
2441     }
2442 }
2443
2444 void VBucket::DeferredDeleter::operator()(VBucket* vb) const {
2445     // If the vbucket is marked as deleting then we must schedule task to
2446     // perform the resource destruction (memory/disk).
2447     if (vb->isDeletionDeferred()) {
2448         vb->scheduleDeferredDeletion(engine);
2449         return;
2450     }
2451     delete vb;
2452 }