85c90949700c40bc848fe80c73a40dbef2c60eed
[ep-engine.git] / src / vbucket.cc
1 /* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2 /*
3  *     Copyright 2015 Couchbase, Inc
4  *
5  *   Licensed under the Apache License, Version 2.0 (the "License");
6  *   you may not use this file except in compliance with the License.
7  *   You may obtain a copy of the License at
8  *
9  *       http://www.apache.org/licenses/LICENSE-2.0
10  *
11  *   Unless required by applicable law or agreed to in writing, software
12  *   distributed under the License is distributed on an "AS IS" BASIS,
13  *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  *   See the License for the specific language governing permissions and
15  *   limitations under the License.
16  */
17
18 #include "config.h"
19
20 #include <functional>
21 #include <list>
22 #include <set>
23 #include <string>
24 #include <vector>
25
26 #include "atomic.h"
27 #include "bgfetcher.h"
28 #include "conflict_resolution.h"
29 #include "ep_engine.h"
30 #include "ep_types.h"
31 #include "failover-table.h"
32 #include "flusher.h"
33 #include "pre_link_document_context.h"
34
35 #define STATWRITER_NAMESPACE vbucket
36 #include "statwriter.h"
37 #undef STATWRITER_NAMESPACE
38 #include "stored_value_factories.h"
39
40 #include "vbucket.h"
41
42 #include <xattr/blob.h>
43 #include <xattr/utils.h>
44
45 /* Macros */
46 const size_t MIN_CHK_FLUSH_TIMEOUT = 10; // 10 sec.
47 const size_t MAX_CHK_FLUSH_TIMEOUT = 30; // 30 sec.
48
49 /* Statics definitions */
50 std::atomic<size_t> VBucket::chkFlushTimeout(MIN_CHK_FLUSH_TIMEOUT);
51
52 VBucketFilter VBucketFilter::filter_diff(const VBucketFilter &other) const {
53     std::vector<uint16_t> tmp(acceptable.size() + other.size());
54     std::vector<uint16_t>::iterator end;
55     end = std::set_symmetric_difference(acceptable.begin(),
56                                         acceptable.end(),
57                                         other.acceptable.begin(),
58                                         other.acceptable.end(),
59                                         tmp.begin());
60     return VBucketFilter(std::vector<uint16_t>(tmp.begin(), end));
61 }
62
63 VBucketFilter VBucketFilter::filter_intersection(const VBucketFilter &other)
64                                                                         const {
65     std::vector<uint16_t> tmp(acceptable.size() + other.size());
66     std::vector<uint16_t>::iterator end;
67
68     end = std::set_intersection(acceptable.begin(), acceptable.end(),
69                                 other.acceptable.begin(),
70                                 other.acceptable.end(),
71                                 tmp.begin());
72     return VBucketFilter(std::vector<uint16_t>(tmp.begin(), end));
73 }
74
/**
 * Determine whether the elements starting at `it` form a run of consecutive
 * values long enough to be printed as a range.
 *
 * @param it      first element of the candidate run
 * @param end     end of the containing set
 * @param length  out: number of steps from the first to the last element of
 *                the consecutive run (run size - 1); 0 for an empty range
 * @return true when the run has at least three consecutive values
 */
static bool isRange(std::set<uint16_t>::const_iterator it,
                    const std::set<uint16_t>::const_iterator &end,
                    size_t &length)
{
    length = 0;
    // Guard: dereferencing `it` when it == end is undefined behaviour.
    if (it == end) {
        return false;
    }

    const size_t first = *it;
    size_t runSize = 0;
    for (; it != end && (first + runSize) == *it; ++it) {
        ++runSize;
    }

    // runSize >= 1 here because the first element always matches itself.
    length = runSize - 1;
    return length > 1;
}
88     return length > 1;
89 }
90
91 std::ostream& operator <<(std::ostream &out, const VBucketFilter &filter)
92 {
93     std::set<uint16_t>::const_iterator it;
94
95     if (filter.acceptable.empty()) {
96         out << "{ empty }";
97     } else {
98         bool needcomma = false;
99         out << "{ ";
100         for (it = filter.acceptable.begin();
101              it != filter.acceptable.end();
102              ++it) {
103             if (needcomma) {
104                 out << ", ";
105             }
106
107             size_t length;
108             if (isRange(it, filter.acceptable.end(), length)) {
109                 std::set<uint16_t>::iterator last = it;
110                 for (size_t i = 0; i < length; ++i) {
111                     ++last;
112                 }
113                 out << "[" << *it << "," << *last << "]";
114                 it = last;
115             } else {
116                 out << *it;
117             }
118             needcomma = true;
119         }
120         out << " }";
121     }
122
123     return out;
124 }
125
126 const vbucket_state_t VBucket::ACTIVE =
127                      static_cast<vbucket_state_t>(htonl(vbucket_state_active));
128 const vbucket_state_t VBucket::REPLICA =
129                     static_cast<vbucket_state_t>(htonl(vbucket_state_replica));
130 const vbucket_state_t VBucket::PENDING =
131                     static_cast<vbucket_state_t>(htonl(vbucket_state_pending));
132 const vbucket_state_t VBucket::DEAD =
133                     static_cast<vbucket_state_t>(htonl(vbucket_state_dead));
134
135 VBucket::VBucket(id_type i,
136                  vbucket_state_t newState,
137                  EPStats& st,
138                  CheckpointConfig& chkConfig,
139                  int64_t lastSeqno,
140                  uint64_t lastSnapStart,
141                  uint64_t lastSnapEnd,
142                  std::unique_ptr<FailoverTable> table,
143                  std::shared_ptr<Callback<id_type> > flusherCb,
144                  std::unique_ptr<AbstractStoredValueFactory> valFact,
145                  NewSeqnoCallback newSeqnoCb,
146                  Configuration& config,
147                  item_eviction_policy_t evictionPolicy,
148                  vbucket_state_t initState,
149                  uint64_t purgeSeqno,
150                  uint64_t maxCas,
151                  const std::string& collectionsManifest)
152     : ht(st, std::move(valFact)),
153       checkpointManager(st,
154                         i,
155                         chkConfig,
156                         lastSeqno,
157                         lastSnapStart,
158                         lastSnapEnd,
159                         flusherCb),
160       failovers(std::move(table)),
161       opsCreate(0),
162       opsUpdate(0),
163       opsDelete(0),
164       opsReject(0),
165       dirtyQueueSize(0),
166       dirtyQueueMem(0),
167       dirtyQueueFill(0),
168       dirtyQueueDrain(0),
169       dirtyQueueAge(0),
170       dirtyQueuePendingWrites(0),
171       metaDataDisk(0),
172       numExpiredItems(0),
173       eviction(evictionPolicy),
174       stats(st),
175       persistenceSeqno(0),
176       numHpVBReqs(0),
177       id(i),
178       state(newState),
179       initialState(initState),
180       purge_seqno(purgeSeqno),
181       takeover_backed_up(false),
182       persisted_snapshot_start(lastSnapStart),
183       persisted_snapshot_end(lastSnapEnd),
184       rollbackItemCount(0),
185       hlc(maxCas,
186           std::chrono::microseconds(config.getHlcDriftAheadThresholdUs()),
187           std::chrono::microseconds(config.getHlcDriftBehindThresholdUs())),
188       statPrefix("vb_" + std::to_string(i)),
189       persistenceCheckpointId(0),
190       bucketCreation(false),
191       bucketDeletion(false),
192       newSeqnoCb(std::move(newSeqnoCb)),
193       manifest(collectionsManifest) {
194     if (config.getConflictResolutionType().compare("lww") == 0) {
195         conflictResolver.reset(new LastWriteWinsResolution());
196     } else {
197         conflictResolver.reset(new RevisionSeqnoResolution());
198     }
199
200     backfill.isBackfillPhase = false;
201     pendingOpsStart = 0;
202     stats.memOverhead->fetch_add(sizeof(VBucket)
203                                 + ht.memorySize() + sizeof(CheckpointManager));
204     LOG(EXTENSION_LOG_NOTICE,
205         "VBucket: created vbucket:%" PRIu16 " with state:%s "
206                 "initialState:%s "
207                 "lastSeqno:%" PRIu64 " "
208                 "lastSnapshot:{%" PRIu64 ",%" PRIu64 "} "
209                 "persisted_snapshot:{%" PRIu64 ",%" PRIu64 "} "
210                 "max_cas:%" PRIu64,
211         id, VBucket::toString(state), VBucket::toString(initialState),
212         lastSeqno, lastSnapStart, lastSnapEnd,
213         persisted_snapshot_start, persisted_snapshot_end,
214         getMaxCas());
215 }
216
217 VBucket::~VBucket() {
218     if (!pendingOps.empty()) {
219         LOG(EXTENSION_LOG_WARNING,
220             "~Vbucket(): vbucket:%" PRIu16 " has %ld pending ops",
221             id,
222             pendingOps.size());
223     }
224
225     stats.diskQueueSize.fetch_sub(dirtyQueueSize.load());
226     stats.vbBackfillQueueSize.fetch_sub(getBackfillSize());
227
228     // Clear out the bloomfilter(s)
229     clearFilter();
230
231     stats.memOverhead->fetch_sub(sizeof(VBucket) + ht.memorySize() +
232                                 sizeof(CheckpointManager));
233
234     LOG(EXTENSION_LOG_INFO, "Destroying vbucket %d\n", id);
235 }
236
237 void VBucket::fireAllOps(EventuallyPersistentEngine &engine,
238                          ENGINE_ERROR_CODE code) {
239     std::unique_lock<std::mutex> lh(pendingOpLock);
240
241     if (pendingOpsStart > 0) {
242         hrtime_t now = gethrtime();
243         if (now > pendingOpsStart) {
244             hrtime_t d = (now - pendingOpsStart) / 1000;
245             stats.pendingOpsHisto.add(d);
246             atomic_setIfBigger(stats.pendingOpsMaxDuration, d);
247         }
248     } else {
249         return;
250     }
251
252     pendingOpsStart = 0;
253     stats.pendingOps.fetch_sub(pendingOps.size());
254     atomic_setIfBigger(stats.pendingOpsMax, pendingOps.size());
255
256     while (!pendingOps.empty()) {
257         const void *pendingOperation = pendingOps.back();
258         pendingOps.pop_back();
259         // We don't want to hold the pendingOpLock when
260         // calling notifyIOComplete.
261         lh.unlock();
262         engine.notifyIOComplete(pendingOperation, code);
263         lh.lock();
264     }
265
266     LOG(EXTENSION_LOG_INFO,
267         "Fired pendings ops for vbucket %" PRIu16 " in state %s\n",
268         id, VBucket::toString(state));
269 }
270
271 void VBucket::fireAllOps(EventuallyPersistentEngine &engine) {
272
273     if (state == vbucket_state_active) {
274         fireAllOps(engine, ENGINE_SUCCESS);
275     } else if (state == vbucket_state_pending) {
276         // Nothing
277     } else {
278         fireAllOps(engine, ENGINE_NOT_MY_VBUCKET);
279     }
280 }
281
282 void VBucket::setState(vbucket_state_t to) {
283     vbucket_state_t oldstate;
284     {
285         WriterLockHolder wlh(stateLock);
286         oldstate = state;
287
288         if (to == vbucket_state_active &&
289             checkpointManager.getOpenCheckpointId() < 2) {
290             checkpointManager.setOpenCheckpointId(2);
291         }
292
293         LOG(EXTENSION_LOG_NOTICE,
294             "VBucket::setState: transitioning vbucket:%" PRIu16 " from:%s to:%s",
295             id, VBucket::toString(oldstate), VBucket::toString(to));
296
297         state = to;
298     }
299 }
300
301 vbucket_state VBucket::getVBucketState() const {
302      auto persisted_range = getPersistedSnapshot();
303
304      return vbucket_state{getState(),
305                           getPersistenceCheckpointId(), 0, getHighSeqno(),
306                           getPurgeSeqno(),
307                           persisted_range.start, persisted_range.end,
308                           getMaxCas(), failovers->toJSON()};
309 }
310
311
312
313 void VBucket::doStatsForQueueing(const Item& qi, size_t itemBytes)
314 {
315     ++dirtyQueueSize;
316     dirtyQueueMem.fetch_add(sizeof(Item));
317     ++dirtyQueueFill;
318     dirtyQueueAge.fetch_add(qi.getQueuedTime());
319     dirtyQueuePendingWrites.fetch_add(itemBytes);
320 }
321
322 void VBucket::doStatsForFlushing(const Item& qi, size_t itemBytes) {
323     --dirtyQueueSize;
324     decrDirtyQueueMem(sizeof(Item));
325     ++dirtyQueueDrain;
326     decrDirtyQueueAge(qi.getQueuedTime());
327     decrDirtyQueuePendingWrites(itemBytes);
328 }
329
330 void VBucket::incrMetaDataDisk(const Item& qi) {
331     metaDataDisk.fetch_add(qi.getKey().size() + sizeof(ItemMetaData));
332 }
333
334 void VBucket::decrMetaDataDisk(const Item& qi) {
335     // assume couchstore remove approx this much data from disk
336     metaDataDisk.fetch_sub((qi.getKey().size() + sizeof(ItemMetaData)));
337 }
338
339 void VBucket::resetStats() {
340     opsCreate.store(0);
341     opsUpdate.store(0);
342     opsDelete.store(0);
343     opsReject.store(0);
344
345     stats.diskQueueSize.fetch_sub(dirtyQueueSize.exchange(0));
346     dirtyQueueMem.store(0);
347     dirtyQueueFill.store(0);
348     dirtyQueueAge.store(0);
349     dirtyQueuePendingWrites.store(0);
350     dirtyQueueDrain.store(0);
351
352     hlc.resetStats();
353 }
354
355 template <typename T>
356 void VBucket::addStat(const char *nm, const T &val, ADD_STAT add_stat,
357                       const void *c) {
358     std::string stat = statPrefix;
359     if (nm != NULL) {
360         add_prefixed_stat(statPrefix, nm, val, add_stat, c);
361     } else {
362         add_casted_stat(statPrefix.data(), val, add_stat, c);
363     }
364 }
365
366 void VBucket::handlePreExpiry(StoredValue& v) {
367     value_t value = v.getValue();
368     if (value) {
369         std::unique_ptr<Item> itm(v.toItem(false, id));
370         item_info itm_info;
371         EventuallyPersistentEngine* engine = ObjectRegistry::getCurrentEngine();
372         itm_info = itm->toItemInfo(failovers->getLatestUUID());
373         value_t new_val(Blob::Copy(*value));
374         itm->setValue(new_val);
375
376         SERVER_HANDLE_V1* sapi = engine->getServerApi();
377         /* TODO: In order to minimize allocations, the callback needs to
378          * allocate an item whose value size will be exactly the size of the
379          * value after pre-expiry is performed.
380          */
381         if (sapi->document->pre_expiry(itm_info)) {
382             char* extMeta = const_cast<char *>(v.getValue()->getExtMeta());
383             Item new_item(v.getKey(), v.getFlags(), v.getExptime(),
384                           itm_info.value[0].iov_base, itm_info.value[0].iov_len,
385                           reinterpret_cast<uint8_t*>(extMeta),
386                           v.getValue()->getExtLen(), v.getCas(),
387                           v.getBySeqno(), id, v.getRevSeqno(),
388                           v.getNRUValue());
389
390             new_item.setDeleted();
391             v.setValue(new_item, ht);
392         }
393     }
394 }
395
396 size_t VBucket::getNumNonResidentItems() const {
397     if (eviction == VALUE_ONLY) {
398         return ht.getNumInMemoryNonResItems();
399     } else {
400         size_t num_items = ht.getNumItems();
401         size_t num_res_items = ht.getNumInMemoryItems() -
402                                ht.getNumInMemoryNonResItems();
403         return num_items > num_res_items ? (num_items - num_res_items) : 0;
404     }
405 }
406
407
408 uint64_t VBucket::getPersistenceCheckpointId() const {
409     return persistenceCheckpointId.load();
410 }
411
412 void VBucket::setPersistenceCheckpointId(uint64_t checkpointId) {
413     persistenceCheckpointId.store(checkpointId);
414 }
415
416 void VBucket::markDirty(const DocKey& key) {
417     auto hbl = ht.getLockedBucket(key);
418     StoredValue* v = ht.unlocked_find(
419             key, hbl.getBucketNum(), WantsDeleted::Yes, TrackReference::Yes);
420     if (v) {
421         v->markDirty();
422     } else {
423         LOG(EXTENSION_LOG_WARNING, "markDirty: Error marking dirty, a key "
424             "is missing from vb:%" PRIu16, id);
425     }
426 }
427
428 bool VBucket::isResidentRatioUnderThreshold(float threshold) {
429     if (eviction != FULL_EVICTION) {
430         throw std::invalid_argument("VBucket::isResidentRatioUnderThreshold: "
431                 "policy (which is " + std::to_string(eviction) +
432                 ") must be FULL_EVICTION");
433     }
434     size_t num_items = getNumItems();
435     size_t num_non_resident_items = getNumNonResidentItems();
436     if (threshold >= ((float)(num_items - num_non_resident_items) /
437                                                                 num_items)) {
438         return true;
439     } else {
440         return false;
441     }
442 }
443
444 void VBucket::createFilter(size_t key_count, double probability) {
445     // Create the actual bloom filter upon vbucket creation during
446     // scenarios:
447     //      - Bucket creation
448     //      - Rebalance
449     LockHolder lh(bfMutex);
450     if (bFilter == nullptr && tempFilter == nullptr) {
451         bFilter = std::make_unique<BloomFilter>(key_count, probability,
452                                         BFILTER_ENABLED);
453     } else {
454         LOG(EXTENSION_LOG_WARNING, "(vb %" PRIu16 ") Bloom filter / Temp filter"
455             " already exist!", id);
456     }
457 }
458
459 void VBucket::initTempFilter(size_t key_count, double probability) {
460     // Create a temp bloom filter with status as COMPACTING,
461     // if the main filter is found to exist, set its state to
462     // COMPACTING as well.
463     LockHolder lh(bfMutex);
464     tempFilter = std::make_unique<BloomFilter>(key_count, probability,
465                                      BFILTER_COMPACTING);
466     if (bFilter) {
467         bFilter->setStatus(BFILTER_COMPACTING);
468     }
469 }
470
471 void VBucket::addToFilter(const DocKey& key) {
472     LockHolder lh(bfMutex);
473     if (bFilter) {
474         bFilter->addKey(key);
475     }
476
477     // If the temp bloom filter is not found to be NULL,
478     // it means that compaction is running on the particular
479     // vbucket. Therefore add the key to the temp filter as
480     // well, as once compaction completes the temp filter
481     // will replace the main bloom filter.
482     if (tempFilter) {
483         tempFilter->addKey(key);
484     }
485 }
486
487 bool VBucket::maybeKeyExistsInFilter(const DocKey& key) {
488     LockHolder lh(bfMutex);
489     if (bFilter) {
490         return bFilter->maybeKeyExists(key);
491     } else {
492         // If filter doesn't exist, allow the BgFetch to go through.
493         return true;
494     }
495 }
496
497 bool VBucket::isTempFilterAvailable() {
498     LockHolder lh(bfMutex);
499     if (tempFilter &&
500         (tempFilter->getStatus() == BFILTER_COMPACTING ||
501          tempFilter->getStatus() == BFILTER_ENABLED)) {
502         return true;
503     } else {
504         return false;
505     }
506 }
507
508 void VBucket::addToTempFilter(const DocKey& key) {
509     // Keys will be added to only the temp filter during
510     // compaction.
511     LockHolder lh(bfMutex);
512     if (tempFilter) {
513         tempFilter->addKey(key);
514     }
515 }
516
517 void VBucket::swapFilter() {
518     // Delete the main bloom filter and replace it with
519     // the temp filter that was populated during compaction,
520     // only if the temp filter's state is found to be either at
521     // COMPACTING or ENABLED (if in the case the user enables
522     // bloomfilters for some reason while compaction was running).
523     // Otherwise, it indicates that the filter's state was
524     // possibly disabled during compaction, therefore clear out
525     // the temp filter. If it gets enabled at some point, a new
526     // bloom filter will be made available after the next
527     // compaction.
528
529     LockHolder lh(bfMutex);
530     if (tempFilter) {
531         bFilter.reset();
532
533         if (tempFilter->getStatus() == BFILTER_COMPACTING ||
534              tempFilter->getStatus() == BFILTER_ENABLED) {
535             bFilter = std::move(tempFilter);
536             bFilter->setStatus(BFILTER_ENABLED);
537         }
538         tempFilter.reset();
539     }
540 }
541
542 void VBucket::clearFilter() {
543     LockHolder lh(bfMutex);
544     bFilter.reset();
545     tempFilter.reset();
546 }
547
548 void VBucket::setFilterStatus(bfilter_status_t to) {
549     LockHolder lh(bfMutex);
550     if (bFilter) {
551         bFilter->setStatus(to);
552     }
553     if (tempFilter) {
554         tempFilter->setStatus(to);
555     }
556 }
557
558 std::string VBucket::getFilterStatusString() {
559     LockHolder lh(bfMutex);
560     if (bFilter) {
561         return bFilter->getStatusString();
562     } else if (tempFilter) {
563         return tempFilter->getStatusString();
564     } else {
565         return "DOESN'T EXIST";
566     }
567 }
568
569 size_t VBucket::getFilterSize() {
570     LockHolder lh(bfMutex);
571     if (bFilter) {
572         return bFilter->getFilterSize();
573     } else {
574         return 0;
575     }
576 }
577
578 size_t VBucket::getNumOfKeysInFilter() {
579     LockHolder lh(bfMutex);
580     if (bFilter) {
581         return bFilter->getNumOfKeysInFilter();
582     } else {
583         return 0;
584     }
585 }
586
587 VBNotifyCtx VBucket::queueDirty(
588         StoredValue& v,
589         const GenerateBySeqno generateBySeqno,
590         const GenerateCas generateCas,
591         const bool isBackfillItem,
592         PreLinkDocumentContext* preLinkDocumentContext) {
593     VBNotifyCtx notifyCtx;
594
595     queued_item qi(v.toItem(false, getId()));
596
597     if (isBackfillItem) {
598         queueBackfillItem(qi, generateBySeqno);
599         notifyCtx.notifyFlusher = true;
600         /* During backfill on a TAP receiver we need to update the snapshot
601          range in the checkpoint. Has to be done here because in case of TAP
602          backfill, above, we use vb.queueBackfillItem() instead of
603          vb.checkpointManager.queueDirty() */
604         if (generateBySeqno == GenerateBySeqno::Yes) {
605             checkpointManager.resetSnapshotRange();
606         }
607     } else {
608         notifyCtx.notifyFlusher =
609                 checkpointManager.queueDirty(*this,
610                                              qi,
611                                              generateBySeqno,
612                                              generateCas,
613                                              preLinkDocumentContext);
614         notifyCtx.notifyReplication = true;
615         if (GenerateCas::Yes == generateCas) {
616             v.setCas(qi->getCas());
617         }
618     }
619
620     v.setBySeqno(qi->getBySeqno());
621     notifyCtx.bySeqno = qi->getBySeqno();
622
623     return notifyCtx;
624 }
625
626 StoredValue* VBucket::fetchValidValue(HashTable::HashBucketLock& hbl,
627                                       const DocKey& key,
628                                       WantsDeleted wantsDeleted,
629                                       TrackReference trackReference,
630                                       QueueExpired queueExpired) {
631     if (!hbl.getHTLock()) {
632         throw std::logic_error(
633                 "Hash bucket lock not held in "
634                 "VBucket::fetchValidValue() for hash bucket: " +
635                 std::to_string(hbl.getBucketNum()) + "for key: " +
636                 std::string(reinterpret_cast<const char*>(key.data()),
637                             key.size()));
638     }
639     StoredValue* v = ht.unlocked_find(
640             key, hbl.getBucketNum(), wantsDeleted, trackReference);
641     if (v && !v->isDeleted() && !v->isTempItem()) {
642         // In the deleted case, we ignore expiration time.
643         if (v->isExpired(ep_real_time())) {
644             if (getState() != vbucket_state_active) {
645                 return wantsDeleted == WantsDeleted::Yes ? v : NULL;
646             }
647
648             // queueDirty only allowed on active VB
649             if (queueExpired == QueueExpired::Yes &&
650                 getState() == vbucket_state_active) {
651                 incExpirationStat(ExpireBy::Access);
652                 handlePreExpiry(*v);
653                 VBNotifyCtx notifyCtx;
654                 std::tie(std::ignore, v, notifyCtx) =
655                         processExpiredItem(hbl, *v);
656                 notifyNewSeqno(notifyCtx);
657             }
658             return wantsDeleted == WantsDeleted::Yes ? v : NULL;
659         }
660     }
661     return v;
662 }
663
664 void VBucket::incExpirationStat(const ExpireBy source) {
665     switch (source) {
666     case ExpireBy::Pager:
667         ++stats.expired_pager;
668         break;
669     case ExpireBy::Compactor:
670         ++stats.expired_compactor;
671         break;
672     case ExpireBy::Access:
673         ++stats.expired_access;
674         break;
675     }
676     ++numExpiredItems;
677 }
678
679 MutationStatus VBucket::setFromInternal(Item& itm) {
680     return ht.set(itm);
681 }
682
683 ENGINE_ERROR_CODE VBucket::set(Item& itm,
684                                const void* cookie,
685                                EventuallyPersistentEngine& engine,
686                                const int bgFetchDelay) {
687     bool cas_op = (itm.getCas() != 0);
688     auto hbl = ht.getLockedBucket(itm.getKey());
689     StoredValue* v = ht.unlocked_find(itm.getKey(),
690                                       hbl.getBucketNum(),
691                                       WantsDeleted::Yes,
692                                       TrackReference::No);
693     if (v && v->isLocked(ep_current_time()) &&
694         (getState() == vbucket_state_replica ||
695          getState() == vbucket_state_pending)) {
696         v->unlock();
697     }
698
699     bool maybeKeyExists = true;
700     // If we didn't find a valid item, check Bloomfilter's prediction if in
701     // full eviction policy and for a CAS operation.
702     if ((v == nullptr || v->isTempInitialItem()) &&
703         (eviction == FULL_EVICTION) && (itm.getCas() != 0)) {
704         // Check Bloomfilter's prediction
705         if (!maybeKeyExistsInFilter(itm.getKey())) {
706             maybeKeyExists = false;
707         }
708     }
709
710     PreLinkDocumentContext preLinkDocumentContext(engine, cookie, &itm);
711     VBQueueItemCtx queueItmCtx(GenerateBySeqno::Yes,
712                                GenerateCas::Yes,
713                                TrackCasDrift::No,
714                                /*isBackfillItem*/ false,
715                                &preLinkDocumentContext);
716
717     MutationStatus status;
718     VBNotifyCtx notifyCtx;
719     std::tie(status, notifyCtx) = processSet(hbl,
720                                              v,
721                                              itm,
722                                              itm.getCas(),
723                                              /*allowExisting*/ true,
724                                              /*hashMetaData*/ false,
725                                              &queueItmCtx,
726                                              maybeKeyExists);
727
728     ENGINE_ERROR_CODE ret = ENGINE_SUCCESS;
729     switch (status) {
730     case MutationStatus::NoMem:
731         ret = ENGINE_ENOMEM;
732         break;
733     case MutationStatus::InvalidCas:
734         ret = ENGINE_KEY_EEXISTS;
735         break;
736     case MutationStatus::IsLocked:
737         ret = ENGINE_LOCKED;
738         break;
739     case MutationStatus::NotFound:
740         if (cas_op) {
741             ret = ENGINE_KEY_ENOENT;
742             break;
743         }
744     // FALLTHROUGH
745     case MutationStatus::WasDirty:
746     // Even if the item was dirty, push it into the vbucket's open
747     // checkpoint.
748     case MutationStatus::WasClean:
749         notifyNewSeqno(notifyCtx);
750
751         itm.setBySeqno(v->getBySeqno());
752         itm.setCas(v->getCas());
753         break;
754     case MutationStatus::NeedBgFetch: { // CAS operation with non-resident item
755         // +
756         // full eviction.
757         if (v) {
758             // temp item is already created. Simply schedule a bg fetch job
759             hbl.getHTLock().unlock();
760             bgFetch(itm.getKey(), cookie, engine, bgFetchDelay, true);
761             return ENGINE_EWOULDBLOCK;
762         }
763         ret = addTempItemAndBGFetch(
764                 hbl, itm.getKey(), cookie, engine, bgFetchDelay, true);
765         break;
766     }
767     }
768
769     return ret;
770 }
771
772 ENGINE_ERROR_CODE VBucket::replace(Item& itm,
773                                    const void* cookie,
774                                    EventuallyPersistentEngine& engine,
775                                    const int bgFetchDelay) {
776     auto hbl = ht.getLockedBucket(itm.getKey());
777     StoredValue* v = ht.unlocked_find(itm.getKey(),
778                                       hbl.getBucketNum(),
779                                       WantsDeleted::Yes,
780                                       TrackReference::No);
781     if (v) {
782         if (v->isDeleted() || v->isTempDeletedItem() ||
783             v->isTempNonExistentItem()) {
784             return ENGINE_KEY_ENOENT;
785         }
786
787         MutationStatus mtype;
788         VBNotifyCtx notifyCtx;
789         if (eviction == FULL_EVICTION && v->isTempInitialItem()) {
790             mtype = MutationStatus::NeedBgFetch;
791         } else {
792             PreLinkDocumentContext preLinkDocumentContext(engine, cookie, &itm);
793             VBQueueItemCtx queueItmCtx(GenerateBySeqno::Yes,
794                                        GenerateCas::Yes,
795                                        TrackCasDrift::No,
796                                        /*isBackfillItem*/ false,
797                                        &preLinkDocumentContext);
798             std::tie(mtype, notifyCtx) = processSet(hbl,
799                                                     v,
800                                                     itm,
801                                                     0,
802                                                     /*allowExisting*/ true,
803                                                     /*hasMetaData*/ false,
804                                                     &queueItmCtx);
805         }
806
807         ENGINE_ERROR_CODE ret = ENGINE_SUCCESS;
808         switch (mtype) {
809         case MutationStatus::NoMem:
810             ret = ENGINE_ENOMEM;
811             break;
812         case MutationStatus::IsLocked:
813             ret = ENGINE_LOCKED;
814             break;
815         case MutationStatus::InvalidCas:
816         case MutationStatus::NotFound:
817             ret = ENGINE_NOT_STORED;
818             break;
819         // FALLTHROUGH
820         case MutationStatus::WasDirty:
821         // Even if the item was dirty, push it into the vbucket's open
822         // checkpoint.
823         case MutationStatus::WasClean:
824             notifyNewSeqno(notifyCtx);
825
826             itm.setBySeqno(v->getBySeqno());
827             itm.setCas(v->getCas());
828             break;
829         case MutationStatus::NeedBgFetch: {
830             // temp item is already created. Simply schedule a bg fetch job
831             hbl.getHTLock().unlock();
832             bgFetch(itm.getKey(), cookie, engine, bgFetchDelay, true);
833             ret = ENGINE_EWOULDBLOCK;
834             break;
835         }
836         }
837
838         return ret;
839     } else {
840         if (eviction == VALUE_ONLY) {
841             return ENGINE_KEY_ENOENT;
842         }
843
844         if (maybeKeyExistsInFilter(itm.getKey())) {
845             return addTempItemAndBGFetch(
846                     hbl, itm.getKey(), cookie, engine, bgFetchDelay, false);
847         } else {
848             // As bloomfilter predicted that item surely doesn't exist
849             // on disk, return ENOENT for replace().
850             return ENGINE_KEY_ENOENT;
851         }
852     }
853 }
854
855 ENGINE_ERROR_CODE VBucket::addBackfillItem(Item& itm,
856                                            const GenerateBySeqno genBySeqno) {
857     auto hbl = ht.getLockedBucket(itm.getKey());
858     StoredValue* v = ht.unlocked_find(itm.getKey(),
859                                       hbl.getBucketNum(),
860                                       WantsDeleted::Yes,
861                                       TrackReference::No);
862
863     // Note that this function is only called on replica or pending vbuckets.
864     if (v && v->isLocked(ep_current_time())) {
865         v->unlock();
866     }
867
868     VBQueueItemCtx queueItmCtx(genBySeqno,
869                                GenerateCas::No,
870                                TrackCasDrift::No,
871                                /*isBackfillItem*/ true,
872                                nullptr /* No pre link should happen */);
873     MutationStatus status;
874     VBNotifyCtx notifyCtx;
875     std::tie(status, notifyCtx) = processSet(hbl,
876                                              v,
877                                              itm,
878                                              0,
879                                              /*allowExisting*/ true,
880                                              /*hasMetaData*/ true,
881                                              &queueItmCtx);
882
883     ENGINE_ERROR_CODE ret = ENGINE_SUCCESS;
884     switch (status) {
885     case MutationStatus::NoMem:
886         ret = ENGINE_ENOMEM;
887         break;
888     case MutationStatus::InvalidCas:
889     case MutationStatus::IsLocked:
890         ret = ENGINE_KEY_EEXISTS;
891         break;
892     case MutationStatus::WasDirty:
893     // FALLTHROUGH, to ensure the bySeqno for the hashTable item is
894     // set correctly, and also the sequence numbers are ordered correctly.
895     // (MB-14003)
896     case MutationStatus::NotFound:
897     // FALLTHROUGH
898     case MutationStatus::WasClean: {
899         setMaxCas(v->getCas());
900         // we unlock ht lock here because we want to avoid potential lock
901         // inversions arising from notifyNewSeqno() call
902         hbl.getHTLock().unlock();
903         notifyNewSeqno(notifyCtx);
904     } break;
905     case MutationStatus::NeedBgFetch:
906         throw std::logic_error(
907                 "VBucket::addBackfillItem: "
908                 "SET on a non-active vbucket should not require a "
909                 "bg_metadata_fetch.");
910     }
911
912     return ret;
913 }
914
915 ENGINE_ERROR_CODE VBucket::setWithMeta(Item& itm,
916                                        const uint64_t cas,
917                                        uint64_t* seqno,
918                                        const void* cookie,
919                                        EventuallyPersistentEngine& engine,
920                                        const int bgFetchDelay,
921                                        const bool force,
922                                        const bool allowExisting,
923                                        const GenerateBySeqno genBySeqno,
924                                        const GenerateCas genCas,
925                                        const bool isReplication) {
926     auto hbl = ht.getLockedBucket(itm.getKey());
927     StoredValue* v = ht.unlocked_find(itm.getKey(),
928                                       hbl.getBucketNum(),
929                                       WantsDeleted::Yes,
930                                       TrackReference::No);
931
932     bool maybeKeyExists = true;
933     if (!force) {
934         if (v) {
935             if (v->isTempInitialItem()) {
936                 bgFetch(itm.getKey(), cookie, engine, bgFetchDelay, true);
937                 return ENGINE_EWOULDBLOCK;
938             }
939
940             if (!(conflictResolver->resolve(*v,
941                                             itm.getMetaData(),
942                                             itm.getDataType(),
943                                             itm.isDeleted()))) {
944                 ++stats.numOpsSetMetaResolutionFailed;
945                 return ENGINE_KEY_EEXISTS;
946             }
947         } else {
948             if (maybeKeyExistsInFilter(itm.getKey())) {
949                 return addTempItemAndBGFetch(hbl,
950                                              itm.getKey(),
951                                              cookie,
952                                              engine,
953                                              bgFetchDelay,
954                                              true,
955                                              isReplication);
956             } else {
957                 maybeKeyExists = false;
958             }
959         }
960     } else {
961         if (eviction == FULL_EVICTION) {
962             // Check Bloomfilter's prediction
963             if (!maybeKeyExistsInFilter(itm.getKey())) {
964                 maybeKeyExists = false;
965             }
966         }
967     }
968
969     if (v && v->isLocked(ep_current_time()) &&
970         (getState() == vbucket_state_replica ||
971          getState() == vbucket_state_pending)) {
972         v->unlock();
973     }
974
975     VBQueueItemCtx queueItmCtx(genBySeqno,
976                                genCas,
977                                TrackCasDrift::Yes,
978                                /*isBackfillItem*/ false,
979                                nullptr /* No pre link step needed */);
980     MutationStatus status;
981     VBNotifyCtx notifyCtx;
982     std::tie(status, notifyCtx) = processSet(hbl,
983                                              v,
984                                              itm,
985                                              cas,
986                                              allowExisting,
987                                              true,
988                                              &queueItmCtx,
989                                              maybeKeyExists,
990                                              isReplication);
991
992     ENGINE_ERROR_CODE ret = ENGINE_SUCCESS;
993     switch (status) {
994     case MutationStatus::NoMem:
995         ret = ENGINE_ENOMEM;
996         break;
997     case MutationStatus::InvalidCas:
998         ret = ENGINE_KEY_EEXISTS;
999         break;
1000     case MutationStatus::IsLocked:
1001         ret = ENGINE_LOCKED;
1002         break;
1003     case MutationStatus::WasDirty:
1004     case MutationStatus::WasClean: {
1005         if (seqno) {
1006             *seqno = static_cast<uint64_t>(v->getBySeqno());
1007         }
1008         // we unlock ht lock here because we want to avoid potential lock
1009         // inversions arising from notifyNewSeqno() call
1010         hbl.getHTLock().unlock();
1011         notifyNewSeqno(notifyCtx);
1012     } break;
1013     case MutationStatus::NotFound:
1014         ret = ENGINE_KEY_ENOENT;
1015         break;
1016     case MutationStatus::NeedBgFetch: { // CAS operation with non-resident item
1017         // + full eviction.
1018         if (v) { // temp item is already created. Simply schedule a
1019             hbl.getHTLock().unlock(); // bg fetch job.
1020             bgFetch(itm.getKey(), cookie, engine, bgFetchDelay, true);
1021             return ENGINE_EWOULDBLOCK;
1022         }
1023         ret = addTempItemAndBGFetch(hbl,
1024                                     itm.getKey(),
1025                                     cookie,
1026                                     engine,
1027                                     bgFetchDelay,
1028                                     true,
1029                                     isReplication);
1030     }
1031     }
1032
1033     return ret;
1034 }
1035
1036 ENGINE_ERROR_CODE VBucket::deleteItem(const DocKey& key,
1037                                       uint64_t& cas,
1038                                       const void* cookie,
1039                                       EventuallyPersistentEngine& engine,
1040                                       const int bgFetchDelay,
1041                                       ItemMetaData* itemMeta,
1042                                       mutation_descr_t* mutInfo) {
1043     auto hbl = ht.getLockedBucket(key);
1044     StoredValue* v = ht.unlocked_find(
1045             key, hbl.getBucketNum(), WantsDeleted::Yes, TrackReference::No);
1046
1047     if (!v || v->isDeleted() || v->isTempItem()) {
1048         if (eviction == VALUE_ONLY) {
1049             return ENGINE_KEY_ENOENT;
1050         } else { // Full eviction.
1051             if (!v) { // Item might be evicted from cache.
1052                 if (maybeKeyExistsInFilter(key)) {
1053                     return addTempItemAndBGFetch(
1054                             hbl, key, cookie, engine, bgFetchDelay, true);
1055                 } else {
1056                     // As bloomfilter predicted that item surely doesn't
1057                     // exist on disk, return ENOENT for deleteItem().
1058                     return ENGINE_KEY_ENOENT;
1059                 }
1060             } else if (v->isTempInitialItem()) {
1061                 hbl.getHTLock().unlock();
1062                 bgFetch(key, cookie, engine, bgFetchDelay, true);
1063                 return ENGINE_EWOULDBLOCK;
1064             } else { // Non-existent or deleted key.
1065                 if (v->isTempNonExistentItem() || v->isTempDeletedItem()) {
1066                     // Delete a temp non-existent item to ensure that
1067                     // if a delete were issued over an item that doesn't
1068                     // exist, then we don't preserve a temp item.
1069                     deleteStoredValue(hbl, *v);
1070                 }
1071                 return ENGINE_KEY_ENOENT;
1072             }
1073         }
1074     }
1075
1076     if (v->isLocked(ep_current_time()) &&
1077         (getState() == vbucket_state_replica ||
1078          getState() == vbucket_state_pending)) {
1079         v->unlock();
1080     }
1081
1082     if (itemMeta != nullptr) {
1083         itemMeta->cas = v->getCas();
1084     }
1085
1086     MutationStatus delrv;
1087     VBNotifyCtx notifyCtx;
1088     if (v->isExpired(ep_real_time())) {
1089         std::tie(delrv, v, notifyCtx) = processExpiredItem(hbl, *v);
1090     } else {
1091         ItemMetaData metadata;
1092         metadata.revSeqno = v->getRevSeqno() + 1;
1093         std::tie(delrv, v, notifyCtx) =
1094                 processSoftDelete(hbl,
1095                                   *v,
1096                                   cas,
1097                                   metadata,
1098                                   VBQueueItemCtx(GenerateBySeqno::Yes,
1099                                                  GenerateCas::Yes,
1100                                                  TrackCasDrift::No,
1101                                                  /*isBackfillItem*/ false,
1102                                                  nullptr /* no pre link */),
1103                                   /*use_meta*/ false,
1104                                   /*bySeqno*/ v->getBySeqno());
1105     }
1106
1107     uint64_t seqno = 0;
1108     ENGINE_ERROR_CODE ret = ENGINE_SUCCESS;
1109     switch (delrv) {
1110     case MutationStatus::NoMem:
1111         ret = ENGINE_ENOMEM;
1112         break;
1113     case MutationStatus::InvalidCas:
1114         ret = ENGINE_KEY_EEXISTS;
1115         break;
1116     case MutationStatus::IsLocked:
1117         ret = ENGINE_LOCKED_TMPFAIL;
1118         break;
1119     case MutationStatus::NotFound:
1120         ret = ENGINE_KEY_ENOENT;
1121     /* Fallthrough:
1122      * A NotFound return value at this point indicates that the
1123      * item has expired. But, a deletion still needs to be queued
1124      * for the item in order to persist it.
1125      */
1126     case MutationStatus::WasClean:
1127     case MutationStatus::WasDirty:
1128         if (itemMeta != nullptr) {
1129             itemMeta->revSeqno = v->getRevSeqno();
1130             itemMeta->flags = v->getFlags();
1131             itemMeta->exptime = v->getExptime();
1132         }
1133
1134         notifyNewSeqno(notifyCtx);
1135         seqno = static_cast<uint64_t>(v->getBySeqno());
1136         cas = v->getCas();
1137
1138         if (delrv != MutationStatus::NotFound) {
1139             if (mutInfo) {
1140                 mutInfo->seqno = seqno;
1141                 mutInfo->vbucket_uuid = failovers->getLatestUUID();
1142             }
1143             if (itemMeta != nullptr) {
1144                 itemMeta->cas = v->getCas();
1145             }
1146         }
1147         break;
1148     case MutationStatus::NeedBgFetch:
1149         // We already figured out if a bg fetch is requred for a full-evicted
1150         // item above.
1151         throw std::logic_error(
1152                 "VBucket::deleteItem: "
1153                 "Unexpected NEEDS_BG_FETCH from processSoftDelete");
1154     }
1155     return ret;
1156 }
1157
1158 ENGINE_ERROR_CODE VBucket::deleteWithMeta(const DocKey& key,
1159                                           uint64_t& cas,
1160                                           uint64_t* seqno,
1161                                           const void* cookie,
1162                                           EventuallyPersistentEngine& engine,
1163                                           const int bgFetchDelay,
1164                                           const bool force,
1165                                           const ItemMetaData& itemMeta,
1166                                           const bool backfill,
1167                                           const GenerateBySeqno genBySeqno,
1168                                           const GenerateCas generateCas,
1169                                           const uint64_t bySeqno,
1170                                           const bool isReplication) {
1171     auto hbl = ht.getLockedBucket(key);
1172     StoredValue* v = ht.unlocked_find(
1173             key, hbl.getBucketNum(), WantsDeleted::Yes, TrackReference::No);
1174     if (!force) { // Need conflict resolution.
1175         if (v) {
1176             if (v->isTempInitialItem()) {
1177                 bgFetch(key, cookie, engine, bgFetchDelay, true);
1178                 return ENGINE_EWOULDBLOCK;
1179             }
1180
1181             if (!(conflictResolver->resolve(*v,
1182                                             itemMeta,
1183                                             PROTOCOL_BINARY_RAW_BYTES,
1184                                             true))) {
1185                 ++stats.numOpsDelMetaResolutionFailed;
1186                 return ENGINE_KEY_EEXISTS;
1187             }
1188         } else {
1189             // Item is 1) deleted or not existent in the value eviction case OR
1190             // 2) deleted or evicted in the full eviction.
1191             if (maybeKeyExistsInFilter(key)) {
1192                 return addTempItemAndBGFetch(hbl,
1193                                              key,
1194                                              cookie,
1195                                              engine,
1196                                              bgFetchDelay,
1197                                              true,
1198                                              isReplication);
1199             } else {
1200                 // Even though bloomfilter predicted that item doesn't exist
1201                 // on disk, we must put this delete on disk if the cas is valid.
1202                 AddStatus rv = addTempStoredValue(hbl, key, isReplication);
1203                 if (rv == AddStatus::NoMem) {
1204                     return ENGINE_ENOMEM;
1205                 }
1206                 v = ht.unlocked_find(key,
1207                                      hbl.getBucketNum(),
1208                                      WantsDeleted::Yes,
1209                                      TrackReference::No);
1210                 v->setDeleted();
1211             }
1212         }
1213     } else {
1214         if (!v) {
1215             // We should always try to persist a delete here.
1216             AddStatus rv = addTempStoredValue(hbl, key, isReplication);
1217             if (rv == AddStatus::NoMem) {
1218                 return ENGINE_ENOMEM;
1219             }
1220             v = ht.unlocked_find(key,
1221                                  hbl.getBucketNum(),
1222                                  WantsDeleted::Yes,
1223                                  TrackReference::No);
1224             v->setDeleted();
1225             v->setCas(cas);
1226         } else if (v->isTempInitialItem()) {
1227             v->setDeleted();
1228             v->setCas(cas);
1229         }
1230     }
1231
1232     if (v && v->isLocked(ep_current_time()) &&
1233         (getState() == vbucket_state_replica ||
1234          getState() == vbucket_state_pending)) {
1235         v->unlock();
1236     }
1237
1238     MutationStatus delrv;
1239     VBNotifyCtx notifyCtx;
1240     if (!v) {
1241         if (eviction == FULL_EVICTION) {
1242             delrv = MutationStatus::NeedBgFetch;
1243         } else {
1244             delrv = MutationStatus::NotFound;
1245         }
1246     } else {
1247         VBQueueItemCtx queueItmCtx(genBySeqno,
1248                                    generateCas,
1249                                    TrackCasDrift::Yes,
1250                                    backfill,
1251                                    nullptr /* No pre link step needed */);
1252
1253         // system xattrs must remain
1254         std::unique_ptr<Item> itm;
1255         if (mcbp::datatype::is_xattr(v->getDatatype()) &&
1256             (itm = pruneXattrDocument(*v, itemMeta))) {
1257             std::tie(v, delrv, notifyCtx) =
1258                     updateStoredValue(hbl, *v, *itm, &queueItmCtx);
1259         } else {
1260             std::tie(delrv, v, notifyCtx) = processSoftDelete(hbl,
1261                                                               *v,
1262                                                               cas,
1263                                                               itemMeta,
1264                                                               queueItmCtx,
1265                                                               /*use_meta*/ true,
1266                                                               bySeqno);
1267         }
1268     }
1269     cas = v ? v->getCas() : 0;
1270
1271     switch (delrv) {
1272     case MutationStatus::NoMem:
1273         return ENGINE_ENOMEM;
1274     case MutationStatus::InvalidCas:
1275         return ENGINE_KEY_EEXISTS;
1276     case MutationStatus::IsLocked:
1277         return ENGINE_LOCKED_TMPFAIL;
1278     case MutationStatus::NotFound:
1279         return ENGINE_KEY_ENOENT;
1280     case MutationStatus::WasDirty:
1281     case MutationStatus::WasClean: {
1282         if (seqno) {
1283             *seqno = static_cast<uint64_t>(v->getBySeqno());
1284         }
1285         // we unlock ht lock here because we want to avoid potential lock
1286         // inversions arising from notifyNewSeqno() call
1287         hbl.getHTLock().unlock();
1288         notifyNewSeqno(notifyCtx);
1289         break;
1290     }
1291     case MutationStatus::NeedBgFetch:
1292         hbl.getHTLock().unlock();
1293         bgFetch(key, cookie, engine, bgFetchDelay, true);
1294         return ENGINE_EWOULDBLOCK;
1295     }
1296     return ENGINE_SUCCESS;
1297 }
1298
1299 void VBucket::deleteExpiredItem(const DocKey& key,
1300                                 time_t startTime,
1301                                 uint64_t revSeqno,
1302                                 ExpireBy source) {
1303     auto hbl = ht.getLockedBucket(key);
1304     StoredValue* v = ht.unlocked_find(
1305             key, hbl.getBucketNum(), WantsDeleted::Yes, TrackReference::No);
1306     if (v) {
1307         if (v->isTempNonExistentItem() || v->isTempDeletedItem()) {
1308             // This is a temporary item whose background fetch for metadata
1309             // has completed.
1310             bool deleted = deleteStoredValue(hbl, *v);
1311             if (!deleted) {
1312                 throw std::logic_error(
1313                         "VBucket::deleteExpiredItem: "
1314                         "Failed to delete seqno:" +
1315                         std::to_string(v->getBySeqno()) + " from bucket " +
1316                         std::to_string(hbl.getBucketNum()));
1317             }
1318         } else if (v->isExpired(startTime) && !v->isDeleted()) {
1319             handlePreExpiry(*v);
1320             VBNotifyCtx notifyCtx;
1321             std::tie(std::ignore, std::ignore, notifyCtx) =
1322                     processExpiredItem(hbl, *v);
1323             // we unlock ht lock here because we want to avoid potential lock
1324             // inversions arising from notifyNewSeqno() call
1325             hbl.getHTLock().unlock();
1326             notifyNewSeqno(notifyCtx);
1327         }
1328     } else {
1329         if (eviction == FULL_EVICTION) {
1330             // Create a temp item and delete and push it
1331             // into the checkpoint queue, only if the bloomfilter
1332             // predicts that the item may exist on disk.
1333             if (maybeKeyExistsInFilter(key)) {
1334                 AddStatus rv = addTempStoredValue(hbl, key);
1335                 if (rv == AddStatus::NoMem) {
1336                     return;
1337                 }
1338                 v = ht.unlocked_find(key,
1339                                      hbl.getBucketNum(),
1340                                      WantsDeleted::Yes,
1341                                      TrackReference::No);
1342                 v->setDeleted();
1343                 v->setRevSeqno(revSeqno);
1344                 VBNotifyCtx notifyCtx;
1345                 std::tie(std::ignore, std::ignore, notifyCtx) =
1346                         processExpiredItem(hbl, *v);
1347                 // we unlock ht lock here because we want to avoid potential
1348                 // lock inversions arising from notifyNewSeqno() call
1349                 hbl.getHTLock().unlock();
1350                 notifyNewSeqno(notifyCtx);
1351             }
1352         }
1353     }
1354     incExpirationStat(source);
1355 }
1356
1357 ENGINE_ERROR_CODE VBucket::add(Item& itm,
1358                                const void* cookie,
1359                                EventuallyPersistentEngine& engine,
1360                                const int bgFetchDelay) {
1361     auto hbl = ht.getLockedBucket(itm.getKey());
1362     StoredValue* v = ht.unlocked_find(itm.getKey(),
1363                                       hbl.getBucketNum(),
1364                                       WantsDeleted::Yes,
1365                                       TrackReference::No);
1366
1367     bool maybeKeyExists = true;
1368     if ((v == nullptr || v->isTempInitialItem()) &&
1369         (eviction == FULL_EVICTION)) {
1370         // Check bloomfilter's prediction
1371         if (!maybeKeyExistsInFilter(itm.getKey())) {
1372             maybeKeyExists = false;
1373         }
1374     }
1375
1376     PreLinkDocumentContext preLinkDocumentContext(engine, cookie, &itm);
1377     VBQueueItemCtx queueItmCtx(GenerateBySeqno::Yes,
1378                                GenerateCas::Yes,
1379                                TrackCasDrift::No,
1380                                /*isBackfillItem*/ false,
1381                                &preLinkDocumentContext);
1382     AddStatus status;
1383     VBNotifyCtx notifyCtx;
1384     std::tie(status, notifyCtx) =
1385             processAdd(hbl, v, itm, maybeKeyExists, false, &queueItmCtx);
1386
1387     switch (status) {
1388     case AddStatus::NoMem:
1389         return ENGINE_ENOMEM;
1390     case AddStatus::Exists:
1391         return ENGINE_NOT_STORED;
1392     case AddStatus::AddTmpAndBgFetch:
1393         return addTempItemAndBGFetch(
1394                 hbl, itm.getKey(), cookie, engine, bgFetchDelay, true);
1395     case AddStatus::BgFetch:
1396         hbl.getHTLock().unlock();
1397         bgFetch(itm.getKey(), cookie, engine, bgFetchDelay, true);
1398         return ENGINE_EWOULDBLOCK;
1399     case AddStatus::Success:
1400     case AddStatus::UnDel:
1401         notifyNewSeqno(notifyCtx);
1402         itm.setBySeqno(v->getBySeqno());
1403         itm.setCas(v->getCas());
1404         break;
1405     }
1406     return ENGINE_SUCCESS;
1407 }
1408
1409 GetValue VBucket::getAndUpdateTtl(const DocKey& key,
1410                                   const void* cookie,
1411                                   EventuallyPersistentEngine& engine,
1412                                   int bgFetchDelay,
1413                                   time_t exptime) {
1414     auto hbl = ht.getLockedBucket(key);
1415     StoredValue* v = fetchValidValue(hbl,
1416                                      key,
1417                                      WantsDeleted::Yes,
1418                                      TrackReference::Yes,
1419                                      QueueExpired::Yes);
1420
1421     if (v) {
1422         if (v->isDeleted() || v->isTempDeletedItem() ||
1423             v->isTempNonExistentItem()) {
1424             return {};
1425         }
1426
1427         if (!v->isResident()) {
1428             bgFetch(key, cookie, engine, bgFetchDelay);
1429             return GetValue(nullptr, ENGINE_EWOULDBLOCK, v->getBySeqno());
1430         }
1431         if (v->isLocked(ep_current_time())) {
1432             return GetValue(nullptr, ENGINE_KEY_EEXISTS, 0);
1433         }
1434
1435         const bool exptime_mutated = exptime != v->getExptime();
1436         if (exptime_mutated) {
1437             v->markDirty();
1438             v->setExptime(exptime);
1439             v->setRevSeqno(v->getRevSeqno() + 1);
1440         }
1441
1442         GetValue rv(
1443                 v->toItem(v->isLocked(ep_current_time()), getId()).release(),
1444                 ENGINE_SUCCESS,
1445                 v->getBySeqno());
1446
1447         if (exptime_mutated) {
1448             VBNotifyCtx notifyCtx = queueDirty(*v);
1449             rv.getValue()->setCas(v->getCas());
1450             // we unlock ht lock here because we want to avoid potential lock
1451             // inversions arising from notifyNewSeqno() call
1452             hbl.getHTLock().unlock();
1453             notifyNewSeqno(notifyCtx);
1454         }
1455
1456         return rv;
1457     } else {
1458         if (eviction == VALUE_ONLY) {
1459             return {};
1460         } else {
1461             if (maybeKeyExistsInFilter(key)) {
1462                 ENGINE_ERROR_CODE ec = addTempItemAndBGFetch(
1463                         hbl, key, cookie, engine, bgFetchDelay, false);
1464                 return GetValue(NULL, ec, -1, true);
1465             } else {
1466                 // As bloomfilter predicted that item surely doesn't exist
1467                 // on disk, return ENOENT for getAndUpdateTtl().
1468                 return {};
1469             }
1470         }
1471     }
1472 }
1473
1474 MutationStatus VBucket::insertFromWarmup(Item& itm,
1475                                          bool eject,
1476                                          bool keyMetaDataOnly) {
1477     if (!StoredValue::hasAvailableSpace(stats, itm)) {
1478         return MutationStatus::NoMem;
1479     }
1480
1481     auto hbl = ht.getLockedBucket(itm.getKey());
1482     StoredValue* v = ht.unlocked_find(itm.getKey(),
1483                                       hbl.getBucketNum(),
1484                                       WantsDeleted::Yes,
1485                                       TrackReference::No);
1486
1487     if (v == NULL) {
1488         v = addNewStoredValue(hbl, itm, /*queueItmCtx*/ nullptr).first;
1489         if (keyMetaDataOnly) {
1490             v->markNotResident();
1491             /* For now ht stats are updated from outside ht. This seems to be
1492                a better option for now than passing a flag to
1493                addNewStoredValue() just for this func */
1494             ++(ht.numNonResidentItems);
1495         }
1496         /* For now ht stats are updated from outside ht. This seems to be
1497            a better option for now than passing a flag to
1498            addNewStoredValue() just for this func.
1499            We need to decrNumTotalItems because ht.numTotalItems is already
1500            set by warmup when it estimated the item count from disk */
1501         ht.decrNumTotalItems();
1502         v->setNewCacheItem(false);
1503     } else {
1504         if (keyMetaDataOnly) {
1505             // We don't have a better error code ;)
1506             return MutationStatus::InvalidCas;
1507         }
1508
1509         // Verify that the CAS isn't changed
1510         if (v->getCas() != itm.getCas()) {
1511             if (v->getCas() == 0) {
1512                 v->setCas(itm.getCas());
1513                 v->setFlags(itm.getFlags());
1514                 v->setExptime(itm.getExptime());
1515                 v->setRevSeqno(itm.getRevSeqno());
1516             } else {
1517                 return MutationStatus::InvalidCas;
1518             }
1519         }
1520         updateStoredValue(hbl, *v, itm, /*queueItmCtx*/ nullptr);
1521     }
1522
1523     v->markClean();
1524
1525     if (eject && !keyMetaDataOnly) {
1526         ht.unlocked_ejectItem(v, eviction);
1527     }
1528
1529     return MutationStatus::NotFound;
1530 }
1531
1532 GetValue VBucket::getInternal(const DocKey& key,
1533                               const void* cookie,
1534                               EventuallyPersistentEngine& engine,
1535                               int bgFetchDelay,
1536                               get_options_t options,
1537                               bool diskFlushAll) {
1538     const TrackReference trackReference = (options & TRACK_REFERENCE)
1539                                                   ? TrackReference::Yes
1540                                                   : TrackReference::No;
1541     const bool getDeletedValue = (options & GET_DELETED_VALUE);
1542     auto hbl = ht.getLockedBucket(key);
1543     StoredValue* v = fetchValidValue(
1544             hbl, key, WantsDeleted::Yes, trackReference, QueueExpired::Yes);
1545     if (v) {
1546         if (v->isDeleted() && !getDeletedValue) {
1547             return GetValue();
1548         }
1549         if (v->isTempDeletedItem() || v->isTempNonExistentItem()) {
1550             // Delete a temp non-existent item to ensure that
1551             // if the get were issued over an item that doesn't
1552             // exist, then we dont preserve a temp item.
1553             if (options & DELETE_TEMP) {
1554                 deleteStoredValue(hbl, *v);
1555             }
1556             return GetValue();
1557         }
1558
1559         // If the value is not resident, wait for it...
1560         if (!v->isResident()) {
1561             return getInternalNonResident(
1562                     key, cookie, engine, bgFetchDelay, options, *v);
1563         }
1564
1565         // Should we hide (return -1) for the items' CAS?
1566         const bool hide_cas =
1567                 (options & HIDE_LOCKED_CAS) && v->isLocked(ep_current_time());
1568         return GetValue(v->toItem(hide_cas, getId()).release(),
1569                         ENGINE_SUCCESS,
1570                         v->getBySeqno(),
1571                         false,
1572                         v->getNRUValue());
1573     } else {
1574         if (!getDeletedValue && (eviction == VALUE_ONLY || diskFlushAll)) {
1575             return GetValue();
1576         }
1577
1578         if (maybeKeyExistsInFilter(key)) {
1579             ENGINE_ERROR_CODE ec = ENGINE_EWOULDBLOCK;
1580             if (options &
1581                 QUEUE_BG_FETCH) { // Full eviction and need a bg fetch.
1582                 ec = addTempItemAndBGFetch(
1583                         hbl, key, cookie, engine, bgFetchDelay, false);
1584             }
1585             return GetValue(NULL, ec, -1, true);
1586         } else {
1587             // As bloomfilter predicted that item surely doesn't exist
1588             // on disk, return ENOENT, for getInternal().
1589             return GetValue();
1590         }
1591     }
1592 }
1593
1594 ENGINE_ERROR_CODE VBucket::getMetaData(const DocKey& key,
1595                                        const void* cookie,
1596                                        EventuallyPersistentEngine& engine,
1597                                        int bgFetchDelay,
1598                                        ItemMetaData& metadata,
1599                                        uint32_t& deleted,
1600                                        uint8_t& datatype) {
1601     deleted = 0;
1602     auto hbl = ht.getLockedBucket(key);
1603     StoredValue* v = ht.unlocked_find(
1604             key, hbl.getBucketNum(), WantsDeleted::Yes, TrackReference::No);
1605
1606     if (v) {
1607         stats.numOpsGetMeta++;
1608         if (v->isTempInitialItem()) {
1609             // Need bg meta fetch.
1610             bgFetch(key, cookie, engine, bgFetchDelay, true);
1611             return ENGINE_EWOULDBLOCK;
1612         } else if (v->isTempNonExistentItem()) {
1613             metadata.cas = v->getCas();
1614             return ENGINE_KEY_ENOENT;
1615         } else {
1616             if (v->isTempDeletedItem() || v->isDeleted() ||
1617                 v->isExpired(ep_real_time())) {
1618                 deleted |= GET_META_ITEM_DELETED_FLAG;
1619             }
1620
1621             if (v->isLocked(ep_current_time())) {
1622                 metadata.cas = static_cast<uint64_t>(-1);
1623             } else {
1624                 metadata.cas = v->getCas();
1625             }
1626             metadata.flags = v->getFlags();
1627             metadata.exptime = v->getExptime();
1628             metadata.revSeqno = v->getRevSeqno();
1629             datatype = v->getDatatype();
1630
1631             return ENGINE_SUCCESS;
1632         }
1633     } else {
1634         // The key wasn't found. However, this may be because it was previously
1635         // deleted or evicted with the full eviction strategy.
1636         // So, add a temporary item corresponding to the key to the hash table
1637         // and schedule a background fetch for its metadata from the persistent
1638         // store. The item's state will be updated after the fetch completes.
1639         //
1640         // Schedule this bgFetch only if the key is predicted to be may-be
1641         // existent on disk by the bloomfilter.
1642
1643         if (maybeKeyExistsInFilter(key)) {
1644             return addTempItemAndBGFetch(
1645                     hbl, key, cookie, engine, bgFetchDelay, true);
1646         } else {
1647             stats.numOpsGetMeta++;
1648             return ENGINE_KEY_ENOENT;
1649         }
1650     }
1651 }
1652
1653 ENGINE_ERROR_CODE VBucket::getKeyStats(const DocKey& key,
1654                                        const void* cookie,
1655                                        EventuallyPersistentEngine& engine,
1656                                        int bgFetchDelay,
1657                                        struct key_stats& kstats,
1658                                        WantsDeleted wantsDeleted) {
1659     auto hbl = ht.getLockedBucket(key);
1660     StoredValue* v = fetchValidValue(hbl,
1661                                      key,
1662                                      WantsDeleted::Yes,
1663                                      TrackReference::Yes,
1664                                      QueueExpired::Yes);
1665
1666     if (v) {
1667         if ((v->isDeleted() && wantsDeleted == WantsDeleted::No) ||
1668             v->isTempNonExistentItem() || v->isTempDeletedItem()) {
1669             return ENGINE_KEY_ENOENT;
1670         }
1671         if (eviction == FULL_EVICTION && v->isTempInitialItem()) {
1672             hbl.getHTLock().unlock();
1673             bgFetch(key, cookie, engine, bgFetchDelay, true);
1674             return ENGINE_EWOULDBLOCK;
1675         }
1676         kstats.logically_deleted = v->isDeleted();
1677         kstats.dirty = v->isDirty();
1678         kstats.exptime = v->getExptime();
1679         kstats.flags = v->getFlags();
1680         kstats.cas = v->getCas();
1681         kstats.vb_state = getState();
1682         return ENGINE_SUCCESS;
1683     } else {
1684         if (eviction == VALUE_ONLY) {
1685             return ENGINE_KEY_ENOENT;
1686         } else {
1687             if (maybeKeyExistsInFilter(key)) {
1688                 return addTempItemAndBGFetch(
1689                         hbl, key, cookie, engine, bgFetchDelay, true);
1690             } else {
1691                 // If bgFetch were false, or bloomfilter predicted that
1692                 // item surely doesn't exist on disk, return ENOENT for
1693                 // getKeyStats().
1694                 return ENGINE_KEY_ENOENT;
1695             }
1696         }
1697     }
1698 }
1699
1700 GetValue VBucket::getLocked(const DocKey& key,
1701                             rel_time_t currentTime,
1702                             uint32_t lockTimeout,
1703                             const void* cookie,
1704                             EventuallyPersistentEngine& engine,
1705                             int bgFetchDelay) {
1706     auto hbl = ht.getLockedBucket(key);
1707     StoredValue* v = fetchValidValue(hbl,
1708                                      key,
1709                                      WantsDeleted::Yes,
1710                                      TrackReference::Yes,
1711                                      QueueExpired::Yes);
1712
1713     if (v) {
1714         if (v->isDeleted() || v->isTempNonExistentItem() ||
1715             v->isTempDeletedItem()) {
1716             return GetValue(NULL, ENGINE_KEY_ENOENT);
1717         }
1718
1719         // if v is locked return error
1720         if (v->isLocked(currentTime)) {
1721             return GetValue(NULL, ENGINE_TMPFAIL);
1722         }
1723
1724         // If the value is not resident, wait for it...
1725         if (!v->isResident()) {
1726             if (cookie) {
1727                 bgFetch(key, cookie, engine, bgFetchDelay);
1728             }
1729             return GetValue(NULL, ENGINE_EWOULDBLOCK, -1, true);
1730         }
1731
1732         // acquire lock and increment cas value
1733         v->lock(currentTime + lockTimeout);
1734
1735         auto it = v->toItem(false, getId());
1736         it->setCas(nextHLCCas());
1737         v->setCas(it->getCas());
1738
1739         return GetValue(it.release());
1740
1741     } else {
1742         // No value found in the hashtable.
1743         switch (eviction) {
1744         case VALUE_ONLY:
1745             return GetValue(NULL, ENGINE_KEY_ENOENT);
1746
1747         case FULL_EVICTION:
1748             if (maybeKeyExistsInFilter(key)) {
1749                 ENGINE_ERROR_CODE ec = addTempItemAndBGFetch(
1750                         hbl, key, cookie, engine, bgFetchDelay, false);
1751                 return GetValue(NULL, ec, -1, true);
1752             } else {
1753                 // As bloomfilter predicted that item surely doesn't exist
1754                 // on disk, return ENOENT for getLocked().
1755                 return GetValue(NULL, ENGINE_KEY_ENOENT);
1756             }
1757         }
1758         return GetValue(); // just to prevent compiler warning
1759     }
1760 }
1761
1762 void VBucket::deletedOnDiskCbk(const Item& queuedItem, bool deleted) {
1763     auto hbl = ht.getLockedBucket(queuedItem.getKey());
1764     StoredValue* v = fetchValidValue(hbl,
1765                                      queuedItem.getKey(),
1766                                      WantsDeleted::Yes,
1767                                      TrackReference::No,
1768                                      QueueExpired::Yes);
1769     // Delete the item in the hash table iff:
1770     //  1. Item is existent in hashtable, and deleted flag is true
1771     //  2. rev seqno of queued item matches rev seqno of hash table item
1772     if (v && v->isDeleted() && (queuedItem.getRevSeqno() == v->getRevSeqno())) {
1773         bool isDeleted = deleteStoredValue(hbl, *v);
1774         if (!isDeleted) {
1775             throw std::logic_error(
1776                     "deletedOnDiskCbk:callback: "
1777                     "Failed to delete key with seqno:" +
1778                     std::to_string(v->getBySeqno()) + "' from bucket " +
1779                     std::to_string(hbl.getBucketNum()));
1780         }
1781
1782         /**
1783          * Deleted items are to be added to the bloomfilter,
1784          * in either eviction policy.
1785          */
1786         addToFilter(queuedItem.getKey());
1787     }
1788
1789     if (deleted) {
1790         ++stats.totalPersisted;
1791         ++opsDelete;
1792     }
1793     doStatsForFlushing(queuedItem, queuedItem.size());
1794     --stats.diskQueueSize;
1795     decrMetaDataDisk(queuedItem);
1796 }
1797
1798 bool VBucket::deleteKey(const DocKey& key) {
1799     auto hbl = ht.getLockedBucket(key);
1800     StoredValue* v = ht.unlocked_find(
1801             key, hbl.getBucketNum(), WantsDeleted::Yes, TrackReference::No);
1802     if (!v) {
1803         return false;
1804     }
1805     return deleteStoredValue(hbl, *v);
1806 }
1807
1808 void VBucket::postProcessRollback(const RollbackResult& rollbackResult,
1809                                   uint64_t prevHighSeqno) {
1810     failovers->pruneEntries(rollbackResult.highSeqno);
1811     checkpointManager.clear(*this, rollbackResult.highSeqno);
1812     setPersistedSnapshot(rollbackResult.snapStartSeqno,
1813                          rollbackResult.snapEndSeqno);
1814     incrRollbackItemCount(prevHighSeqno - rollbackResult.highSeqno);
1815     setBackfillPhase(false);
1816 }
1817
1818 void VBucket::dump() const {
1819     std::cerr << "VBucket[" << this << "] with state: " << toString(getState())
1820               << " numItems:" << getNumItems()
1821               << " numNonResident:" << getNumNonResidentItems()
1822               << " ht: " << std::endl << "  " << ht << std::endl
1823               << "]" << std::endl;
1824 }
1825
1826 void VBucket::_addStats(bool details, ADD_STAT add_stat, const void* c) {
1827     addStat(NULL, toString(state), add_stat, c);
1828     if (details) {
1829         size_t numItems = getNumItems();
1830         size_t tempItems = getNumTempItems();
1831         addStat("num_items", numItems, add_stat, c);
1832         addStat("num_temp_items", tempItems, add_stat, c);
1833         addStat("num_non_resident", getNumNonResidentItems(), add_stat, c);
1834         addStat("ht_memory", ht.memorySize(), add_stat, c);
1835         addStat("ht_item_memory", ht.getItemMemory(), add_stat, c);
1836         addStat("ht_cache_size", ht.cacheSize.load(), add_stat, c);
1837         addStat("num_ejects", ht.getNumEjects(), add_stat, c);
1838         addStat("ops_create", opsCreate.load(), add_stat, c);
1839         addStat("ops_update", opsUpdate.load(), add_stat, c);
1840         addStat("ops_delete", opsDelete.load(), add_stat, c);
1841         addStat("ops_reject", opsReject.load(), add_stat, c);
1842         addStat("queue_size", dirtyQueueSize.load(), add_stat, c);
1843         addStat("backfill_queue_size", getBackfillSize(), add_stat, c);
1844         addStat("queue_memory", dirtyQueueMem.load(), add_stat, c);
1845         addStat("queue_fill", dirtyQueueFill.load(), add_stat, c);
1846         addStat("queue_drain", dirtyQueueDrain.load(), add_stat, c);
1847         addStat("queue_age", getQueueAge(), add_stat, c);
1848         addStat("pending_writes", dirtyQueuePendingWrites.load(), add_stat, c);
1849
1850         addStat("high_seqno", getHighSeqno(), add_stat, c);
1851         addStat("uuid", failovers->getLatestUUID(), add_stat, c);
1852         addStat("purge_seqno", getPurgeSeqno(), add_stat, c);
1853         addStat("bloom_filter", getFilterStatusString().data(),
1854                 add_stat, c);
1855         addStat("bloom_filter_size", getFilterSize(), add_stat, c);
1856         addStat("bloom_filter_key_count", getNumOfKeysInFilter(), add_stat, c);
1857         addStat("rollback_item_count", getRollbackItemCount(), add_stat, c);
1858         addStat("hp_vb_req_size", getHighPriorityChkSize(), add_stat, c);
1859         hlc.addStats(statPrefix, add_stat, c);
1860     }
1861 }
1862
1863 void VBucket::decrDirtyQueueMem(size_t decrementBy)
1864 {
1865     size_t oldVal, newVal;
1866     do {
1867         oldVal = dirtyQueueMem.load(std::memory_order_relaxed);
1868         if (oldVal < decrementBy) {
1869             newVal = 0;
1870         } else {
1871             newVal = oldVal - decrementBy;
1872         }
1873     } while (!dirtyQueueMem.compare_exchange_strong(oldVal, newVal));
1874 }
1875
1876 void VBucket::decrDirtyQueueAge(uint32_t decrementBy)
1877 {
1878     uint64_t oldVal, newVal;
1879     do {
1880         oldVal = dirtyQueueAge.load(std::memory_order_relaxed);
1881         if (oldVal < decrementBy) {
1882             newVal = 0;
1883         } else {
1884             newVal = oldVal - decrementBy;
1885         }
1886     } while (!dirtyQueueAge.compare_exchange_strong(oldVal, newVal));
1887 }
1888
1889 void VBucket::decrDirtyQueuePendingWrites(size_t decrementBy)
1890 {
1891     size_t oldVal, newVal;
1892     do {
1893         oldVal = dirtyQueuePendingWrites.load(std::memory_order_relaxed);
1894         if (oldVal < decrementBy) {
1895             newVal = 0;
1896         } else {
1897             newVal = oldVal - decrementBy;
1898         }
1899     } while (!dirtyQueuePendingWrites.compare_exchange_strong(oldVal, newVal));
1900 }
1901
/**
 * Core of a set/replace operation, executed with the hash-bucket lock held.
 *
 * @param hbl lock on the bucket for itm's key; must be held.
 * @param v [in/out] existing StoredValue for the key (may be null); on an
 *        update/add it is re-pointed at the resulting StoredValue.
 * @param itm the new item; its revSeqno is set here unless hasMetaData.
 * @param cas non-zero for a CAS-checked (replace-style) operation.
 * @param allowExisting if false, an existing non-temp value is rejected
 *        with InvalidCas.
 * @param hasMetaData true when itm carries its own metadata (revSeqno is
 *        then left untouched and the expiry pre-check is skipped).
 * @param queueItmCtx queueing options forwarded to update/addNewStoredValue.
 * @param maybeKeyExists bloom-filter hint that the key may exist on disk.
 * @param isReplication forwarded to StoredValue::hasAvailableSpace.
 * @return the mutation status and the notification context of the write.
 * @throws std::invalid_argument if the hash-bucket lock is not held.
 */
std::pair<MutationStatus, VBNotifyCtx> VBucket::processSet(
        const HashTable::HashBucketLock& hbl,
        StoredValue*& v,
        Item& itm,
        uint64_t cas,
        bool allowExisting,
        bool hasMetaData,
        const VBQueueItemCtx* queueItmCtx,
        bool maybeKeyExists,
        bool isReplication) {
    if (!hbl.getHTLock()) {
        throw std::invalid_argument(
                "VBucket::processSet: htLock not held for "
                "VBucket " +
                std::to_string(getId()));
    }

    if (!StoredValue::hasAvailableSpace(stats, itm, isReplication)) {
        return {MutationStatus::NoMem, VBNotifyCtx()};
    }

    // A CAS operation under full eviction cannot be decided without the
    // on-disk metadata when the key is unknown (or only a temp-initial
    // placeholder) in memory but may exist on disk.
    if (cas && eviction == FULL_EVICTION && maybeKeyExists) {
        if (!v || v->isTempInitialItem()) {
            return {MutationStatus::NeedBgFetch, VBNotifyCtx()};
        }
    }

    /*
     * prior to checking for the lock, we should check if this object
     * has expired. If so, then check if CAS value has been provided
     * for this set op. In this case the operation should be denied since
     * a cas operation for a key that doesn't exist is not a very cool
     * thing to do. See MB 3252
     */
    if (v && v->isExpired(ep_real_time()) && !hasMetaData) {
        if (v->isLocked(ep_current_time())) {
            v->unlock();
        }
        if (cas) {
            /* item has expired and cas value provided. Deny ! */
            return {MutationStatus::NotFound, VBNotifyCtx()};
        }
    }

    if (v) {
        // Add-style callers (allowExisting == false) may only overwrite a
        // temp placeholder.
        if (!allowExisting && !v->isTempItem()) {
            return {MutationStatus::InvalidCas, VBNotifyCtx()};
        }
        if (v->isLocked(ep_current_time())) {
            /*
             * item is locked, deny if there is cas value mismatch
             * or no cas value is provided by the user
             */
            if (cas != v->getCas()) {
                return {MutationStatus::IsLocked, VBNotifyCtx()};
            }
            /* allow operation*/
            v->unlock();
        } else if (cas && cas != v->getCas()) {
            if (v->isTempNonExistentItem()) {
                // This is a temporary item which marks a key as non-existent;
                // therefore specifying a non-matching CAS should be exposed
                // as item not existing.
                return {MutationStatus::NotFound, VBNotifyCtx()};
            }
            if ((v->isTempDeletedItem() || v->isDeleted()) && !itm.isDeleted()) {
                // Existing item is deleted, and we are not replacing it with
                // a (different) deleted value - return not existing.
                return {MutationStatus::NotFound, VBNotifyCtx()};
            }
            // None of the above special cases; the existing item cannot be
            // modified with the specified CAS.
            return {MutationStatus::InvalidCas, VBNotifyCtx()};
        }
        if (!hasMetaData) {
            // Without caller-supplied metadata the new revision follows the
            // existing one.
            itm.setRevSeqno(v->getRevSeqno() + 1);
            /* MB-23530: We must ensure that a replace operation (i.e.
             * set with a CAS) /fails/ if the old document is deleted; it
             * logically "doesn't exist". However, if the new value is deleted
             * this op is a /delete/ with a CAS and we must permit a
             * deleted -> deleted transition for Deleted Bodies.
             */
            if (cas && (v->isDeleted() || v->isTempDeletedItem()) &&
                !itm.isDeleted()) {
                return {MutationStatus::NotFound, VBNotifyCtx()};
            }
        }

        // updateStoredValue may return a different StoredValue; v is
        // re-pointed at it for the caller.
        MutationStatus status;
        VBNotifyCtx notifyCtx;
        std::tie(v, status, notifyCtx) =
                updateStoredValue(hbl, *v, itm, queueItmCtx);
        return {status, notifyCtx};
    } else if (cas != 0) {
        // CAS operation on a key with nothing in memory.
        return {MutationStatus::NotFound, VBNotifyCtx()};
    } else {
        // Brand new key: insert it and (unless metadata was supplied) derive
        // its revSeqno via updateRevSeqNoOfNewStoredValue.
        VBNotifyCtx notifyCtx;
        std::tie(v, notifyCtx) = addNewStoredValue(hbl, itm, queueItmCtx);
        if (!hasMetaData) {
            updateRevSeqNoOfNewStoredValue(*v);
            itm.setRevSeqno(v->getRevSeqno());
        }
        return {MutationStatus::WasClean, notifyCtx};
    }
}
2007
/**
 * Core of an add operation (succeeds only if no live value exists),
 * executed with the hash-bucket lock held.
 *
 * @param hbl lock on the bucket for itm's key; must be held.
 * @param v [in/out] existing StoredValue (may be null); re-pointed at the
 *        resulting StoredValue when one is written.
 * @param itm the item to add; its revSeqno is assigned here.
 * @param maybeKeyExists bloom-filter hint that the key may exist on disk.
 * @param isReplication forwarded to StoredValue::hasAvailableSpace.
 * @param queueItmCtx queueing options forwarded to update/addNewStoredValue.
 * @return Exists, NoMem, BgFetch, AddTmpAndBgFetch, UnDel (replaced a
 *         deleted/expired value) or Success, plus the notification context.
 * @throws std::invalid_argument if the hash-bucket lock is not held.
 */
std::pair<AddStatus, VBNotifyCtx> VBucket::processAdd(
        const HashTable::HashBucketLock& hbl,
        StoredValue*& v,
        Item& itm,
        bool maybeKeyExists,
        bool isReplication,
        const VBQueueItemCtx* queueItmCtx) {
    if (!hbl.getHTLock()) {
        throw std::invalid_argument(
                "VBucket::processAdd: htLock not held for "
                "VBucket " +
                std::to_string(getId()));
    }

    // A live (not deleted, not expired, not temp) value blocks the add.
    if (v && !v->isDeleted() && !v->isExpired(ep_real_time()) &&
        !v->isTempItem()) {
        return {AddStatus::Exists, VBNotifyCtx()};
    }
    if (!StoredValue::hasAvailableSpace(stats, itm, isReplication)) {
        return {AddStatus::NoMem, VBNotifyCtx()};
    }

    std::pair<AddStatus, VBNotifyCtx> rv = {AddStatus::Success, VBNotifyCtx()};

    if (v) {
        if (v->isTempInitialItem() && eviction == FULL_EVICTION &&
            maybeKeyExists) {
            // Need to figure out if an item exists on disk
            return {AddStatus::BgFetch, VBNotifyCtx()};
        }

        rv.first = (v->isDeleted() || v->isExpired(ep_real_time()))
                           ? AddStatus::UnDel
                           : AddStatus::Success;

        // Net effect of the two statements below: a non-temp or
        // temp-deleted value yields v.revSeqno + 1; any other temp value
        // yields maxDeletedRevSeqno + 1.
        if (v->isTempDeletedItem()) {
            itm.setRevSeqno(v->getRevSeqno() + 1);
        } else {
            itm.setRevSeqno(ht.getMaxDeletedRevSeqno() + 1);
        }

        if (!v->isTempItem()) {
            itm.setRevSeqno(v->getRevSeqno() + 1);
        }

        std::tie(v, std::ignore, rv.second) =
                updateStoredValue(hbl, *v, itm, queueItmCtx);
    } else {
        // Nothing in memory. Unless itm is itself a temp-initial
        // placeholder, full eviction with a possible on-disk key asks the
        // caller to add a temp item and fetch from disk.
        if (itm.getBySeqno() != StoredValue::state_temp_init) {
            if (eviction == FULL_EVICTION && maybeKeyExists) {
                return {AddStatus::AddTmpAndBgFetch, VBNotifyCtx()};
            }
        }
        std::tie(v, rv.second) = addNewStoredValue(hbl, itm, queueItmCtx);
        updateRevSeqNoOfNewStoredValue(*v);
        itm.setRevSeqno(v->getRevSeqno());
        if (v->isTempItem()) {
            rv.first = AddStatus::BgFetch;
        }
    }

    // Temp items get the maximum NRU value.
    // NOTE(review): presumably to make them preferred eviction candidates -
    // confirm against the NRU/pager semantics.
    if (v->isTempItem()) {
        v->setNRUValue(MAX_NRU_VALUE);
    }
    return rv;
}
2074
2075 std::tuple<MutationStatus, StoredValue*, VBNotifyCtx>
2076 VBucket::processSoftDelete(const HashTable::HashBucketLock& hbl,
2077                            StoredValue& v,
2078                            uint64_t cas,
2079                            const ItemMetaData& metadata,
2080                            const VBQueueItemCtx& queueItmCtx,
2081                            bool use_meta,
2082                            uint64_t bySeqno) {
2083     if (v.isTempInitialItem() && eviction == FULL_EVICTION) {
2084         return std::make_tuple(MutationStatus::NeedBgFetch, &v, VBNotifyCtx());
2085     }
2086
2087     if (v.isLocked(ep_current_time())) {
2088         if (cas != v.getCas()) {
2089             return std::make_tuple(MutationStatus::IsLocked, &v, VBNotifyCtx());
2090         }
2091         v.unlock();
2092     }
2093
2094     if (cas != 0 && cas != v.getCas()) {
2095         return std::make_tuple(MutationStatus::InvalidCas, &v, VBNotifyCtx());
2096     }
2097
2098     /* allow operation */
2099     v.unlock();
2100
2101     MutationStatus rv =
2102             v.isDirty() ? MutationStatus::WasDirty : MutationStatus::WasClean;
2103
2104     if (use_meta) {
2105         v.setCas(metadata.cas);
2106         v.setFlags(metadata.flags);
2107         v.setExptime(metadata.exptime);
2108     }
2109
2110     v.setRevSeqno(metadata.revSeqno);
2111     VBNotifyCtx notifyCtx;
2112     StoredValue* newSv;
2113     std::tie(newSv, notifyCtx) =
2114             softDeleteStoredValue(hbl,
2115                                   v,
2116                                   /*onlyMarkDeleted*/ false,
2117                                   queueItmCtx,
2118                                   bySeqno);
2119     ht.updateMaxDeletedRevSeqno(metadata.revSeqno);
2120     return std::make_tuple(rv, newSv, notifyCtx);
2121 }
2122
/**
 * Soft-delete an expired value @p v with the hash-bucket lock held.
 *
 * The revSeqno of @p v is bumped by one before the delete. Values whose
 * datatype includes XATTR are only marked deleted (their body is kept).
 * The deletion is queued with a generated seqno and CAS.
 *
 * @param hbl lock on the bucket containing @p v; must be held.
 * @param v the expired value.
 * @return {NeedBgFetch, &v, ctx} for a temp-initial item under full
 *         eviction; otherwise {NotFound, resulting StoredValue, ctx}.
 * @throws std::invalid_argument if the hash-bucket lock is not held.
 */
std::tuple<MutationStatus, StoredValue*, VBNotifyCtx>
VBucket::processExpiredItem(const HashTable::HashBucketLock& hbl,
                            StoredValue& v) {
    if (!hbl.getHTLock()) {
        throw std::invalid_argument(
                "VBucket::processExpiredItem: htLock not held for VBucket " +
                std::to_string(getId()));
    }

    // NOTE(review): this path calls queueDirty on the temp-initial value
    // before reporting NeedBgFetch - confirm this queueing is intended.
    if (v.isTempInitialItem() && eviction == FULL_EVICTION) {
        return std::make_tuple(MutationStatus::NeedBgFetch,
                               &v,
                               queueDirty(v,
                                          GenerateBySeqno::Yes,
                                          GenerateCas::Yes,
                                          /*isBackfillItem*/ false));
    }

    /* If the datatype is XATTR, mark the item as deleted
     * but don't delete the value as system xattrs can
     * still be queried by mobile clients even after
     * deletion.
     * TODO: The current implementation is inefficient
     * but functionally correct and for performance reasons
     * only the system xattrs need to be stored.
     */
    value_t value = v.getValue();
    bool onlyMarkDeleted =
            value && mcbp::datatype::is_xattr(value->getDataType());
    v.setRevSeqno(v.getRevSeqno() + 1);
    VBNotifyCtx notifyCtx;
    StoredValue* newSv;
    std::tie(newSv, notifyCtx) =
            softDeleteStoredValue(hbl,
                                  v,
                                  onlyMarkDeleted,
                                  VBQueueItemCtx(GenerateBySeqno::Yes,
                                                 GenerateCas::Yes,
                                                 TrackCasDrift::No,
                                                 /*isBackfillItem*/ false,
                                                 nullptr /* no pre link */),
                                  v.getBySeqno());
    // Note: records the deleted value's revSeqno + 1 (processSoftDelete
    // records the deleted revSeqno itself).
    ht.updateMaxDeletedRevSeqno(newSv->getRevSeqno() + 1);
    return std::make_tuple(MutationStatus::NotFound, newSv, notifyCtx);
}
2168
2169 bool VBucket::deleteStoredValue(const HashTable::HashBucketLock& hbl,
2170                                 StoredValue& v) {
2171     if (!v.isDeleted() && v.isLocked(ep_current_time())) {
2172         return false;
2173     }
2174
2175     /* StoredValue deleted here. If any other in-memory data structures are
2176        using the StoredValue intrusively then they must have handled the delete
2177        by this point */
2178     ht.unlocked_del(hbl, v.getKey());
2179     return true;
2180 }
2181
2182 AddStatus VBucket::addTempStoredValue(const HashTable::HashBucketLock& hbl,
2183                                       const DocKey& key,
2184                                       bool isReplication) {
2185     uint8_t ext_meta[EXT_META_LEN] = {PROTOCOL_BINARY_RAW_BYTES};
2186     static_assert(sizeof(ext_meta) == 1,
2187                   "VBucket::addTempStoredValue(): expected "
2188                   "EXT_META_LEN to be 1");
2189     Item itm(key,
2190              /*flags*/ 0,
2191              /*exp*/ 0,
2192              /*data*/ NULL,
2193              /*size*/ 0,
2194              ext_meta,
2195              sizeof(ext_meta),
2196              0,
2197              StoredValue::state_temp_init);
2198
2199     /* if a temp item for a possibly deleted, set it non-resident by resetting
2200        the value cuz normally a new item added is considered resident which
2201        does not apply for temp item. */
2202     StoredValue* v = nullptr;
2203     return processAdd(hbl, v, itm, true, isReplication, nullptr).first;
2204 }
2205
2206 void VBucket::notifyNewSeqno(const VBNotifyCtx& notifyCtx) {
2207     if (newSeqnoCb) {
2208         newSeqnoCb->callback(getId(), notifyCtx);
2209     }
2210 }
2211
2212 /*
2213  * Queue the item to the checkpoint and return the seqno the item was
2214  * allocated.
2215  */
2216 int64_t VBucket::queueItem(Item* item, OptionalSeqno seqno) {
2217     item->setVBucketId(id);
2218     queued_item qi(item);
2219     checkpointManager.queueDirty(
2220             *this,
2221             qi,
2222             seqno ? GenerateBySeqno::No : GenerateBySeqno::Yes,
2223             GenerateCas::Yes,
2224             nullptr /* No pre link step as this is for system events */);
2225     VBNotifyCtx notifyCtx;
2226     // If the seqno is initialized, skip replication notification
2227     notifyCtx.notifyReplication = !seqno.is_initialized();
2228     notifyCtx.notifyFlusher = true;
2229     notifyCtx.bySeqno = qi->getBySeqno();
2230     notifyNewSeqno(notifyCtx);
2231     return qi->getBySeqno();
2232 }
2233
2234 VBNotifyCtx VBucket::queueDirty(StoredValue& v,
2235                                 const VBQueueItemCtx& queueItmCtx) {
2236     if (queueItmCtx.trackCasDrift == TrackCasDrift::Yes) {
2237         setMaxCasAndTrackDrift(v.getCas());
2238     }
2239     return queueDirty(v,
2240                       queueItmCtx.genBySeqno,
2241                       queueItmCtx.genCas,
2242                       queueItmCtx.isBackfillItem,
2243                       queueItmCtx.preLinkDocumentContext);
2244 }
2245
2246 void VBucket::updateRevSeqNoOfNewStoredValue(StoredValue& v) {
2247     /**
2248      * Possibly, this item is being recreated. Conservatively assign it
2249      * a seqno that is greater than the greatest seqno of all deleted
2250      * items seen so far.
2251      */
2252     uint64_t seqno = ht.getMaxDeletedRevSeqno();
2253     if (!v.isTempItem()) {
2254         ++seqno;
2255     }
2256     v.setRevSeqno(seqno);
2257 }
2258
2259 void VBucket::addHighPriorityVBEntry(uint64_t seqnoOrChkId,
2260                                      const void* cookie,
2261                                      HighPriorityVBNotify reqType) {
2262     std::unique_lock<std::mutex> lh(hpVBReqsMutex);
2263     hpVBReqs.push_back(HighPriorityVBEntry(cookie, seqnoOrChkId, reqType));
2264     numHpVBReqs.store(hpVBReqs.size());
2265
2266     LOG(EXTENSION_LOG_NOTICE,
2267         "Added high priority async request %s "
2268         "for vb:%" PRIu16 ", Check for:%" PRIu64 ", "
2269         "Persisted upto:%" PRIu64 ", cookie:%p",
2270         to_string(reqType).c_str(),
2271         getId(),
2272         seqnoOrChkId,
2273         getPersistenceSeqno(),
2274         cookie);
2275 }
2276
2277 std::map<const void*, ENGINE_ERROR_CODE> VBucket::getHighPriorityNotifications(
2278         EventuallyPersistentEngine& engine,
2279         uint64_t idNum,
2280         HighPriorityVBNotify notifyType) {
2281     std::unique_lock<std::mutex> lh(hpVBReqsMutex);
2282     std::map<const void*, ENGINE_ERROR_CODE> toNotify;
2283
2284     auto entry = hpVBReqs.begin();
2285
2286     while (entry != hpVBReqs.end()) {
2287         if (notifyType != entry->reqType) {
2288             ++entry;
2289             continue;
2290         }
2291
2292         std::string logStr(to_string(notifyType));
2293
2294         hrtime_t wall_time(gethrtime() - entry->start);
2295         size_t spent = wall_time / 1000000000;
2296         if (entry->id <= idNum) {
2297             toNotify[entry->cookie] = ENGINE_SUCCESS;
2298             stats.chkPersistenceHisto.add(wall_time / 1000);
2299             adjustCheckpointFlushTimeout(wall_time / 1000000000);
2300             LOG(EXTENSION_LOG_NOTICE,
2301                 "Notified the completion of %s "
2302                 "for vbucket %" PRIu16 ", Check for: %" PRIu64
2303                 ", "
2304                 "Persisted upto: %" PRIu64 ", cookie %p",
2305                 logStr.c_str(),
2306                 getId(),
2307                 entry->id,
2308                 idNum,
2309                 entry->cookie);
2310             entry = hpVBReqs.erase(entry);
2311         } else if (spent > getCheckpointFlushTimeout()) {
2312             adjustCheckpointFlushTimeout(spent);
2313             engine.storeEngineSpecific(entry->cookie, NULL);
2314             toNotify[entry->cookie] = ENGINE_TMPFAIL;
2315             LOG(EXTENSION_LOG_WARNING,
2316                 "Notified the timeout on %s "
2317                 "for vbucket %" PRIu16 ", Check for: %" PRIu64
2318                 ", "
2319                 "Persisted upto: %" PRIu64 ", cookie %p",
2320                 logStr.c_str(),
2321                 getId(),
2322                 entry->id,
2323                 idNum,
2324                 entry->cookie);
2325             entry = hpVBReqs.erase(entry);
2326         } else {
2327             ++entry;
2328         }
2329     }
2330     numHpVBReqs.store(hpVBReqs.size());
2331     return toNotify;
2332 }
2333
2334 std::map<const void*, ENGINE_ERROR_CODE> VBucket::tmpFailAndGetAllHpNotifies(
2335         EventuallyPersistentEngine& engine) {
2336     std::map<const void*, ENGINE_ERROR_CODE> toNotify;
2337
2338     LockHolder lh(hpVBReqsMutex);
2339
2340     for (auto& entry : hpVBReqs) {
2341         toNotify[entry.cookie] = ENGINE_TMPFAIL;
2342         engine.storeEngineSpecific(entry.cookie, NULL);
2343     }
2344     hpVBReqs.clear();
2345
2346     return toNotify;
2347 }
2348
2349 void VBucket::adjustCheckpointFlushTimeout(size_t wall_time) {
2350     size_t middle = (MIN_CHK_FLUSH_TIMEOUT + MAX_CHK_FLUSH_TIMEOUT) / 2;
2351
2352     if (wall_time <= MIN_CHK_FLUSH_TIMEOUT) {
2353         chkFlushTimeout = MIN_CHK_FLUSH_TIMEOUT;
2354     } else if (wall_time <= middle) {
2355         chkFlushTimeout = middle;
2356     } else {
2357         chkFlushTimeout = MAX_CHK_FLUSH_TIMEOUT;
2358     }
2359 }
2360
2361 size_t VBucket::getCheckpointFlushTimeout() {
2362     return chkFlushTimeout;
2363 }
2364
2365 std::unique_ptr<Item> VBucket::pruneXattrDocument(
2366         StoredValue& v, const ItemMetaData& itemMeta) {
2367     // Need to take a copy of the value, prune it, and add it back
2368
2369     // Create work-space document
2370     std::vector<uint8_t> workspace(v.getValue()->vlength());
2371     std::copy_n(v.getValue()->getData(),
2372                 v.getValue()->vlength(),
2373                 workspace.begin());
2374
2375     // Now attach to the XATTRs in the document
2376     auto sz = cb::xattr::get_body_offset(
2377             {reinterpret_cast<char*>(workspace.data()), workspace.size()});
2378
2379     cb::xattr::Blob xattr({workspace.data(), sz});
2380     xattr.prune_user_keys();
2381
2382     auto prunedXattrs = xattr.finalize();
2383
2384     if (prunedXattrs.size()) {
2385         // Something remains - Create a Blob and copy-in just the XATTRs
2386         auto newValue =
2387                 Blob::New(reinterpret_cast<const char*>(prunedXattrs.data()),
2388                           prunedXattrs.size(),
2389                           const_cast<uint8_t*>(reinterpret_cast<const uint8_t*>(
2390                                   v.getValue()->getExtMeta())),
2391                           v.getValue()->getExtLen());
2392
2393         return std::make_unique<Item>(v.getKey(),
2394                                       itemMeta.flags,
2395                                       itemMeta.exptime,
2396                                       newValue,
2397                                       itemMeta.cas,
2398                                       v.getBySeqno(),
2399                                       getId(),
2400                                       itemMeta.revSeqno);
2401     } else {
2402         return {};
2403     }
2404 }