93ab6d8fb6c0f2dbb12b7e44d976f55846a5b1aa
[ep-engine.git] / src / vbucket.cc
1 /* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2 /*
3  *     Copyright 2015 Couchbase, Inc
4  *
5  *   Licensed under the Apache License, Version 2.0 (the "License");
6  *   you may not use this file except in compliance with the License.
7  *   You may obtain a copy of the License at
8  *
9  *       http://www.apache.org/licenses/LICENSE-2.0
10  *
11  *   Unless required by applicable law or agreed to in writing, software
12  *   distributed under the License is distributed on an "AS IS" BASIS,
13  *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  *   See the License for the specific language governing permissions and
15  *   limitations under the License.
16  */
17
18 #include "config.h"
19
20 #include <functional>
21 #include <list>
22 #include <set>
23 #include <string>
24 #include <vector>
25
26 #include "atomic.h"
27 #include "bgfetcher.h"
28 #include "conflict_resolution.h"
29 #include "ep_engine.h"
30 #include "ep_types.h"
31 #include "failover-table.h"
32 #include "flusher.h"
33 #include "pre_link_document_context.h"
34 #include "vbucketdeletiontask.h"
35
36 #define STATWRITER_NAMESPACE vbucket
37 #include "statwriter.h"
38 #undef STATWRITER_NAMESPACE
39 #include "stored_value_factories.h"
40
41 #include "vbucket.h"
42
43 #include <xattr/blob.h>
44 #include <xattr/utils.h>
45
/* Checkpoint flush timeout limits, in seconds (plain constants, not macros).
 * NOTE(review): presumably the min/max bounds for VBucket::chkFlushTimeout;
 * any clamping against them happens elsewhere, not in this section. */
const size_t MIN_CHK_FLUSH_TIMEOUT = 10; // 10 sec.
const size_t MAX_CHK_FLUSH_TIMEOUT = 30; // 30 sec.

/* Statics definitions: the shared timeout starts at the minimum bound. */
std::atomic<size_t> VBucket::chkFlushTimeout(MIN_CHK_FLUSH_TIMEOUT);
52
53 VBucketFilter VBucketFilter::filter_diff(const VBucketFilter &other) const {
54     std::vector<uint16_t> tmp(acceptable.size() + other.size());
55     std::vector<uint16_t>::iterator end;
56     end = std::set_symmetric_difference(acceptable.begin(),
57                                         acceptable.end(),
58                                         other.acceptable.begin(),
59                                         other.acceptable.end(),
60                                         tmp.begin());
61     return VBucketFilter(std::vector<uint16_t>(tmp.begin(), end));
62 }
63
64 VBucketFilter VBucketFilter::filter_intersection(const VBucketFilter &other)
65                                                                         const {
66     std::vector<uint16_t> tmp(acceptable.size() + other.size());
67     std::vector<uint16_t>::iterator end;
68
69     end = std::set_intersection(acceptable.begin(), acceptable.end(),
70                                 other.acceptable.begin(),
71                                 other.acceptable.end(),
72                                 tmp.begin());
73     return VBucketFilter(std::vector<uint16_t>(tmp.begin(), end));
74 }
75
/*
 * Measure the run of consecutive ids that starts at `it`.
 *
 * On return `length` is the number of steps from the first element of the
 * run to its last element (run size minus one). Returns true only when the
 * run covers at least three ids (length > 1).
 *
 * `it` must not equal `end`: the first element is read unconditionally.
 */
static bool isRange(std::set<uint16_t>::const_iterator it,
                    const std::set<uint16_t>::const_iterator &end,
                    size_t &length)
{
    const size_t first = *it;
    size_t runSize = 0;
    while (it != end && first + runSize == *it) {
        ++it;
        ++runSize;
    }

    length = runSize - 1;
    return length > 1;
}
91
92 std::ostream& operator <<(std::ostream &out, const VBucketFilter &filter)
93 {
94     std::set<uint16_t>::const_iterator it;
95
96     if (filter.acceptable.empty()) {
97         out << "{ empty }";
98     } else {
99         bool needcomma = false;
100         out << "{ ";
101         for (it = filter.acceptable.begin();
102              it != filter.acceptable.end();
103              ++it) {
104             if (needcomma) {
105                 out << ", ";
106             }
107
108             size_t length;
109             if (isRange(it, filter.acceptable.end(), length)) {
110                 std::set<uint16_t>::iterator last = it;
111                 for (size_t i = 0; i < length; ++i) {
112                     ++last;
113                 }
114                 out << "[" << *it << "," << *last << "]";
115                 it = last;
116             } else {
117                 out << *it;
118             }
119             needcomma = true;
120         }
121         out << " }";
122     }
123
124     return out;
125 }
126
// Network-byte-order (htonl) encodings of the vbucket states, presumably for
// places that need a state already in wire order — TODO confirm with callers.
// NOTE(review): on little-endian hosts htonl(vbucket_state_active) and the
// others fall outside the enumerators' value range; static_cast of such a
// value to an unscoped enum without a fixed underlying type is formally
// undefined behaviour. Consider storing these as uint32_t instead.
const vbucket_state_t VBucket::ACTIVE =
                     static_cast<vbucket_state_t>(htonl(vbucket_state_active));
const vbucket_state_t VBucket::REPLICA =
                    static_cast<vbucket_state_t>(htonl(vbucket_state_replica));
const vbucket_state_t VBucket::PENDING =
                    static_cast<vbucket_state_t>(htonl(vbucket_state_pending));
const vbucket_state_t VBucket::DEAD =
                    static_cast<vbucket_state_t>(htonl(vbucket_state_dead));
135
136 VBucket::VBucket(id_type i,
137                  vbucket_state_t newState,
138                  EPStats& st,
139                  CheckpointConfig& chkConfig,
140                  int64_t lastSeqno,
141                  uint64_t lastSnapStart,
142                  uint64_t lastSnapEnd,
143                  std::unique_ptr<FailoverTable> table,
144                  std::shared_ptr<Callback<id_type> > flusherCb,
145                  std::unique_ptr<AbstractStoredValueFactory> valFact,
146                  NewSeqnoCallback newSeqnoCb,
147                  Configuration& config,
148                  item_eviction_policy_t evictionPolicy,
149                  vbucket_state_t initState,
150                  uint64_t purgeSeqno,
151                  uint64_t maxCas,
152                  const std::string& collectionsManifest)
153     : ht(st, std::move(valFact), config.getHtSize(), config.getHtLocks()),
154       checkpointManager(st,
155                         i,
156                         chkConfig,
157                         lastSeqno,
158                         lastSnapStart,
159                         lastSnapEnd,
160                         flusherCb),
161       failovers(std::move(table)),
162       opsCreate(0),
163       opsUpdate(0),
164       opsDelete(0),
165       opsReject(0),
166       dirtyQueueSize(0),
167       dirtyQueueMem(0),
168       dirtyQueueFill(0),
169       dirtyQueueDrain(0),
170       dirtyQueueAge(0),
171       dirtyQueuePendingWrites(0),
172       metaDataDisk(0),
173       numExpiredItems(0),
174       eviction(evictionPolicy),
175       stats(st),
176       persistenceSeqno(0),
177       numHpVBReqs(0),
178       id(i),
179       state(newState),
180       initialState(initState),
181       purge_seqno(purgeSeqno),
182       takeover_backed_up(false),
183       persisted_snapshot_start(lastSnapStart),
184       persisted_snapshot_end(lastSnapEnd),
185       rollbackItemCount(0),
186       hlc(maxCas,
187           std::chrono::microseconds(config.getHlcDriftAheadThresholdUs()),
188           std::chrono::microseconds(config.getHlcDriftBehindThresholdUs())),
189       statPrefix("vb_" + std::to_string(i)),
190       persistenceCheckpointId(0),
191       bucketCreation(false),
192       deferredDeletion(false),
193       deferredDeletionCookie(nullptr),
194       newSeqnoCb(std::move(newSeqnoCb)),
195       manifest(collectionsManifest) {
196     if (config.getConflictResolutionType().compare("lww") == 0) {
197         conflictResolver.reset(new LastWriteWinsResolution());
198     } else {
199         conflictResolver.reset(new RevisionSeqnoResolution());
200     }
201
202     backfill.isBackfillPhase = false;
203     pendingOpsStart = 0;
204     stats.memOverhead->fetch_add(sizeof(VBucket)
205                                 + ht.memorySize() + sizeof(CheckpointManager));
206     LOG(EXTENSION_LOG_NOTICE,
207         "VBucket: created vbucket:%" PRIu16 " with state:%s "
208                 "initialState:%s "
209                 "lastSeqno:%" PRIu64 " "
210                 "lastSnapshot:{%" PRIu64 ",%" PRIu64 "} "
211                 "persisted_snapshot:{%" PRIu64 ",%" PRIu64 "} "
212                 "max_cas:%" PRIu64,
213         id, VBucket::toString(state), VBucket::toString(initialState),
214         lastSeqno, lastSnapStart, lastSnapEnd,
215         persisted_snapshot_start, persisted_snapshot_end,
216         getMaxCas());
217 }
218
219 VBucket::~VBucket() {
220     if (!pendingOps.empty()) {
221         LOG(EXTENSION_LOG_WARNING,
222             "~Vbucket(): vbucket:%" PRIu16 " has %ld pending ops",
223             id,
224             pendingOps.size());
225     }
226
227     stats.diskQueueSize.fetch_sub(dirtyQueueSize.load());
228     stats.vbBackfillQueueSize.fetch_sub(getBackfillSize());
229
230     // Clear out the bloomfilter(s)
231     clearFilter();
232
233     stats.memOverhead->fetch_sub(sizeof(VBucket) + ht.memorySize() +
234                                 sizeof(CheckpointManager));
235
236     LOG(EXTENSION_LOG_INFO, "Destroying vbucket %d\n", id);
237 }
238
239 void VBucket::fireAllOps(EventuallyPersistentEngine &engine,
240                          ENGINE_ERROR_CODE code) {
241     std::unique_lock<std::mutex> lh(pendingOpLock);
242
243     if (pendingOpsStart > 0) {
244         hrtime_t now = gethrtime();
245         if (now > pendingOpsStart) {
246             hrtime_t d = (now - pendingOpsStart) / 1000;
247             stats.pendingOpsHisto.add(d);
248             atomic_setIfBigger(stats.pendingOpsMaxDuration, d);
249         }
250     } else {
251         return;
252     }
253
254     pendingOpsStart = 0;
255     stats.pendingOps.fetch_sub(pendingOps.size());
256     atomic_setIfBigger(stats.pendingOpsMax, pendingOps.size());
257
258     while (!pendingOps.empty()) {
259         const void *pendingOperation = pendingOps.back();
260         pendingOps.pop_back();
261         // We don't want to hold the pendingOpLock when
262         // calling notifyIOComplete.
263         lh.unlock();
264         engine.notifyIOComplete(pendingOperation, code);
265         lh.lock();
266     }
267
268     LOG(EXTENSION_LOG_INFO,
269         "Fired pendings ops for vbucket %" PRIu16 " in state %s\n",
270         id, VBucket::toString(state));
271 }
272
273 void VBucket::fireAllOps(EventuallyPersistentEngine &engine) {
274
275     if (state == vbucket_state_active) {
276         fireAllOps(engine, ENGINE_SUCCESS);
277     } else if (state == vbucket_state_pending) {
278         // Nothing
279     } else {
280         fireAllOps(engine, ENGINE_NOT_MY_VBUCKET);
281     }
282 }
283
284 void VBucket::setState(vbucket_state_t to) {
285     vbucket_state_t oldstate;
286     {
287         WriterLockHolder wlh(stateLock);
288         oldstate = state;
289
290         if (to == vbucket_state_active &&
291             checkpointManager.getOpenCheckpointId() < 2) {
292             checkpointManager.setOpenCheckpointId(2);
293         }
294
295         LOG(EXTENSION_LOG_NOTICE,
296             "VBucket::setState: transitioning vbucket:%" PRIu16 " from:%s to:%s",
297             id, VBucket::toString(oldstate), VBucket::toString(to));
298
299         state = to;
300     }
301 }
302
303 vbucket_state VBucket::getVBucketState() const {
304      auto persisted_range = getPersistedSnapshot();
305
306      return vbucket_state{getState(),
307                           getPersistenceCheckpointId(), 0, getHighSeqno(),
308                           getPurgeSeqno(),
309                           persisted_range.start, persisted_range.end,
310                           getMaxCas(), failovers->toJSON()};
311 }
312
313
314
315 void VBucket::doStatsForQueueing(const Item& qi, size_t itemBytes)
316 {
317     ++dirtyQueueSize;
318     dirtyQueueMem.fetch_add(sizeof(Item));
319     ++dirtyQueueFill;
320     dirtyQueueAge.fetch_add(qi.getQueuedTime());
321     dirtyQueuePendingWrites.fetch_add(itemBytes);
322 }
323
324 void VBucket::doStatsForFlushing(const Item& qi, size_t itemBytes) {
325     --dirtyQueueSize;
326     decrDirtyQueueMem(sizeof(Item));
327     ++dirtyQueueDrain;
328     decrDirtyQueueAge(qi.getQueuedTime());
329     decrDirtyQueuePendingWrites(itemBytes);
330 }
331
332 void VBucket::incrMetaDataDisk(const Item& qi) {
333     metaDataDisk.fetch_add(qi.getKey().size() + sizeof(ItemMetaData));
334 }
335
336 void VBucket::decrMetaDataDisk(const Item& qi) {
337     // assume couchstore remove approx this much data from disk
338     metaDataDisk.fetch_sub((qi.getKey().size() + sizeof(ItemMetaData)));
339 }
340
341 void VBucket::resetStats() {
342     opsCreate.store(0);
343     opsUpdate.store(0);
344     opsDelete.store(0);
345     opsReject.store(0);
346
347     stats.diskQueueSize.fetch_sub(dirtyQueueSize.exchange(0));
348     dirtyQueueMem.store(0);
349     dirtyQueueFill.store(0);
350     dirtyQueueAge.store(0);
351     dirtyQueuePendingWrites.store(0);
352     dirtyQueueDrain.store(0);
353
354     hlc.resetStats();
355 }
356
357 template <typename T>
358 void VBucket::addStat(const char *nm, const T &val, ADD_STAT add_stat,
359                       const void *c) {
360     std::string stat = statPrefix;
361     if (nm != NULL) {
362         add_prefixed_stat(statPrefix, nm, val, add_stat, c);
363     } else {
364         add_casted_stat(statPrefix.data(), val, add_stat, c);
365     }
366 }
367
/**
 * Run the server's pre_expiry document hook on an expiring StoredValue.
 *
 * If @p v has a value, an Item is built from it and its item_info is handed
 * to sapi->document->pre_expiry(). When the hook returns true, v's value is
 * replaced (via v.setValue(new_item, ht)) by a deleted Item whose body is
 * the buffer described by itm_info.value[0], keeping v's key, flags,
 * expiry, CAS, seqnos, NRU value and extended metadata.
 *
 * NOTE(review): itm_info is taken from itm *before* itm->setValue(new_val)
 * installs the copied blob, so itm_info.value[0] presumably still refers to
 * the original (shared) value rather than the copy. If pre_expiry rewrites
 * that buffer in place, the copy would not protect the original — confirm
 * against the pre_expiry implementation.
 * NOTE(review): assumes ObjectRegistry::getCurrentEngine() is non-null here.
 */
void VBucket::handlePreExpiry(StoredValue& v) {
    value_t value = v.getValue();
    if (value) {
        std::unique_ptr<Item> itm(v.toItem(false, id));
        item_info itm_info;
        EventuallyPersistentEngine* engine = ObjectRegistry::getCurrentEngine();
        itm_info = itm->toItemInfo(failovers->getLatestUUID());
        // Give itm its own private copy of the value blob.
        value_t new_val(Blob::Copy(*value));
        itm->setValue(new_val);

        SERVER_HANDLE_V1* sapi = engine->getServerApi();
        /* TODO: In order to minimize allocations, the callback needs to
         * allocate an item whose value size will be exactly the size of the
         * value after pre-expiry is performed.
         */
        if (sapi->document->pre_expiry(itm_info)) {
            // Carry the original extended metadata over to the new item.
            char* extMeta = const_cast<char *>(v.getValue()->getExtMeta());
            Item new_item(v.getKey(), v.getFlags(), v.getExptime(),
                          itm_info.value[0].iov_base, itm_info.value[0].iov_len,
                          reinterpret_cast<uint8_t*>(extMeta),
                          v.getValue()->getExtLen(), v.getCas(),
                          v.getBySeqno(), id, v.getRevSeqno(),
                          v.getNRUValue());

            new_item.setDeleted();
            v.setValue(new_item, ht);
        }
    }
}
397
398 size_t VBucket::getNumNonResidentItems() const {
399     if (eviction == VALUE_ONLY) {
400         return ht.getNumInMemoryNonResItems();
401     } else {
402         size_t num_items = ht.getNumItems();
403         size_t num_res_items = ht.getNumInMemoryItems() -
404                                ht.getNumInMemoryNonResItems();
405         return num_items > num_res_items ? (num_items - num_res_items) : 0;
406     }
407 }
408
409
/** Return the persistence checkpoint id (read via persistenceCheckpointId.load()). */
uint64_t VBucket::getPersistenceCheckpointId() const {
    return persistenceCheckpointId.load();
}
413
/** Record @p checkpointId as the persistence checkpoint id (via store()). */
void VBucket::setPersistenceCheckpointId(uint64_t checkpointId) {
    persistenceCheckpointId.store(checkpointId);
}
417
418 void VBucket::markDirty(const DocKey& key) {
419     auto hbl = ht.getLockedBucket(key);
420     StoredValue* v = ht.unlocked_find(
421             key, hbl.getBucketNum(), WantsDeleted::Yes, TrackReference::Yes);
422     if (v) {
423         v->markDirty();
424     } else {
425         LOG(EXTENSION_LOG_WARNING, "markDirty: Error marking dirty, a key "
426             "is missing from vb:%" PRIu16, id);
427     }
428 }
429
430 bool VBucket::isResidentRatioUnderThreshold(float threshold) {
431     if (eviction != FULL_EVICTION) {
432         throw std::invalid_argument("VBucket::isResidentRatioUnderThreshold: "
433                 "policy (which is " + std::to_string(eviction) +
434                 ") must be FULL_EVICTION");
435     }
436     size_t num_items = getNumItems();
437     size_t num_non_resident_items = getNumNonResidentItems();
438     if (threshold >= ((float)(num_items - num_non_resident_items) /
439                                                                 num_items)) {
440         return true;
441     } else {
442         return false;
443     }
444 }
445
446 void VBucket::createFilter(size_t key_count, double probability) {
447     // Create the actual bloom filter upon vbucket creation during
448     // scenarios:
449     //      - Bucket creation
450     //      - Rebalance
451     LockHolder lh(bfMutex);
452     if (bFilter == nullptr && tempFilter == nullptr) {
453         bFilter = std::make_unique<BloomFilter>(key_count, probability,
454                                         BFILTER_ENABLED);
455     } else {
456         LOG(EXTENSION_LOG_WARNING, "(vb %" PRIu16 ") Bloom filter / Temp filter"
457             " already exist!", id);
458     }
459 }
460
461 void VBucket::initTempFilter(size_t key_count, double probability) {
462     // Create a temp bloom filter with status as COMPACTING,
463     // if the main filter is found to exist, set its state to
464     // COMPACTING as well.
465     LockHolder lh(bfMutex);
466     tempFilter = std::make_unique<BloomFilter>(key_count, probability,
467                                      BFILTER_COMPACTING);
468     if (bFilter) {
469         bFilter->setStatus(BFILTER_COMPACTING);
470     }
471 }
472
473 void VBucket::addToFilter(const DocKey& key) {
474     LockHolder lh(bfMutex);
475     if (bFilter) {
476         bFilter->addKey(key);
477     }
478
479     // If the temp bloom filter is not found to be NULL,
480     // it means that compaction is running on the particular
481     // vbucket. Therefore add the key to the temp filter as
482     // well, as once compaction completes the temp filter
483     // will replace the main bloom filter.
484     if (tempFilter) {
485         tempFilter->addKey(key);
486     }
487 }
488
489 bool VBucket::maybeKeyExistsInFilter(const DocKey& key) {
490     LockHolder lh(bfMutex);
491     if (bFilter) {
492         return bFilter->maybeKeyExists(key);
493     } else {
494         // If filter doesn't exist, allow the BgFetch to go through.
495         return true;
496     }
497 }
498
499 bool VBucket::isTempFilterAvailable() {
500     LockHolder lh(bfMutex);
501     if (tempFilter &&
502         (tempFilter->getStatus() == BFILTER_COMPACTING ||
503          tempFilter->getStatus() == BFILTER_ENABLED)) {
504         return true;
505     } else {
506         return false;
507     }
508 }
509
510 void VBucket::addToTempFilter(const DocKey& key) {
511     // Keys will be added to only the temp filter during
512     // compaction.
513     LockHolder lh(bfMutex);
514     if (tempFilter) {
515         tempFilter->addKey(key);
516     }
517 }
518
519 void VBucket::swapFilter() {
520     // Delete the main bloom filter and replace it with
521     // the temp filter that was populated during compaction,
522     // only if the temp filter's state is found to be either at
523     // COMPACTING or ENABLED (if in the case the user enables
524     // bloomfilters for some reason while compaction was running).
525     // Otherwise, it indicates that the filter's state was
526     // possibly disabled during compaction, therefore clear out
527     // the temp filter. If it gets enabled at some point, a new
528     // bloom filter will be made available after the next
529     // compaction.
530
531     LockHolder lh(bfMutex);
532     if (tempFilter) {
533         bFilter.reset();
534
535         if (tempFilter->getStatus() == BFILTER_COMPACTING ||
536              tempFilter->getStatus() == BFILTER_ENABLED) {
537             bFilter = std::move(tempFilter);
538             bFilter->setStatus(BFILTER_ENABLED);
539         }
540         tempFilter.reset();
541     }
542 }
543
544 void VBucket::clearFilter() {
545     LockHolder lh(bfMutex);
546     bFilter.reset();
547     tempFilter.reset();
548 }
549
550 void VBucket::setFilterStatus(bfilter_status_t to) {
551     LockHolder lh(bfMutex);
552     if (bFilter) {
553         bFilter->setStatus(to);
554     }
555     if (tempFilter) {
556         tempFilter->setStatus(to);
557     }
558 }
559
560 std::string VBucket::getFilterStatusString() {
561     LockHolder lh(bfMutex);
562     if (bFilter) {
563         return bFilter->getStatusString();
564     } else if (tempFilter) {
565         return tempFilter->getStatusString();
566     } else {
567         return "DOESN'T EXIST";
568     }
569 }
570
571 size_t VBucket::getFilterSize() {
572     LockHolder lh(bfMutex);
573     if (bFilter) {
574         return bFilter->getFilterSize();
575     } else {
576         return 0;
577     }
578 }
579
580 size_t VBucket::getNumOfKeysInFilter() {
581     LockHolder lh(bfMutex);
582     if (bFilter) {
583         return bFilter->getNumOfKeysInFilter();
584     } else {
585         return 0;
586     }
587 }
588
589 VBNotifyCtx VBucket::queueDirty(
590         StoredValue& v,
591         const GenerateBySeqno generateBySeqno,
592         const GenerateCas generateCas,
593         const bool isBackfillItem,
594         PreLinkDocumentContext* preLinkDocumentContext) {
595     VBNotifyCtx notifyCtx;
596
597     queued_item qi(v.toItem(false, getId()));
598
599     if (isBackfillItem) {
600         queueBackfillItem(qi, generateBySeqno);
601         notifyCtx.notifyFlusher = true;
602         /* During backfill on a TAP receiver we need to update the snapshot
603          range in the checkpoint. Has to be done here because in case of TAP
604          backfill, above, we use vb.queueBackfillItem() instead of
605          vb.checkpointManager.queueDirty() */
606         if (generateBySeqno == GenerateBySeqno::Yes) {
607             checkpointManager.resetSnapshotRange();
608         }
609     } else {
610         notifyCtx.notifyFlusher =
611                 checkpointManager.queueDirty(*this,
612                                              qi,
613                                              generateBySeqno,
614                                              generateCas,
615                                              preLinkDocumentContext);
616         notifyCtx.notifyReplication = true;
617         if (GenerateCas::Yes == generateCas) {
618             v.setCas(qi->getCas());
619         }
620     }
621
622     v.setBySeqno(qi->getBySeqno());
623     notifyCtx.bySeqno = qi->getBySeqno();
624
625     return notifyCtx;
626 }
627
628 StoredValue* VBucket::fetchValidValue(HashTable::HashBucketLock& hbl,
629                                       const DocKey& key,
630                                       WantsDeleted wantsDeleted,
631                                       TrackReference trackReference,
632                                       QueueExpired queueExpired) {
633     if (!hbl.getHTLock()) {
634         throw std::logic_error(
635                 "Hash bucket lock not held in "
636                 "VBucket::fetchValidValue() for hash bucket: " +
637                 std::to_string(hbl.getBucketNum()) + "for key: " +
638                 std::string(reinterpret_cast<const char*>(key.data()),
639                             key.size()));
640     }
641     StoredValue* v = ht.unlocked_find(
642             key, hbl.getBucketNum(), wantsDeleted, trackReference);
643     if (v && !v->isDeleted() && !v->isTempItem()) {
644         // In the deleted case, we ignore expiration time.
645         if (v->isExpired(ep_real_time())) {
646             if (getState() != vbucket_state_active) {
647                 return wantsDeleted == WantsDeleted::Yes ? v : NULL;
648             }
649
650             // queueDirty only allowed on active VB
651             if (queueExpired == QueueExpired::Yes &&
652                 getState() == vbucket_state_active) {
653                 incExpirationStat(ExpireBy::Access);
654                 handlePreExpiry(*v);
655                 VBNotifyCtx notifyCtx;
656                 std::tie(std::ignore, v, notifyCtx) =
657                         processExpiredItem(hbl, *v);
658                 notifyNewSeqno(notifyCtx);
659             }
660             return wantsDeleted == WantsDeleted::Yes ? v : NULL;
661         }
662     }
663     return v;
664 }
665
666 void VBucket::incExpirationStat(const ExpireBy source) {
667     switch (source) {
668     case ExpireBy::Pager:
669         ++stats.expired_pager;
670         break;
671     case ExpireBy::Compactor:
672         ++stats.expired_compactor;
673         break;
674     case ExpireBy::Access:
675         ++stats.expired_access;
676         break;
677     }
678     ++numExpiredItems;
679 }
680
/**
 * Store @p itm straight into the hash table via ht.set(), bypassing the
 * checkpoint/queueing path used by VBucket::set(). Returns ht.set()'s status.
 */
MutationStatus VBucket::setFromInternal(Item& itm) {
    return ht.set(itm);
}
684
685 ENGINE_ERROR_CODE VBucket::set(Item& itm,
686                                const void* cookie,
687                                EventuallyPersistentEngine& engine,
688                                const int bgFetchDelay) {
689     bool cas_op = (itm.getCas() != 0);
690     auto hbl = ht.getLockedBucket(itm.getKey());
691     StoredValue* v = ht.unlocked_find(itm.getKey(),
692                                       hbl.getBucketNum(),
693                                       WantsDeleted::Yes,
694                                       TrackReference::No);
695     if (v && v->isLocked(ep_current_time()) &&
696         (getState() == vbucket_state_replica ||
697          getState() == vbucket_state_pending)) {
698         v->unlock();
699     }
700
701     bool maybeKeyExists = true;
702     // If we didn't find a valid item, check Bloomfilter's prediction if in
703     // full eviction policy and for a CAS operation.
704     if ((v == nullptr || v->isTempInitialItem()) &&
705         (eviction == FULL_EVICTION) && (itm.getCas() != 0)) {
706         // Check Bloomfilter's prediction
707         if (!maybeKeyExistsInFilter(itm.getKey())) {
708             maybeKeyExists = false;
709         }
710     }
711
712     PreLinkDocumentContext preLinkDocumentContext(engine, cookie, &itm);
713     VBQueueItemCtx queueItmCtx(GenerateBySeqno::Yes,
714                                GenerateCas::Yes,
715                                TrackCasDrift::No,
716                                /*isBackfillItem*/ false,
717                                &preLinkDocumentContext);
718
719     MutationStatus status;
720     VBNotifyCtx notifyCtx;
721     std::tie(status, notifyCtx) = processSet(hbl,
722                                              v,
723                                              itm,
724                                              itm.getCas(),
725                                              /*allowExisting*/ true,
726                                              /*hashMetaData*/ false,
727                                              &queueItmCtx,
728                                              maybeKeyExists);
729
730     ENGINE_ERROR_CODE ret = ENGINE_SUCCESS;
731     switch (status) {
732     case MutationStatus::NoMem:
733         ret = ENGINE_ENOMEM;
734         break;
735     case MutationStatus::InvalidCas:
736         ret = ENGINE_KEY_EEXISTS;
737         break;
738     case MutationStatus::IsLocked:
739         ret = ENGINE_LOCKED;
740         break;
741     case MutationStatus::NotFound:
742         if (cas_op) {
743             ret = ENGINE_KEY_ENOENT;
744             break;
745         }
746     // FALLTHROUGH
747     case MutationStatus::WasDirty:
748     // Even if the item was dirty, push it into the vbucket's open
749     // checkpoint.
750     case MutationStatus::WasClean:
751         notifyNewSeqno(notifyCtx);
752
753         itm.setBySeqno(v->getBySeqno());
754         itm.setCas(v->getCas());
755         break;
756     case MutationStatus::NeedBgFetch: { // CAS operation with non-resident item
757         // +
758         // full eviction.
759         if (v) {
760             // temp item is already created. Simply schedule a bg fetch job
761             hbl.getHTLock().unlock();
762             bgFetch(itm.getKey(), cookie, engine, bgFetchDelay, true);
763             return ENGINE_EWOULDBLOCK;
764         }
765         ret = addTempItemAndBGFetch(
766                 hbl, itm.getKey(), cookie, engine, bgFetchDelay, true);
767         break;
768     }
769     }
770
771     return ret;
772 }
773
774 ENGINE_ERROR_CODE VBucket::replace(Item& itm,
775                                    const void* cookie,
776                                    EventuallyPersistentEngine& engine,
777                                    const int bgFetchDelay) {
778     auto hbl = ht.getLockedBucket(itm.getKey());
779     StoredValue* v = ht.unlocked_find(itm.getKey(),
780                                       hbl.getBucketNum(),
781                                       WantsDeleted::Yes,
782                                       TrackReference::No);
783     if (v) {
784         if (v->isDeleted() || v->isTempDeletedItem() ||
785             v->isTempNonExistentItem()) {
786             return ENGINE_KEY_ENOENT;
787         }
788
789         MutationStatus mtype;
790         VBNotifyCtx notifyCtx;
791         if (eviction == FULL_EVICTION && v->isTempInitialItem()) {
792             mtype = MutationStatus::NeedBgFetch;
793         } else {
794             PreLinkDocumentContext preLinkDocumentContext(engine, cookie, &itm);
795             VBQueueItemCtx queueItmCtx(GenerateBySeqno::Yes,
796                                        GenerateCas::Yes,
797                                        TrackCasDrift::No,
798                                        /*isBackfillItem*/ false,
799                                        &preLinkDocumentContext);
800             std::tie(mtype, notifyCtx) = processSet(hbl,
801                                                     v,
802                                                     itm,
803                                                     0,
804                                                     /*allowExisting*/ true,
805                                                     /*hasMetaData*/ false,
806                                                     &queueItmCtx);
807         }
808
809         ENGINE_ERROR_CODE ret = ENGINE_SUCCESS;
810         switch (mtype) {
811         case MutationStatus::NoMem:
812             ret = ENGINE_ENOMEM;
813             break;
814         case MutationStatus::IsLocked:
815             ret = ENGINE_LOCKED;
816             break;
817         case MutationStatus::InvalidCas:
818         case MutationStatus::NotFound:
819             ret = ENGINE_NOT_STORED;
820             break;
821         // FALLTHROUGH
822         case MutationStatus::WasDirty:
823         // Even if the item was dirty, push it into the vbucket's open
824         // checkpoint.
825         case MutationStatus::WasClean:
826             notifyNewSeqno(notifyCtx);
827
828             itm.setBySeqno(v->getBySeqno());
829             itm.setCas(v->getCas());
830             break;
831         case MutationStatus::NeedBgFetch: {
832             // temp item is already created. Simply schedule a bg fetch job
833             hbl.getHTLock().unlock();
834             bgFetch(itm.getKey(), cookie, engine, bgFetchDelay, true);
835             ret = ENGINE_EWOULDBLOCK;
836             break;
837         }
838         }
839
840         return ret;
841     } else {
842         if (eviction == VALUE_ONLY) {
843             return ENGINE_KEY_ENOENT;
844         }
845
846         if (maybeKeyExistsInFilter(itm.getKey())) {
847             return addTempItemAndBGFetch(
848                     hbl, itm.getKey(), cookie, engine, bgFetchDelay, false);
849         } else {
850             // As bloomfilter predicted that item surely doesn't exist
851             // on disk, return ENOENT for replace().
852             return ENGINE_KEY_ENOENT;
853         }
854     }
855 }
856
857 ENGINE_ERROR_CODE VBucket::addBackfillItem(Item& itm,
858                                            const GenerateBySeqno genBySeqno) {
859     auto hbl = ht.getLockedBucket(itm.getKey());
860     StoredValue* v = ht.unlocked_find(itm.getKey(),
861                                       hbl.getBucketNum(),
862                                       WantsDeleted::Yes,
863                                       TrackReference::No);
864
865     // Note that this function is only called on replica or pending vbuckets.
866     if (v && v->isLocked(ep_current_time())) {
867         v->unlock();
868     }
869
870     VBQueueItemCtx queueItmCtx(genBySeqno,
871                                GenerateCas::No,
872                                TrackCasDrift::No,
873                                /*isBackfillItem*/ true,
874                                nullptr /* No pre link should happen */);
875     MutationStatus status;
876     VBNotifyCtx notifyCtx;
877     std::tie(status, notifyCtx) = processSet(hbl,
878                                              v,
879                                              itm,
880                                              0,
881                                              /*allowExisting*/ true,
882                                              /*hasMetaData*/ true,
883                                              &queueItmCtx);
884
885     ENGINE_ERROR_CODE ret = ENGINE_SUCCESS;
886     switch (status) {
887     case MutationStatus::NoMem:
888         ret = ENGINE_ENOMEM;
889         break;
890     case MutationStatus::InvalidCas:
891     case MutationStatus::IsLocked:
892         ret = ENGINE_KEY_EEXISTS;
893         break;
894     case MutationStatus::WasDirty:
895     // FALLTHROUGH, to ensure the bySeqno for the hashTable item is
896     // set correctly, and also the sequence numbers are ordered correctly.
897     // (MB-14003)
898     case MutationStatus::NotFound:
899     // FALLTHROUGH
900     case MutationStatus::WasClean: {
901         setMaxCas(v->getCas());
902         // we unlock ht lock here because we want to avoid potential lock
903         // inversions arising from notifyNewSeqno() call
904         hbl.getHTLock().unlock();
905         notifyNewSeqno(notifyCtx);
906     } break;
907     case MutationStatus::NeedBgFetch:
908         throw std::logic_error(
909                 "VBucket::addBackfillItem: "
910                 "SET on a non-active vbucket should not require a "
911                 "bg_metadata_fetch.");
912     }
913
914     return ret;
915 }
916
917 ENGINE_ERROR_CODE VBucket::setWithMeta(Item& itm,
918                                        const uint64_t cas,
919                                        uint64_t* seqno,
920                                        const void* cookie,
921                                        EventuallyPersistentEngine& engine,
922                                        const int bgFetchDelay,
923                                        const bool force,
924                                        const bool allowExisting,
925                                        const GenerateBySeqno genBySeqno,
926                                        const GenerateCas genCas,
927                                        const bool isReplication) {
928     auto hbl = ht.getLockedBucket(itm.getKey());
929     StoredValue* v = ht.unlocked_find(itm.getKey(),
930                                       hbl.getBucketNum(),
931                                       WantsDeleted::Yes,
932                                       TrackReference::No);
933
934     bool maybeKeyExists = true;
935     if (!force) {
936         if (v) {
937             if (v->isTempInitialItem()) {
938                 bgFetch(itm.getKey(), cookie, engine, bgFetchDelay, true);
939                 return ENGINE_EWOULDBLOCK;
940             }
941
942             if (!(conflictResolver->resolve(*v,
943                                             itm.getMetaData(),
944                                             itm.getDataType(),
945                                             itm.isDeleted()))) {
946                 ++stats.numOpsSetMetaResolutionFailed;
947                 return ENGINE_KEY_EEXISTS;
948             }
949         } else {
950             if (maybeKeyExistsInFilter(itm.getKey())) {
951                 return addTempItemAndBGFetch(hbl,
952                                              itm.getKey(),
953                                              cookie,
954                                              engine,
955                                              bgFetchDelay,
956                                              true,
957                                              isReplication);
958             } else {
959                 maybeKeyExists = false;
960             }
961         }
962     } else {
963         if (eviction == FULL_EVICTION) {
964             // Check Bloomfilter's prediction
965             if (!maybeKeyExistsInFilter(itm.getKey())) {
966                 maybeKeyExists = false;
967             }
968         }
969     }
970
971     if (v && v->isLocked(ep_current_time()) &&
972         (getState() == vbucket_state_replica ||
973          getState() == vbucket_state_pending)) {
974         v->unlock();
975     }
976
977     VBQueueItemCtx queueItmCtx(genBySeqno,
978                                genCas,
979                                TrackCasDrift::Yes,
980                                /*isBackfillItem*/ false,
981                                nullptr /* No pre link step needed */);
982     MutationStatus status;
983     VBNotifyCtx notifyCtx;
984     std::tie(status, notifyCtx) = processSet(hbl,
985                                              v,
986                                              itm,
987                                              cas,
988                                              allowExisting,
989                                              true,
990                                              &queueItmCtx,
991                                              maybeKeyExists,
992                                              isReplication);
993
994     ENGINE_ERROR_CODE ret = ENGINE_SUCCESS;
995     switch (status) {
996     case MutationStatus::NoMem:
997         ret = ENGINE_ENOMEM;
998         break;
999     case MutationStatus::InvalidCas:
1000         ret = ENGINE_KEY_EEXISTS;
1001         break;
1002     case MutationStatus::IsLocked:
1003         ret = ENGINE_LOCKED;
1004         break;
1005     case MutationStatus::WasDirty:
1006     case MutationStatus::WasClean: {
1007         if (seqno) {
1008             *seqno = static_cast<uint64_t>(v->getBySeqno());
1009         }
1010         // we unlock ht lock here because we want to avoid potential lock
1011         // inversions arising from notifyNewSeqno() call
1012         hbl.getHTLock().unlock();
1013         notifyNewSeqno(notifyCtx);
1014     } break;
1015     case MutationStatus::NotFound:
1016         ret = ENGINE_KEY_ENOENT;
1017         break;
1018     case MutationStatus::NeedBgFetch: { // CAS operation with non-resident item
1019         // + full eviction.
1020         if (v) { // temp item is already created. Simply schedule a
1021             hbl.getHTLock().unlock(); // bg fetch job.
1022             bgFetch(itm.getKey(), cookie, engine, bgFetchDelay, true);
1023             return ENGINE_EWOULDBLOCK;
1024         }
1025         ret = addTempItemAndBGFetch(hbl,
1026                                     itm.getKey(),
1027                                     cookie,
1028                                     engine,
1029                                     bgFetchDelay,
1030                                     true,
1031                                     isReplication);
1032     }
1033     }
1034
1035     return ret;
1036 }
1037
/**
 * Front-end delete of `key` from this vbucket.
 *
 * If the value has already expired it is handled by processExpiredItem().
 * Otherwise it is soft-deleted with a freshly generated seqno and CAS and a
 * revSeqno one greater than the current one.
 *
 * @param key          document key to delete
 * @param cas          in: CAS passed to processSoftDelete(); out: the CAS of
 *                     the stored value after the delete (set on the
 *                     WasClean / WasDirty / NotFound paths only)
 * @param cookie       connection cookie used for background fetches
 * @param engine       owning engine
 * @param bgFetchDelay delay passed to bgFetch()
 * @param itemMeta     optional out-param: cas, revSeqno, flags and exptime
 *                     of the stored value
 * @param mutInfo      optional out-param: seqno and vbucket UUID, filled only
 *                     when the delete was queued (not on NotFound)
 * @return ENGINE_SUCCESS, ENGINE_KEY_ENOENT, ENGINE_EWOULDBLOCK,
 *         ENGINE_KEY_EEXISTS (CAS mismatch), ENGINE_LOCKED_TMPFAIL or
 *         ENGINE_ENOMEM
 * @throws std::logic_error if processSoftDelete() reports NeedBgFetch
 */
ENGINE_ERROR_CODE VBucket::deleteItem(const DocKey& key,
                                      uint64_t& cas,
                                      const void* cookie,
                                      EventuallyPersistentEngine& engine,
                                      const int bgFetchDelay,
                                      ItemMetaData* itemMeta,
                                      mutation_descr_t* mutInfo) {
    auto hbl = ht.getLockedBucket(key);
    StoredValue* v = ht.unlocked_find(
            key, hbl.getBucketNum(), WantsDeleted::Yes, TrackReference::No);

    // Nothing live in memory: decide between ENOENT and fetching from disk.
    if (!v || v->isDeleted() || v->isTempItem()) {
        if (eviction == VALUE_ONLY) {
            return ENGINE_KEY_ENOENT;
        } else { // Full eviction.
            if (!v) { // Item might be evicted from cache.
                if (maybeKeyExistsInFilter(key)) {
                    return addTempItemAndBGFetch(
                            hbl, key, cookie, engine, bgFetchDelay, true);
                } else {
                    // As bloomfilter predicted that item surely doesn't
                    // exist on disk, return ENOENT for deleteItem().
                    return ENGINE_KEY_ENOENT;
                }
            } else if (v->isTempInitialItem()) {
                // Metadata fetch still outstanding: release the bucket lock
                // and (re)schedule the bg fetch.
                hbl.getHTLock().unlock();
                bgFetch(key, cookie, engine, bgFetchDelay, true);
                return ENGINE_EWOULDBLOCK;
            } else { // Non-existent or deleted key.
                if (v->isTempNonExistentItem() || v->isTempDeletedItem()) {
                    // Delete a temp non-existent item to ensure that
                    // if a delete were issued over an item that doesn't
                    // exist, then we don't preserve a temp item.
                    deleteStoredValue(hbl, *v);
                }
                return ENGINE_KEY_ENOENT;
            }
        }
    }

    // On replica / pending vbuckets an existing lock must not block the
    // delete.
    if (v->isLocked(ep_current_time()) &&
        (getState() == vbucket_state_replica ||
         getState() == vbucket_state_pending)) {
        v->unlock();
    }

    // Report the pre-delete CAS; overwritten below once the delete is queued.
    if (itemMeta != nullptr) {
        itemMeta->cas = v->getCas();
    }

    MutationStatus delrv;
    VBNotifyCtx notifyCtx;
    if (v->isExpired(ep_real_time())) {
        std::tie(delrv, v, notifyCtx) = processExpiredItem(hbl, *v);
    } else {
        ItemMetaData metadata;
        metadata.revSeqno = v->getRevSeqno() + 1;
        std::tie(delrv, v, notifyCtx) =
                processSoftDelete(hbl,
                                  *v,
                                  cas,
                                  metadata,
                                  VBQueueItemCtx(GenerateBySeqno::Yes,
                                                 GenerateCas::Yes,
                                                 TrackCasDrift::No,
                                                 /*isBackfillItem*/ false,
                                                 nullptr /* no pre link */),
                                  /*use_meta*/ false,
                                  /*bySeqno*/ v->getBySeqno());
    }

    uint64_t seqno = 0;
    ENGINE_ERROR_CODE ret = ENGINE_SUCCESS;
    switch (delrv) {
    case MutationStatus::NoMem:
        ret = ENGINE_ENOMEM;
        break;
    case MutationStatus::InvalidCas:
        ret = ENGINE_KEY_EEXISTS;
        break;
    case MutationStatus::IsLocked:
        ret = ENGINE_LOCKED_TMPFAIL;
        break;
    case MutationStatus::NotFound:
        ret = ENGINE_KEY_ENOENT;
    /* Fallthrough:
     * A NotFound return value at this point indicates that the
     * item has expired. But, a deletion still needs to be queued
     * for the item in order to persist it.
     */
    case MutationStatus::WasClean:
    case MutationStatus::WasDirty:
        if (itemMeta != nullptr) {
            itemMeta->revSeqno = v->getRevSeqno();
            itemMeta->flags = v->getFlags();
            itemMeta->exptime = v->getExptime();
        }

        // NOTE(review): unlike the other mutation paths in this file, the
        // hash-bucket lock is still held across notifyNewSeqno() here,
        // because `v` is dereferenced again just below. Confirm this lock
        // ordering is acceptable.
        notifyNewSeqno(notifyCtx);
        seqno = static_cast<uint64_t>(v->getBySeqno());
        cas = v->getCas();

        // Mutation info / final CAS are only reported for a real delete,
        // not for the expired (NotFound) case.
        if (delrv != MutationStatus::NotFound) {
            if (mutInfo) {
                mutInfo->seqno = seqno;
                mutInfo->vbucket_uuid = failovers->getLatestUUID();
            }
            if (itemMeta != nullptr) {
                itemMeta->cas = v->getCas();
            }
        }
        break;
    case MutationStatus::NeedBgFetch:
        // We already figured out if a bg fetch is required for a full-evicted
        // item above.
        throw std::logic_error(
                "VBucket::deleteItem: "
                "Unexpected NEEDS_BG_FETCH from processSoftDelete");
    }
    return ret;
}
1159
/**
 * Apply a delete that carries explicit metadata (`itemMeta`).
 *
 * Unless `force` is set, the incoming metadata is checked against the
 * in-memory value with the conflict resolver, and a losing delete is
 * rejected with ENGINE_KEY_EEXISTS. When no in-memory value exists, either a
 * temp item plus bg fetch is scheduled (bloom filter says "maybe on disk") or
 * a deleted temp value is created so the delete is still persisted.
 *
 * If the existing value has xattrs and pruneXattrDocument() returns an item,
 * the stored value is updated with that pruned item instead of being
 * soft-deleted.
 *
 * @param key           document key
 * @param cas           in: CAS applied/compared during the delete; out: the
 *                      stored value's CAS after processing, or 0 if none
 * @param seqno         optional out-param; receives bySeqno on success
 * @param cookie        connection cookie used for background fetches
 * @param engine        owning engine
 * @param bgFetchDelay  delay passed to bgFetch()
 * @param force         skip conflict resolution when true
 * @param itemMeta      metadata of the incoming delete
 * @param backfill      forwarded to VBQueueItemCtx as isBackfillItem
 * @param genBySeqno    whether to generate a new bySeqno
 * @param generateCas   whether to generate a new CAS
 * @param bySeqno       seqno forwarded to processSoftDelete()
 * @param isReplication forwarded to temp-item creation / bg fetch
 * @return ENGINE_SUCCESS, ENGINE_EWOULDBLOCK, or an error code
 */
ENGINE_ERROR_CODE VBucket::deleteWithMeta(const DocKey& key,
                                          uint64_t& cas,
                                          uint64_t* seqno,
                                          const void* cookie,
                                          EventuallyPersistentEngine& engine,
                                          const int bgFetchDelay,
                                          const bool force,
                                          const ItemMetaData& itemMeta,
                                          const bool backfill,
                                          const GenerateBySeqno genBySeqno,
                                          const GenerateCas generateCas,
                                          const uint64_t bySeqno,
                                          const bool isReplication) {
    auto hbl = ht.getLockedBucket(key);
    StoredValue* v = ht.unlocked_find(
            key, hbl.getBucketNum(), WantsDeleted::Yes, TrackReference::No);
    if (!force) { // Need conflict resolution.
        if (v) {
            if (v->isTempInitialItem()) {
                // Metadata not yet known; fetch it before resolving.
                bgFetch(key, cookie, engine, bgFetchDelay, true);
                return ENGINE_EWOULDBLOCK;
            }

            if (!(conflictResolver->resolve(*v,
                                            itemMeta,
                                            PROTOCOL_BINARY_RAW_BYTES,
                                            true))) {
                ++stats.numOpsDelMetaResolutionFailed;
                return ENGINE_KEY_EEXISTS;
            }
        } else {
            // Item is 1) deleted or not existent in the value eviction case OR
            // 2) deleted or evicted in the full eviction.
            if (maybeKeyExistsInFilter(key)) {
                return addTempItemAndBGFetch(hbl,
                                             key,
                                             cookie,
                                             engine,
                                             bgFetchDelay,
                                             true,
                                             isReplication);
            } else {
                // Even though bloomfilter predicted that item doesn't exist
                // on disk, we must put this delete on disk if the cas is valid.
                AddStatus rv = addTempStoredValue(hbl, key, isReplication);
                if (rv == AddStatus::NoMem) {
                    return ENGINE_ENOMEM;
                }
                v = ht.unlocked_find(key,
                                     hbl.getBucketNum(),
                                     WantsDeleted::Yes,
                                     TrackReference::No);
                v->setDeleted();
            }
        }
    } else {
        // Forced delete: materialise a deleted value carrying the caller's
        // CAS when nothing usable is in memory.
        if (!v) {
            // We should always try to persist a delete here.
            AddStatus rv = addTempStoredValue(hbl, key, isReplication);
            if (rv == AddStatus::NoMem) {
                return ENGINE_ENOMEM;
            }
            v = ht.unlocked_find(key,
                                 hbl.getBucketNum(),
                                 WantsDeleted::Yes,
                                 TrackReference::No);
            v->setDeleted();
            v->setCas(cas);
        } else if (v->isTempInitialItem()) {
            v->setDeleted();
            v->setCas(cas);
        }
    }

    // Replica / pending vbuckets must accept the delete even if locked.
    if (v && v->isLocked(ep_current_time()) &&
        (getState() == vbucket_state_replica ||
         getState() == vbucket_state_pending)) {
        v->unlock();
    }

    MutationStatus delrv;
    VBNotifyCtx notifyCtx;
    // Defensive: every path above either returns early or leaves `v`
    // non-null (each re-lookup is dereferenced immediately afterwards).
    if (!v) {
        if (eviction == FULL_EVICTION) {
            delrv = MutationStatus::NeedBgFetch;
        } else {
            delrv = MutationStatus::NotFound;
        }
    } else {
        VBQueueItemCtx queueItmCtx(genBySeqno,
                                   generateCas,
                                   TrackCasDrift::Yes,
                                   backfill,
                                   nullptr /* No pre link step needed */);

        // system xattrs must remain
        // (`itm` is assigned inside the condition; the update path is taken
        // only when pruning produced a non-null item.)
        std::unique_ptr<Item> itm;
        if (mcbp::datatype::is_xattr(v->getDatatype()) &&
            (itm = pruneXattrDocument(*v, itemMeta))) {
            std::tie(v, delrv, notifyCtx) =
                    updateStoredValue(hbl, *v, *itm, &queueItmCtx);
        } else {
            std::tie(delrv, v, notifyCtx) = processSoftDelete(hbl,
                                                              *v,
                                                              cas,
                                                              itemMeta,
                                                              queueItmCtx,
                                                              /*use_meta*/ true,
                                                              bySeqno);
        }
    }
    // Reported to the caller regardless of the outcome below.
    cas = v ? v->getCas() : 0;

    switch (delrv) {
    case MutationStatus::NoMem:
        return ENGINE_ENOMEM;
    case MutationStatus::InvalidCas:
        return ENGINE_KEY_EEXISTS;
    case MutationStatus::IsLocked:
        return ENGINE_LOCKED_TMPFAIL;
    case MutationStatus::NotFound:
        return ENGINE_KEY_ENOENT;
    case MutationStatus::WasDirty:
    case MutationStatus::WasClean: {
        if (seqno) {
            *seqno = static_cast<uint64_t>(v->getBySeqno());
        }
        // we unlock ht lock here because we want to avoid potential lock
        // inversions arising from notifyNewSeqno() call
        hbl.getHTLock().unlock();
        notifyNewSeqno(notifyCtx);
        break;
    }
    case MutationStatus::NeedBgFetch:
        hbl.getHTLock().unlock();
        bgFetch(key, cookie, engine, bgFetchDelay, true);
        return ENGINE_EWOULDBLOCK;
    }
    return ENGINE_SUCCESS;
}
1300
1301 void VBucket::deleteExpiredItem(const Item& it,
1302                                 time_t startTime,
1303                                 ExpireBy source) {
1304
1305     // The item is correctly trimmed (by the caller). Fetch the one in the
1306     // hashtable and replace it if the CAS match (same item; no race).
1307     // If not found in the hashtable we should add it as a deleted item
1308     const DocKey& key = it.getKey();
1309     auto hbl = ht.getLockedBucket(key);
1310     StoredValue* v = ht.unlocked_find(
1311             key, hbl.getBucketNum(), WantsDeleted::Yes, TrackReference::No);
1312     if (v) {
1313         if (v->getCas() != it.getCas()) {
1314             return;
1315         }
1316
1317         if (v->isTempNonExistentItem() || v->isTempDeletedItem()) {
1318             bool deleted = deleteStoredValue(hbl, *v);
1319             if (!deleted) {
1320                 throw std::logic_error(
1321                         "VBucket::deleteExpiredItem: "
1322                         "Failed to delete seqno:" +
1323                         std::to_string(v->getBySeqno()) + " from bucket " +
1324                         std::to_string(hbl.getBucketNum()));
1325             }
1326         } else if (v->isExpired(startTime) && !v->isDeleted()) {
1327             VBNotifyCtx notifyCtx;
1328             std::tie(std::ignore, std::ignore, notifyCtx) =
1329                     processExpiredItem(hbl, *v);
1330             // we unlock ht lock here because we want to avoid potential lock
1331             // inversions arising from notifyNewSeqno() call
1332             hbl.getHTLock().unlock();
1333             notifyNewSeqno(notifyCtx);
1334         }
1335     } else {
1336         if (eviction == FULL_EVICTION) {
1337             // Create a temp item and delete and push it
1338             // into the checkpoint queue, only if the bloomfilter
1339             // predicts that the item may exist on disk.
1340             if (maybeKeyExistsInFilter(key)) {
1341                 AddStatus rv = addTempStoredValue(hbl, key);
1342                 if (rv == AddStatus::NoMem) {
1343                     return;
1344                 }
1345                 v = ht.unlocked_find(key,
1346                                      hbl.getBucketNum(),
1347                                      WantsDeleted::Yes,
1348                                      TrackReference::No);
1349                 v->setDeleted();
1350                 v->setRevSeqno(it.getRevSeqno());
1351                 v->setValue(it, ht);
1352                 VBNotifyCtx notifyCtx;
1353                 std::tie(std::ignore, std::ignore, notifyCtx) =
1354                         processExpiredItem(hbl, *v);
1355                 // we unlock ht lock here because we want to avoid potential
1356                 // lock inversions arising from notifyNewSeqno() call
1357                 hbl.getHTLock().unlock();
1358                 notifyNewSeqno(notifyCtx);
1359             }
1360         }
1361     }
1362     incExpirationStat(source);
1363 }
1364
1365 ENGINE_ERROR_CODE VBucket::add(Item& itm,
1366                                const void* cookie,
1367                                EventuallyPersistentEngine& engine,
1368                                const int bgFetchDelay) {
1369     auto hbl = ht.getLockedBucket(itm.getKey());
1370     StoredValue* v = ht.unlocked_find(itm.getKey(),
1371                                       hbl.getBucketNum(),
1372                                       WantsDeleted::Yes,
1373                                       TrackReference::No);
1374
1375     bool maybeKeyExists = true;
1376     if ((v == nullptr || v->isTempInitialItem()) &&
1377         (eviction == FULL_EVICTION)) {
1378         // Check bloomfilter's prediction
1379         if (!maybeKeyExistsInFilter(itm.getKey())) {
1380             maybeKeyExists = false;
1381         }
1382     }
1383
1384     PreLinkDocumentContext preLinkDocumentContext(engine, cookie, &itm);
1385     VBQueueItemCtx queueItmCtx(GenerateBySeqno::Yes,
1386                                GenerateCas::Yes,
1387                                TrackCasDrift::No,
1388                                /*isBackfillItem*/ false,
1389                                &preLinkDocumentContext);
1390     AddStatus status;
1391     VBNotifyCtx notifyCtx;
1392     std::tie(status, notifyCtx) =
1393             processAdd(hbl, v, itm, maybeKeyExists, false, &queueItmCtx);
1394
1395     switch (status) {
1396     case AddStatus::NoMem:
1397         return ENGINE_ENOMEM;
1398     case AddStatus::Exists:
1399         return ENGINE_NOT_STORED;
1400     case AddStatus::AddTmpAndBgFetch:
1401         return addTempItemAndBGFetch(
1402                 hbl, itm.getKey(), cookie, engine, bgFetchDelay, true);
1403     case AddStatus::BgFetch:
1404         hbl.getHTLock().unlock();
1405         bgFetch(itm.getKey(), cookie, engine, bgFetchDelay, true);
1406         return ENGINE_EWOULDBLOCK;
1407     case AddStatus::Success:
1408     case AddStatus::UnDel:
1409         notifyNewSeqno(notifyCtx);
1410         itm.setBySeqno(v->getBySeqno());
1411         itm.setCas(v->getCas());
1412         break;
1413     }
1414     return ENGINE_SUCCESS;
1415 }
1416
1417 GetValue VBucket::getAndUpdateTtl(const DocKey& key,
1418                                   const void* cookie,
1419                                   EventuallyPersistentEngine& engine,
1420                                   int bgFetchDelay,
1421                                   time_t exptime) {
1422     auto hbl = ht.getLockedBucket(key);
1423     StoredValue* v = fetchValidValue(hbl,
1424                                      key,
1425                                      WantsDeleted::Yes,
1426                                      TrackReference::Yes,
1427                                      QueueExpired::Yes);
1428
1429     if (v) {
1430         if (v->isDeleted() || v->isTempDeletedItem() ||
1431             v->isTempNonExistentItem()) {
1432             return {};
1433         }
1434
1435         if (!v->isResident()) {
1436             bgFetch(key, cookie, engine, bgFetchDelay);
1437             return GetValue(nullptr, ENGINE_EWOULDBLOCK, v->getBySeqno());
1438         }
1439         if (v->isLocked(ep_current_time())) {
1440             return GetValue(nullptr, ENGINE_KEY_EEXISTS, 0);
1441         }
1442
1443         const bool exptime_mutated = exptime != v->getExptime();
1444         if (exptime_mutated) {
1445             v->markDirty();
1446             v->setExptime(exptime);
1447             v->setRevSeqno(v->getRevSeqno() + 1);
1448         }
1449
1450         GetValue rv(
1451                 v->toItem(v->isLocked(ep_current_time()), getId()).release(),
1452                 ENGINE_SUCCESS,
1453                 v->getBySeqno());
1454
1455         if (exptime_mutated) {
1456             VBNotifyCtx notifyCtx = queueDirty(*v);
1457             rv.getValue()->setCas(v->getCas());
1458             // we unlock ht lock here because we want to avoid potential lock
1459             // inversions arising from notifyNewSeqno() call
1460             hbl.getHTLock().unlock();
1461             notifyNewSeqno(notifyCtx);
1462         }
1463
1464         return rv;
1465     } else {
1466         if (eviction == VALUE_ONLY) {
1467             return {};
1468         } else {
1469             if (maybeKeyExistsInFilter(key)) {
1470                 ENGINE_ERROR_CODE ec = addTempItemAndBGFetch(
1471                         hbl, key, cookie, engine, bgFetchDelay, false);
1472                 return GetValue(NULL, ec, -1, true);
1473             } else {
1474                 // As bloomfilter predicted that item surely doesn't exist
1475                 // on disk, return ENOENT for getAndUpdateTtl().
1476                 return {};
1477             }
1478         }
1479     }
1480 }
1481
1482 MutationStatus VBucket::insertFromWarmup(Item& itm,
1483                                          bool eject,
1484                                          bool keyMetaDataOnly) {
1485     if (!StoredValue::hasAvailableSpace(stats, itm)) {
1486         return MutationStatus::NoMem;
1487     }
1488
1489     auto hbl = ht.getLockedBucket(itm.getKey());
1490     StoredValue* v = ht.unlocked_find(itm.getKey(),
1491                                       hbl.getBucketNum(),
1492                                       WantsDeleted::Yes,
1493                                       TrackReference::No);
1494
1495     if (v == NULL) {
1496         v = addNewStoredValue(hbl, itm, /*queueItmCtx*/ nullptr).first;
1497         if (keyMetaDataOnly) {
1498             v->markNotResident();
1499             /* For now ht stats are updated from outside ht. This seems to be
1500                a better option for now than passing a flag to
1501                addNewStoredValue() just for this func */
1502             ++(ht.numNonResidentItems);
1503         }
1504         /* For now ht stats are updated from outside ht. This seems to be
1505            a better option for now than passing a flag to
1506            addNewStoredValue() just for this func.
1507            We need to decrNumTotalItems because ht.numTotalItems is already
1508            set by warmup when it estimated the item count from disk */
1509         ht.decrNumTotalItems();
1510         v->setNewCacheItem(false);
1511     } else {
1512         if (keyMetaDataOnly) {
1513             // We don't have a better error code ;)
1514             return MutationStatus::InvalidCas;
1515         }
1516
1517         // Verify that the CAS isn't changed
1518         if (v->getCas() != itm.getCas()) {
1519             if (v->getCas() == 0) {
1520                 v->setCas(itm.getCas());
1521                 v->setFlags(itm.getFlags());
1522                 v->setExptime(itm.getExptime());
1523                 v->setRevSeqno(itm.getRevSeqno());
1524             } else {
1525                 return MutationStatus::InvalidCas;
1526             }
1527         }
1528         updateStoredValue(hbl, *v, itm, /*queueItmCtx*/ nullptr);
1529     }
1530
1531     v->markClean();
1532
1533     if (eject && !keyMetaDataOnly) {
1534         ht.unlocked_ejectItem(v, eviction);
1535     }
1536
1537     return MutationStatus::NotFound;
1538 }
1539
1540 GetValue VBucket::getInternal(const DocKey& key,
1541                               const void* cookie,
1542                               EventuallyPersistentEngine& engine,
1543                               int bgFetchDelay,
1544                               get_options_t options,
1545                               bool diskFlushAll) {
1546     const TrackReference trackReference = (options & TRACK_REFERENCE)
1547                                                   ? TrackReference::Yes
1548                                                   : TrackReference::No;
1549     const bool getDeletedValue = (options & GET_DELETED_VALUE);
1550     auto hbl = ht.getLockedBucket(key);
1551     StoredValue* v = fetchValidValue(
1552             hbl, key, WantsDeleted::Yes, trackReference, QueueExpired::Yes);
1553     if (v) {
1554         if (v->isDeleted() && !getDeletedValue) {
1555             return GetValue();
1556         }
1557         if (v->isTempDeletedItem() || v->isTempNonExistentItem()) {
1558             // Delete a temp non-existent item to ensure that
1559             // if the get were issued over an item that doesn't
1560             // exist, then we dont preserve a temp item.
1561             if (options & DELETE_TEMP) {
1562                 deleteStoredValue(hbl, *v);
1563             }
1564             return GetValue();
1565         }
1566
1567         // If the value is not resident, wait for it...
1568         if (!v->isResident()) {
1569             return getInternalNonResident(
1570                     key, cookie, engine, bgFetchDelay, options, *v);
1571         }
1572
1573         // Should we hide (return -1) for the items' CAS?
1574         const bool hide_cas =
1575                 (options & HIDE_LOCKED_CAS) && v->isLocked(ep_current_time());
1576         return GetValue(v->toItem(hide_cas, getId()).release(),
1577                         ENGINE_SUCCESS,
1578                         v->getBySeqno(),
1579                         false,
1580                         v->getNRUValue());
1581     } else {
1582         if (!getDeletedValue && (eviction == VALUE_ONLY || diskFlushAll)) {
1583             return GetValue();
1584         }
1585
1586         if (maybeKeyExistsInFilter(key)) {
1587             ENGINE_ERROR_CODE ec = ENGINE_EWOULDBLOCK;
1588             if (options &
1589                 QUEUE_BG_FETCH) { // Full eviction and need a bg fetch.
1590                 ec = addTempItemAndBGFetch(
1591                         hbl, key, cookie, engine, bgFetchDelay, false);
1592             }
1593             return GetValue(NULL, ec, -1, true);
1594         } else {
1595             // As bloomfilter predicted that item surely doesn't exist
1596             // on disk, return ENOENT, for getInternal().
1597             return GetValue();
1598         }
1599     }
1600 }
1601
1602 ENGINE_ERROR_CODE VBucket::getMetaData(const DocKey& key,
1603                                        const void* cookie,
1604                                        EventuallyPersistentEngine& engine,
1605                                        int bgFetchDelay,
1606                                        ItemMetaData& metadata,
1607                                        uint32_t& deleted,
1608                                        uint8_t& datatype) {
1609     deleted = 0;
1610     auto hbl = ht.getLockedBucket(key);
1611     StoredValue* v = ht.unlocked_find(
1612             key, hbl.getBucketNum(), WantsDeleted::Yes, TrackReference::No);
1613
1614     if (v) {
1615         stats.numOpsGetMeta++;
1616         if (v->isTempInitialItem()) {
1617             // Need bg meta fetch.
1618             bgFetch(key, cookie, engine, bgFetchDelay, true);
1619             return ENGINE_EWOULDBLOCK;
1620         } else if (v->isTempNonExistentItem()) {
1621             metadata.cas = v->getCas();
1622             return ENGINE_KEY_ENOENT;
1623         } else {
1624             if (v->isTempDeletedItem() || v->isDeleted() ||
1625                 v->isExpired(ep_real_time())) {
1626                 deleted |= GET_META_ITEM_DELETED_FLAG;
1627             }
1628
1629             if (v->isLocked(ep_current_time())) {
1630                 metadata.cas = static_cast<uint64_t>(-1);
1631             } else {
1632                 metadata.cas = v->getCas();
1633             }
1634             metadata.flags = v->getFlags();
1635             metadata.exptime = v->getExptime();
1636             metadata.revSeqno = v->getRevSeqno();
1637             datatype = v->getDatatype();
1638
1639             return ENGINE_SUCCESS;
1640         }
1641     } else {
1642         // The key wasn't found. However, this may be because it was previously
1643         // deleted or evicted with the full eviction strategy.
1644         // So, add a temporary item corresponding to the key to the hash table
1645         // and schedule a background fetch for its metadata from the persistent
1646         // store. The item's state will be updated after the fetch completes.
1647         //
1648         // Schedule this bgFetch only if the key is predicted to be may-be
1649         // existent on disk by the bloomfilter.
1650
1651         if (maybeKeyExistsInFilter(key)) {
1652             return addTempItemAndBGFetch(
1653                     hbl, key, cookie, engine, bgFetchDelay, true);
1654         } else {
1655             stats.numOpsGetMeta++;
1656             return ENGINE_KEY_ENOENT;
1657         }
1658     }
1659 }
1660
1661 ENGINE_ERROR_CODE VBucket::getKeyStats(const DocKey& key,
1662                                        const void* cookie,
1663                                        EventuallyPersistentEngine& engine,
1664                                        int bgFetchDelay,
1665                                        struct key_stats& kstats,
1666                                        WantsDeleted wantsDeleted) {
1667     auto hbl = ht.getLockedBucket(key);
1668     StoredValue* v = fetchValidValue(hbl,
1669                                      key,
1670                                      WantsDeleted::Yes,
1671                                      TrackReference::Yes,
1672                                      QueueExpired::Yes);
1673
1674     if (v) {
1675         if ((v->isDeleted() && wantsDeleted == WantsDeleted::No) ||
1676             v->isTempNonExistentItem() || v->isTempDeletedItem()) {
1677             return ENGINE_KEY_ENOENT;
1678         }
1679         if (eviction == FULL_EVICTION && v->isTempInitialItem()) {
1680             hbl.getHTLock().unlock();
1681             bgFetch(key, cookie, engine, bgFetchDelay, true);
1682             return ENGINE_EWOULDBLOCK;
1683         }
1684         kstats.logically_deleted = v->isDeleted();
1685         kstats.dirty = v->isDirty();
1686         kstats.exptime = v->getExptime();
1687         kstats.flags = v->getFlags();
1688         kstats.cas = v->getCas();
1689         kstats.vb_state = getState();
1690         return ENGINE_SUCCESS;
1691     } else {
1692         if (eviction == VALUE_ONLY) {
1693             return ENGINE_KEY_ENOENT;
1694         } else {
1695             if (maybeKeyExistsInFilter(key)) {
1696                 return addTempItemAndBGFetch(
1697                         hbl, key, cookie, engine, bgFetchDelay, true);
1698             } else {
1699                 // If bgFetch were false, or bloomfilter predicted that
1700                 // item surely doesn't exist on disk, return ENOENT for
1701                 // getKeyStats().
1702                 return ENGINE_KEY_ENOENT;
1703             }
1704         }
1705     }
1706 }
1707
1708 GetValue VBucket::getLocked(const DocKey& key,
1709                             rel_time_t currentTime,
1710                             uint32_t lockTimeout,
1711                             const void* cookie,
1712                             EventuallyPersistentEngine& engine,
1713                             int bgFetchDelay) {
1714     auto hbl = ht.getLockedBucket(key);
1715     StoredValue* v = fetchValidValue(hbl,
1716                                      key,
1717                                      WantsDeleted::Yes,
1718                                      TrackReference::Yes,
1719                                      QueueExpired::Yes);
1720
1721     if (v) {
1722         if (v->isDeleted() || v->isTempNonExistentItem() ||
1723             v->isTempDeletedItem()) {
1724             return GetValue(NULL, ENGINE_KEY_ENOENT);
1725         }
1726
1727         // if v is locked return error
1728         if (v->isLocked(currentTime)) {
1729             return GetValue(NULL, ENGINE_TMPFAIL);
1730         }
1731
1732         // If the value is not resident, wait for it...
1733         if (!v->isResident()) {
1734             if (cookie) {
1735                 bgFetch(key, cookie, engine, bgFetchDelay);
1736             }
1737             return GetValue(NULL, ENGINE_EWOULDBLOCK, -1, true);
1738         }
1739
1740         // acquire lock and increment cas value
1741         v->lock(currentTime + lockTimeout);
1742
1743         auto it = v->toItem(false, getId());
1744         it->setCas(nextHLCCas());
1745         v->setCas(it->getCas());
1746
1747         return GetValue(it.release());
1748
1749     } else {
1750         // No value found in the hashtable.
1751         switch (eviction) {
1752         case VALUE_ONLY:
1753             return GetValue(NULL, ENGINE_KEY_ENOENT);
1754
1755         case FULL_EVICTION:
1756             if (maybeKeyExistsInFilter(key)) {
1757                 ENGINE_ERROR_CODE ec = addTempItemAndBGFetch(
1758                         hbl, key, cookie, engine, bgFetchDelay, false);
1759                 return GetValue(NULL, ec, -1, true);
1760             } else {
1761                 // As bloomfilter predicted that item surely doesn't exist
1762                 // on disk, return ENOENT for getLocked().
1763                 return GetValue(NULL, ENGINE_KEY_ENOENT);
1764             }
1765         }
1766         return GetValue(); // just to prevent compiler warning
1767     }
1768 }
1769
1770 void VBucket::deletedOnDiskCbk(const Item& queuedItem, bool deleted) {
1771     auto hbl = ht.getLockedBucket(queuedItem.getKey());
1772     StoredValue* v = fetchValidValue(hbl,
1773                                      queuedItem.getKey(),
1774                                      WantsDeleted::Yes,
1775                                      TrackReference::No,
1776                                      QueueExpired::Yes);
1777     // Delete the item in the hash table iff:
1778     //  1. Item is existent in hashtable, and deleted flag is true
1779     //  2. rev seqno of queued item matches rev seqno of hash table item
1780     if (v && v->isDeleted() && (queuedItem.getRevSeqno() == v->getRevSeqno())) {
1781         bool isDeleted = deleteStoredValue(hbl, *v);
1782         if (!isDeleted) {
1783             throw std::logic_error(
1784                     "deletedOnDiskCbk:callback: "
1785                     "Failed to delete key with seqno:" +
1786                     std::to_string(v->getBySeqno()) + "' from bucket " +
1787                     std::to_string(hbl.getBucketNum()));
1788         }
1789
1790         /**
1791          * Deleted items are to be added to the bloomfilter,
1792          * in either eviction policy.
1793          */
1794         addToFilter(queuedItem.getKey());
1795     }
1796
1797     if (deleted) {
1798         ++stats.totalPersisted;
1799         ++opsDelete;
1800     }
1801     doStatsForFlushing(queuedItem, queuedItem.size());
1802     --stats.diskQueueSize;
1803     decrMetaDataDisk(queuedItem);
1804 }
1805
1806 bool VBucket::deleteKey(const DocKey& key) {
1807     auto hbl = ht.getLockedBucket(key);
1808     StoredValue* v = ht.unlocked_find(
1809             key, hbl.getBucketNum(), WantsDeleted::Yes, TrackReference::No);
1810     if (!v) {
1811         return false;
1812     }
1813     return deleteStoredValue(hbl, *v);
1814 }
1815
1816 void VBucket::postProcessRollback(const RollbackResult& rollbackResult,
1817                                   uint64_t prevHighSeqno) {
1818     failovers->pruneEntries(rollbackResult.highSeqno);
1819     checkpointManager.clear(*this, rollbackResult.highSeqno);
1820     setPersistedSnapshot(rollbackResult.snapStartSeqno,
1821                          rollbackResult.snapEndSeqno);
1822     incrRollbackItemCount(prevHighSeqno - rollbackResult.highSeqno);
1823     checkpointManager.setOpenCheckpointId(1);
1824 }
1825
1826 void VBucket::dump() const {
1827     std::cerr << "VBucket[" << this << "] with state: " << toString(getState())
1828               << " numItems:" << getNumItems()
1829               << " numNonResident:" << getNumNonResidentItems()
1830               << " ht: " << std::endl << "  " << ht << std::endl
1831               << "]" << std::endl;
1832 }
1833
1834 void VBucket::_addStats(bool details, ADD_STAT add_stat, const void* c) {
1835     addStat(NULL, toString(state), add_stat, c);
1836     if (details) {
1837         size_t numItems = getNumItems();
1838         size_t tempItems = getNumTempItems();
1839         addStat("num_items", numItems, add_stat, c);
1840         addStat("num_temp_items", tempItems, add_stat, c);
1841         addStat("num_non_resident", getNumNonResidentItems(), add_stat, c);
1842         addStat("ht_memory", ht.memorySize(), add_stat, c);
1843         addStat("ht_item_memory", ht.getItemMemory(), add_stat, c);
1844         addStat("ht_cache_size", ht.cacheSize.load(), add_stat, c);
1845         addStat("ht_size", ht.getSize(), add_stat, c);
1846         addStat("num_ejects", ht.getNumEjects(), add_stat, c);
1847         addStat("ops_create", opsCreate.load(), add_stat, c);
1848         addStat("ops_update", opsUpdate.load(), add_stat, c);
1849         addStat("ops_delete", opsDelete.load(), add_stat, c);
1850         addStat("ops_reject", opsReject.load(), add_stat, c);
1851         addStat("queue_size", dirtyQueueSize.load(), add_stat, c);
1852         addStat("backfill_queue_size", getBackfillSize(), add_stat, c);
1853         addStat("queue_memory", dirtyQueueMem.load(), add_stat, c);
1854         addStat("queue_fill", dirtyQueueFill.load(), add_stat, c);
1855         addStat("queue_drain", dirtyQueueDrain.load(), add_stat, c);
1856         addStat("queue_age", getQueueAge(), add_stat, c);
1857         addStat("pending_writes", dirtyQueuePendingWrites.load(), add_stat, c);
1858
1859         addStat("high_seqno", getHighSeqno(), add_stat, c);
1860         addStat("uuid", failovers->getLatestUUID(), add_stat, c);
1861         addStat("purge_seqno", getPurgeSeqno(), add_stat, c);
1862         addStat("bloom_filter", getFilterStatusString().data(),
1863                 add_stat, c);
1864         addStat("bloom_filter_size", getFilterSize(), add_stat, c);
1865         addStat("bloom_filter_key_count", getNumOfKeysInFilter(), add_stat, c);
1866         addStat("rollback_item_count", getRollbackItemCount(), add_stat, c);
1867         addStat("hp_vb_req_size", getHighPriorityChkSize(), add_stat, c);
1868         hlc.addStats(statPrefix, add_stat, c);
1869     }
1870 }
1871
1872 void VBucket::decrDirtyQueueMem(size_t decrementBy)
1873 {
1874     size_t oldVal, newVal;
1875     do {
1876         oldVal = dirtyQueueMem.load(std::memory_order_relaxed);
1877         if (oldVal < decrementBy) {
1878             newVal = 0;
1879         } else {
1880             newVal = oldVal - decrementBy;
1881         }
1882     } while (!dirtyQueueMem.compare_exchange_strong(oldVal, newVal));
1883 }
1884
1885 void VBucket::decrDirtyQueueAge(uint32_t decrementBy)
1886 {
1887     uint64_t oldVal, newVal;
1888     do {
1889         oldVal = dirtyQueueAge.load(std::memory_order_relaxed);
1890         if (oldVal < decrementBy) {
1891             newVal = 0;
1892         } else {
1893             newVal = oldVal - decrementBy;
1894         }
1895     } while (!dirtyQueueAge.compare_exchange_strong(oldVal, newVal));
1896 }
1897
1898 void VBucket::decrDirtyQueuePendingWrites(size_t decrementBy)
1899 {
1900     size_t oldVal, newVal;
1901     do {
1902         oldVal = dirtyQueuePendingWrites.load(std::memory_order_relaxed);
1903         if (oldVal < decrementBy) {
1904             newVal = 0;
1905         } else {
1906             newVal = oldVal - decrementBy;
1907         }
1908     } while (!dirtyQueuePendingWrites.compare_exchange_strong(oldVal, newVal));
1909 }
1910
/**
 * Core of a "set" against the hash table, given the current StoredValue (if
 * any) for itm's key.
 *
 * Checks are applied in this order: hash-bucket lock held, memory available,
 * full-eviction CAS needing a bg fetch, expiry, allowExisting, item lock,
 * CAS match, deleted-document replace. If all pass, the existing value is
 * updated or a new one is added.
 *
 * @param hbl            lock on the hash bucket for itm's key; must be held
 * @param v              [in/out] existing StoredValue for the key, or
 *                       nullptr; on the update/add paths it is set to the
 *                       resulting StoredValue
 * @param itm            item to store; its revSeqno is assigned here unless
 *                       hasMetaData is true
 * @param cas            CAS supplied by the caller; 0 means "no CAS check"
 * @param allowExisting  if false, an existing non-temp value yields
 *                       MutationStatus::InvalidCas
 * @param hasMetaData    true when itm already carries its metadata
 *                       (presumably a *WithMeta op — TODO confirm); skips the
 *                       expiry check and revSeqno assignment
 * @param queueItmCtx    forwarded to updateStoredValue()/addNewStoredValue()
 * @param maybeKeyExists whether the key may exist on disk; with a CAS under
 *                       full eviction this can require a background fetch
 * @param isReplication  forwarded to StoredValue::hasAvailableSpace()
 * @return the MutationStatus and the notification context of any queued
 *         mutation (default-constructed on failure paths)
 * @throws std::invalid_argument if the hash-bucket lock is not held
 */
std::pair<MutationStatus, VBNotifyCtx> VBucket::processSet(
        const HashTable::HashBucketLock& hbl,
        StoredValue*& v,
        Item& itm,
        uint64_t cas,
        bool allowExisting,
        bool hasMetaData,
        const VBQueueItemCtx* queueItmCtx,
        bool maybeKeyExists,
        bool isReplication) {
    if (!hbl.getHTLock()) {
        throw std::invalid_argument(
                "VBucket::processSet: htLock not held for "
                "VBucket " +
                std::to_string(getId()));
    }

    if (!StoredValue::hasAvailableSpace(stats, itm, isReplication)) {
        return {MutationStatus::NoMem, VBNotifyCtx()};
    }

    // A CAS can't be validated against a value we don't have in memory:
    // under full eviction, if the key may be on disk, fetch it first.
    if (cas && eviction == FULL_EVICTION && maybeKeyExists) {
        if (!v || v->isTempInitialItem()) {
            return {MutationStatus::NeedBgFetch, VBNotifyCtx()};
        }
    }

    /*
     * prior to checking for the lock, we should check if this object
     * has expired. If so, then check if CAS value has been provided
     * for this set op. In this case the operation should be denied since
     * a cas operation for a key that doesn't exist is not a very cool
     * thing to do. See MB 3252
     */
    if (v && v->isExpired(ep_real_time()) && !hasMetaData && !itm.isDeleted()) {
        if (v->isLocked(ep_current_time())) {
            v->unlock();
        }
        if (cas) {
            /* item has expired and cas value provided. Deny ! */
            return {MutationStatus::NotFound, VBNotifyCtx()};
        }
    }

    if (v) {
        // Temp items never count as "existing" for allowExisting purposes.
        if (!allowExisting && !v->isTempItem()) {
            return {MutationStatus::InvalidCas, VBNotifyCtx()};
        }
        if (v->isLocked(ep_current_time())) {
            /*
             * item is locked, deny if there is cas value mismatch
             * or no cas value is provided by the user
             */
            if (cas != v->getCas()) {
                return {MutationStatus::IsLocked, VBNotifyCtx()};
            }
            /* allow operation*/
            v->unlock();
        } else if (cas && cas != v->getCas()) {
            if (v->isTempNonExistentItem()) {
                // This is a temporary item which marks a key as non-existent;
                // therefore specifying a non-matching CAS should be exposed
                // as item not existing.
                return {MutationStatus::NotFound, VBNotifyCtx()};
            }
            if ((v->isTempDeletedItem() || v->isDeleted()) && !itm.isDeleted()) {
                // Existing item is deleted, and we are not replacing it with
                // a (different) deleted value - return not existing.
                return {MutationStatus::NotFound, VBNotifyCtx()};
            }
            // None of the above special cases; the existing item cannot be
            // modified with the specified CAS.
            return {MutationStatus::InvalidCas, VBNotifyCtx()};
        }
        if (!hasMetaData) {
            // Note: itm's revSeqno is bumped even if the check below then
            // rejects the operation.
            itm.setRevSeqno(v->getRevSeqno() + 1);
            /* MB-23530: We must ensure that a replace operation (i.e.
             * set with a CAS) /fails/ if the old document is deleted; it
             * logically "doesn't exist". However, if the new value is deleted
             * this op is a /delete/ with a CAS and we must permit a
             * deleted -> deleted transition for Deleted Bodies.
             */
            if (cas && (v->isDeleted() || v->isTempDeletedItem()) &&
                !itm.isDeleted()) {
                return {MutationStatus::NotFound, VBNotifyCtx()};
            }
        }

        // Overwrite the existing value; v is repointed at the result.
        MutationStatus status;
        VBNotifyCtx notifyCtx;
        std::tie(v, status, notifyCtx) =
                updateStoredValue(hbl, *v, itm, queueItmCtx);
        return {status, notifyCtx};
    } else if (cas != 0) {
        // CAS supplied but there is no value to compare it against.
        return {MutationStatus::NotFound, VBNotifyCtx()};
    } else {
        // Brand new key: add it, then derive its revSeqno from the largest
        // deleted revSeqno seen so far.
        VBNotifyCtx notifyCtx;
        std::tie(v, notifyCtx) = addNewStoredValue(hbl, itm, queueItmCtx);
        if (!hasMetaData) {
            updateRevSeqNoOfNewStoredValue(*v);
            itm.setRevSeqno(v->getRevSeqno());
        }
        return {MutationStatus::WasClean, notifyCtx};
    }
}
2016
/**
 * Core of an "add" (store only if the key does not logically exist) against
 * the hash table, given the current StoredValue (if any) for itm's key.
 *
 * @param hbl            lock on the hash bucket for itm's key; must be held
 * @param v              [in/out] existing StoredValue for the key, or
 *                       nullptr; on the update/add paths it is set to the
 *                       resulting StoredValue
 * @param itm            item to add; its revSeqno is assigned here
 * @param maybeKeyExists whether the key may exist on disk (presumably the
 *                       bloom filter's answer — TODO confirm at callers)
 * @param isReplication  forwarded to StoredValue::hasAvailableSpace()
 * @param queueItmCtx    forwarded to updateStoredValue()/addNewStoredValue()
 * @return Exists if a live, non-expired, non-temp value is present; NoMem;
 *         BgFetch / AddTmpAndBgFetch when on-disk existence must be checked
 *         first (BgFetch also when a temp item was just added); UnDel when a
 *         deleted or expired value was replaced; otherwise Success. The
 *         second member is the notification context of any queued mutation.
 * @throws std::invalid_argument if the hash-bucket lock is not held
 */
std::pair<AddStatus, VBNotifyCtx> VBucket::processAdd(
        const HashTable::HashBucketLock& hbl,
        StoredValue*& v,
        Item& itm,
        bool maybeKeyExists,
        bool isReplication,
        const VBQueueItemCtx* queueItmCtx) {
    if (!hbl.getHTLock()) {
        throw std::invalid_argument(
                "VBucket::processAdd: htLock not held for "
                "VBucket " +
                std::to_string(getId()));
    }

    // A live (not deleted, not expired, not temp) value means the key exists.
    if (v && !v->isDeleted() && !v->isExpired(ep_real_time()) &&
        !v->isTempItem()) {
        return {AddStatus::Exists, VBNotifyCtx()};
    }
    if (!StoredValue::hasAvailableSpace(stats, itm, isReplication)) {
        return {AddStatus::NoMem, VBNotifyCtx()};
    }

    std::pair<AddStatus, VBNotifyCtx> rv = {AddStatus::Success, VBNotifyCtx()};

    if (v) {
        if (v->isTempInitialItem() && eviction == FULL_EVICTION &&
            maybeKeyExists) {
            // Need to figure out if an item exists on disk
            return {AddStatus::BgFetch, VBNotifyCtx()};
        }

        rv.first = (v->isDeleted() || v->isExpired(ep_real_time()))
                           ? AddStatus::UnDel
                           : AddStatus::Success;

        // Pick the new revSeqno. The two statements below are order
        // dependent: temp-deleted -> v's rev + 1; other temp items ->
        // max deleted rev + 1; any non-temp (deleted/expired) value then
        // overrides that with v's rev + 1.
        if (v->isTempDeletedItem()) {
            itm.setRevSeqno(v->getRevSeqno() + 1);
        } else {
            itm.setRevSeqno(ht.getMaxDeletedRevSeqno() + 1);
        }

        if (!v->isTempItem()) {
            itm.setRevSeqno(v->getRevSeqno() + 1);
        }

        std::tie(v, std::ignore, rv.second) =
                updateStoredValue(hbl, *v, itm, queueItmCtx);
    } else {
        // No entry at all. For a real item (not a temp-init placeholder)
        // under full eviction, the caller must first add a temp item and
        // check the disk.
        if (itm.getBySeqno() != StoredValue::state_temp_init) {
            if (eviction == FULL_EVICTION && maybeKeyExists) {
                return {AddStatus::AddTmpAndBgFetch, VBNotifyCtx()};
            }
        }
        std::tie(v, rv.second) = addNewStoredValue(hbl, itm, queueItmCtx);
        updateRevSeqNoOfNewStoredValue(*v);
        itm.setRevSeqno(v->getRevSeqno());
        // Presumably true when itm was a temp-init placeholder (as added by
        // addTempStoredValue()) — the caller then needs a bg fetch.
        if (v->isTempItem()) {
            rv.first = AddStatus::BgFetch;
        }
    }

    if (v->isTempItem()) {
        v->setNRUValue(MAX_NRU_VALUE);
    }
    return rv;
}
2083
2084 std::tuple<MutationStatus, StoredValue*, VBNotifyCtx>
2085 VBucket::processSoftDelete(const HashTable::HashBucketLock& hbl,
2086                            StoredValue& v,
2087                            uint64_t cas,
2088                            const ItemMetaData& metadata,
2089                            const VBQueueItemCtx& queueItmCtx,
2090                            bool use_meta,
2091                            uint64_t bySeqno) {
2092     if (v.isTempInitialItem() && eviction == FULL_EVICTION) {
2093         return std::make_tuple(MutationStatus::NeedBgFetch, &v, VBNotifyCtx());
2094     }
2095
2096     if (v.isLocked(ep_current_time())) {
2097         if (cas != v.getCas()) {
2098             return std::make_tuple(MutationStatus::IsLocked, &v, VBNotifyCtx());
2099         }
2100         v.unlock();
2101     }
2102
2103     if (cas != 0 && cas != v.getCas()) {
2104         return std::make_tuple(MutationStatus::InvalidCas, &v, VBNotifyCtx());
2105     }
2106
2107     /* allow operation */
2108     v.unlock();
2109
2110     MutationStatus rv =
2111             v.isDirty() ? MutationStatus::WasDirty : MutationStatus::WasClean;
2112
2113     if (use_meta) {
2114         v.setCas(metadata.cas);
2115         v.setFlags(metadata.flags);
2116         v.setExptime(metadata.exptime);
2117     }
2118
2119     v.setRevSeqno(metadata.revSeqno);
2120     VBNotifyCtx notifyCtx;
2121     StoredValue* newSv;
2122     std::tie(newSv, notifyCtx) =
2123             softDeleteStoredValue(hbl,
2124                                   v,
2125                                   /*onlyMarkDeleted*/ false,
2126                                   queueItmCtx,
2127                                   bySeqno);
2128     ht.updateMaxDeletedRevSeqno(metadata.revSeqno);
2129     return std::make_tuple(rv, newSv, notifyCtx);
2130 }
2131
/**
 * Turn an expired StoredValue into a deletion.
 *
 * @param hbl lock on v's hash bucket; must be held
 * @param v   the expired value
 * @return (NeedBgFetch, &v, ctx) for a temp-initial item under full
 *         eviction; otherwise (NotFound, StoredValue now representing the
 *         key, notification context of the queued deletion).
 * @throws std::invalid_argument if the hash-bucket lock is not held
 */
std::tuple<MutationStatus, StoredValue*, VBNotifyCtx>
VBucket::processExpiredItem(const HashTable::HashBucketLock& hbl,
                            StoredValue& v) {
    if (!hbl.getHTLock()) {
        throw std::invalid_argument(
                "VBucket::processExpiredItem: htLock not held for VBucket " +
                std::to_string(getId()));
    }

    // NOTE(review): unlike processSoftDelete(), this NeedBgFetch path still
    // queues `v` (a temp-initial item) via queueDirty() — confirm intended.
    if (v.isTempInitialItem() && eviction == FULL_EVICTION) {
        return std::make_tuple(MutationStatus::NeedBgFetch,
                               &v,
                               queueDirty(v,
                                          GenerateBySeqno::Yes,
                                          GenerateCas::Yes,
                                          /*isBackfillItem*/ false));
    }

    /* If the datatype is XATTR, mark the item as deleted
     * but don't delete the value as system xattrs can
     * still be queried by mobile clients even after
     * deletion.
     * TODO: The current implementation is inefficient
     * but functionally correct and for performance reasons
     * only the system xattrs need to be stored.
     */
    value_t value = v.getValue();
    bool onlyMarkDeleted =
            value && mcbp::datatype::is_xattr(value->getDataType());
    // The expiry-driven delete is a new revision of the document.
    v.setRevSeqno(v.getRevSeqno() + 1);
    VBNotifyCtx notifyCtx;
    StoredValue* newSv;
    // The deletion reuses v's current bySeqno as the bySeqno argument; a new
    // seqno and CAS are requested through the queue context.
    std::tie(newSv, notifyCtx) =
            softDeleteStoredValue(hbl,
                                  v,
                                  onlyMarkDeleted,
                                  VBQueueItemCtx(GenerateBySeqno::Yes,
                                                 GenerateCas::Yes,
                                                 TrackCasDrift::No,
                                                 /*isBackfillItem*/ false,
                                                 nullptr /* no pre link */),
                                  v.getBySeqno());
    // NOTE(review): processSoftDelete() records the deleted item's own
    // revSeqno here, whereas this records revSeqno + 1 (already bumped
    // above) — harmless if the max only needs to be conservative; confirm.
    ht.updateMaxDeletedRevSeqno(newSv->getRevSeqno() + 1);
    return std::make_tuple(MutationStatus::NotFound, newSv, notifyCtx);
}
2177
2178 bool VBucket::deleteStoredValue(const HashTable::HashBucketLock& hbl,
2179                                 StoredValue& v) {
2180     if (!v.isDeleted() && v.isLocked(ep_current_time())) {
2181         return false;
2182     }
2183
2184     /* StoredValue deleted here. If any other in-memory data structures are
2185        using the StoredValue intrusively then they must have handled the delete
2186        by this point */
2187     ht.unlocked_del(hbl, v.getKey());
2188     return true;
2189 }
2190
2191 AddStatus VBucket::addTempStoredValue(const HashTable::HashBucketLock& hbl,
2192                                       const DocKey& key,
2193                                       bool isReplication) {
2194     uint8_t ext_meta[EXT_META_LEN] = {PROTOCOL_BINARY_RAW_BYTES};
2195     static_assert(sizeof(ext_meta) == 1,
2196                   "VBucket::addTempStoredValue(): expected "
2197                   "EXT_META_LEN to be 1");
2198     Item itm(key,
2199              /*flags*/ 0,
2200              /*exp*/ 0,
2201              /*data*/ NULL,
2202              /*size*/ 0,
2203              ext_meta,
2204              sizeof(ext_meta),
2205              0,
2206              StoredValue::state_temp_init);
2207
2208     /* if a temp item for a possibly deleted, set it non-resident by resetting
2209        the value cuz normally a new item added is considered resident which
2210        does not apply for temp item. */
2211     StoredValue* v = nullptr;
2212     return processAdd(hbl, v, itm, true, isReplication, nullptr).first;
2213 }
2214
2215 void VBucket::notifyNewSeqno(const VBNotifyCtx& notifyCtx) {
2216     if (newSeqnoCb) {
2217         newSeqnoCb->callback(getId(), notifyCtx);
2218     }
2219 }
2220
2221 /*
2222  * Queue the item to the checkpoint and return the seqno the item was
2223  * allocated.
2224  */
2225 int64_t VBucket::queueItem(Item* item, OptionalSeqno seqno) {
2226     item->setVBucketId(id);
2227     queued_item qi(item);
2228     checkpointManager.queueDirty(
2229             *this,
2230             qi,
2231             seqno ? GenerateBySeqno::No : GenerateBySeqno::Yes,
2232             GenerateCas::Yes,
2233             nullptr /* No pre link step as this is for system events */);
2234     VBNotifyCtx notifyCtx;
2235     // If the seqno is initialized, skip replication notification
2236     notifyCtx.notifyReplication = !seqno.is_initialized();
2237     notifyCtx.notifyFlusher = true;
2238     notifyCtx.bySeqno = qi->getBySeqno();
2239     notifyNewSeqno(notifyCtx);
2240     return qi->getBySeqno();
2241 }
2242
2243 VBNotifyCtx VBucket::queueDirty(StoredValue& v,
2244                                 const VBQueueItemCtx& queueItmCtx) {
2245     if (queueItmCtx.trackCasDrift == TrackCasDrift::Yes) {
2246         setMaxCasAndTrackDrift(v.getCas());
2247     }
2248     return queueDirty(v,
2249                       queueItmCtx.genBySeqno,
2250                       queueItmCtx.genCas,
2251                       queueItmCtx.isBackfillItem,
2252                       queueItmCtx.preLinkDocumentContext);
2253 }
2254
2255 void VBucket::updateRevSeqNoOfNewStoredValue(StoredValue& v) {
2256     /**
2257      * Possibly, this item is being recreated. Conservatively assign it
2258      * a seqno that is greater than the greatest seqno of all deleted
2259      * items seen so far.
2260      */
2261     uint64_t seqno = ht.getMaxDeletedRevSeqno();
2262     if (!v.isTempItem()) {
2263         ++seqno;
2264     }
2265     v.setRevSeqno(seqno);
2266 }
2267
2268 void VBucket::addHighPriorityVBEntry(uint64_t seqnoOrChkId,
2269                                      const void* cookie,
2270                                      HighPriorityVBNotify reqType) {
2271     std::unique_lock<std::mutex> lh(hpVBReqsMutex);
2272     hpVBReqs.push_back(HighPriorityVBEntry(cookie, seqnoOrChkId, reqType));
2273     numHpVBReqs.store(hpVBReqs.size());
2274
2275     LOG(EXTENSION_LOG_NOTICE,
2276         "Added high priority async request %s "
2277         "for vb:%" PRIu16 ", Check for:%" PRIu64 ", "
2278         "Persisted upto:%" PRIu64 ", cookie:%p",
2279         to_string(reqType).c_str(),
2280         getId(),
2281         seqnoOrChkId,
2282         getPersistenceSeqno(),
2283         cookie);
2284 }
2285
2286 std::map<const void*, ENGINE_ERROR_CODE> VBucket::getHighPriorityNotifications(
2287         EventuallyPersistentEngine& engine,
2288         uint64_t idNum,
2289         HighPriorityVBNotify notifyType) {
2290     std::unique_lock<std::mutex> lh(hpVBReqsMutex);
2291     std::map<const void*, ENGINE_ERROR_CODE> toNotify;
2292
2293     auto entry = hpVBReqs.begin();
2294
2295     while (entry != hpVBReqs.end()) {
2296         if (notifyType != entry->reqType) {
2297             ++entry;
2298             continue;
2299         }
2300
2301         std::string logStr(to_string(notifyType));
2302
2303         hrtime_t wall_time(gethrtime() - entry->start);
2304         size_t spent = wall_time / 1000000000;
2305         if (entry->id <= idNum) {
2306             toNotify[entry->cookie] = ENGINE_SUCCESS;
2307             stats.chkPersistenceHisto.add(wall_time / 1000);
2308             adjustCheckpointFlushTimeout(wall_time / 1000000000);
2309             LOG(EXTENSION_LOG_NOTICE,
2310                 "Notified the completion of %s "
2311                 "for vbucket %" PRIu16 ", Check for: %" PRIu64
2312                 ", "
2313                 "Persisted upto: %" PRIu64 ", cookie %p",
2314                 logStr.c_str(),
2315                 getId(),
2316                 entry->id,
2317                 idNum,
2318                 entry->cookie);
2319             entry = hpVBReqs.erase(entry);
2320         } else if (spent > getCheckpointFlushTimeout()) {
2321             adjustCheckpointFlushTimeout(spent);
2322             engine.storeEngineSpecific(entry->cookie, NULL);
2323             toNotify[entry->cookie] = ENGINE_TMPFAIL;
2324             LOG(EXTENSION_LOG_WARNING,
2325                 "Notified the timeout on %s "
2326                 "for vbucket %" PRIu16 ", Check for: %" PRIu64
2327                 ", "
2328                 "Persisted upto: %" PRIu64 ", cookie %p",
2329                 logStr.c_str(),
2330                 getId(),
2331                 entry->id,
2332                 idNum,
2333                 entry->cookie);
2334             entry = hpVBReqs.erase(entry);
2335         } else {
2336             ++entry;
2337         }
2338     }
2339     numHpVBReqs.store(hpVBReqs.size());
2340     return toNotify;
2341 }
2342
2343 std::map<const void*, ENGINE_ERROR_CODE> VBucket::tmpFailAndGetAllHpNotifies(
2344         EventuallyPersistentEngine& engine) {
2345     std::map<const void*, ENGINE_ERROR_CODE> toNotify;
2346
2347     LockHolder lh(hpVBReqsMutex);
2348
2349     for (auto& entry : hpVBReqs) {
2350         toNotify[entry.cookie] = ENGINE_TMPFAIL;
2351         engine.storeEngineSpecific(entry.cookie, NULL);
2352     }
2353     hpVBReqs.clear();
2354
2355     return toNotify;
2356 }
2357
2358 void VBucket::adjustCheckpointFlushTimeout(size_t wall_time) {
2359     size_t middle = (MIN_CHK_FLUSH_TIMEOUT + MAX_CHK_FLUSH_TIMEOUT) / 2;
2360
2361     if (wall_time <= MIN_CHK_FLUSH_TIMEOUT) {
2362         chkFlushTimeout = MIN_CHK_FLUSH_TIMEOUT;
2363     } else if (wall_time <= middle) {
2364         chkFlushTimeout = middle;
2365     } else {
2366         chkFlushTimeout = MAX_CHK_FLUSH_TIMEOUT;
2367     }
2368 }
2369
2370 size_t VBucket::getCheckpointFlushTimeout() {
2371     return chkFlushTimeout;
2372 }
2373
2374 std::unique_ptr<Item> VBucket::pruneXattrDocument(
2375         StoredValue& v, const ItemMetaData& itemMeta) {
2376     // Need to take a copy of the value, prune it, and add it back
2377
2378     // Create work-space document
2379     std::vector<uint8_t> workspace(v.getValue()->vlength());
2380     std::copy_n(v.getValue()->getData(),
2381                 v.getValue()->vlength(),
2382                 workspace.begin());
2383
2384     // Now attach to the XATTRs in the document
2385     auto sz = cb::xattr::get_body_offset(
2386             {reinterpret_cast<char*>(workspace.data()), workspace.size()});
2387
2388     cb::xattr::Blob xattr({workspace.data(), sz});
2389     xattr.prune_user_keys();
2390
2391     auto prunedXattrs = xattr.finalize();
2392
2393     if (prunedXattrs.size()) {
2394         // Something remains - Create a Blob and copy-in just the XATTRs
2395         auto newValue =
2396                 Blob::New(reinterpret_cast<const char*>(prunedXattrs.data()),
2397                           prunedXattrs.size(),
2398                           const_cast<uint8_t*>(reinterpret_cast<const uint8_t*>(
2399                                   v.getValue()->getExtMeta())),
2400                           v.getValue()->getExtLen());
2401
2402         return std::make_unique<Item>(v.getKey(),
2403                                       itemMeta.flags,
2404                                       itemMeta.exptime,
2405                                       newValue,
2406                                       itemMeta.cas,
2407                                       v.getBySeqno(),
2408                                       getId(),
2409                                       itemMeta.revSeqno);
2410     } else {
2411         return {};
2412     }
2413 }
2414
2415 void VBucket::DeferredDeleter::operator()(VBucket* vb) const {
2416     // If the vbucket is marked as deleting then we must schedule task to
2417     // perform the resource destruction (memory/disk).
2418     if (vb->isDeletionDeferred()) {
2419         vb->scheduleDeferredDeletion(engine);
2420         return;
2421     }
2422     delete vb;
2423 }