DEBUG: Add HashTable::operator<<, expand StoredValue::operator<<
[ep-engine.git] / src / vbucket.cc
1 /* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2 /*
3  *     Copyright 2015 Couchbase, Inc
4  *
5  *   Licensed under the Apache License, Version 2.0 (the "License");
6  *   you may not use this file except in compliance with the License.
7  *   You may obtain a copy of the License at
8  *
9  *       http://www.apache.org/licenses/LICENSE-2.0
10  *
11  *   Unless required by applicable law or agreed to in writing, software
12  *   distributed under the License is distributed on an "AS IS" BASIS,
13  *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  *   See the License for the specific language governing permissions and
15  *   limitations under the License.
16  */
17
18 #include "config.h"
19
20 #include <functional>
21 #include <list>
22 #include <set>
23 #include <string>
24 #include <vector>
25
26 #include "atomic.h"
27 #include "bgfetcher.h"
28 #include "conflict_resolution.h"
29 #include "ep_engine.h"
30 #include "ep_types.h"
31 #include "failover-table.h"
32 #include "flusher.h"
33 #include "pre_link_document_context.h"
34
35 #define STATWRITER_NAMESPACE vbucket
36 #include "statwriter.h"
37 #undef STATWRITER_NAMESPACE
38 #include "stored_value_factories.h"
39
40 #include "vbucket.h"
41
42 #include <xattr/blob.h>
43 #include <xattr/utils.h>
44
45 /* Macros */
46 const size_t MIN_CHK_FLUSH_TIMEOUT = 10; // 10 sec.
47 const size_t MAX_CHK_FLUSH_TIMEOUT = 30; // 30 sec.
48
49 /* Statics definitions */
50 std::atomic<size_t> VBucket::chkFlushTimeout(MIN_CHK_FLUSH_TIMEOUT);
51
52 VBucketFilter VBucketFilter::filter_diff(const VBucketFilter &other) const {
53     std::vector<uint16_t> tmp(acceptable.size() + other.size());
54     std::vector<uint16_t>::iterator end;
55     end = std::set_symmetric_difference(acceptable.begin(),
56                                         acceptable.end(),
57                                         other.acceptable.begin(),
58                                         other.acceptable.end(),
59                                         tmp.begin());
60     return VBucketFilter(std::vector<uint16_t>(tmp.begin(), end));
61 }
62
63 VBucketFilter VBucketFilter::filter_intersection(const VBucketFilter &other)
64                                                                         const {
65     std::vector<uint16_t> tmp(acceptable.size() + other.size());
66     std::vector<uint16_t>::iterator end;
67
68     end = std::set_intersection(acceptable.begin(), acceptable.end(),
69                                 other.acceptable.begin(),
70                                 other.acceptable.end(),
71                                 tmp.begin());
72     return VBucketFilter(std::vector<uint16_t>(tmp.begin(), end));
73 }
74
/**
 * Measure the run of consecutive values (v, v+1, v+2, ...) that starts at
 * `it`.
 *
 * @param it first element of the candidate run
 * @param end end of the underlying set
 * @param length [out] number of steps from `it` to the last element of the
 *               run (0 when `it` stands alone or when `it == end`)
 * @return true when the run is long enough to be printed as a range, i.e.
 *         it holds at least three consecutive values (length > 1)
 */
static bool isRange(std::set<uint16_t>::const_iterator it,
                    const std::set<uint16_t>::const_iterator &end,
                    size_t &length)
{
    length = 0;
    if (it == end) {
        // Nothing to inspect; dereferencing `it` here would be undefined
        // behaviour and the decrement below would wrap around.
        return false;
    }

    const uint16_t first = *it;
    size_t run = 0;
    while (it != end && (static_cast<size_t>(first) + run) == *it) {
        ++it;
        ++run;
    }

    // `run` counts elements (>= 1 here); callers want the distance from the
    // first element to the last one.
    length = run - 1;

    return length > 1;
}
90
91 std::ostream& operator <<(std::ostream &out, const VBucketFilter &filter)
92 {
93     std::set<uint16_t>::const_iterator it;
94
95     if (filter.acceptable.empty()) {
96         out << "{ empty }";
97     } else {
98         bool needcomma = false;
99         out << "{ ";
100         for (it = filter.acceptable.begin();
101              it != filter.acceptable.end();
102              ++it) {
103             if (needcomma) {
104                 out << ", ";
105             }
106
107             size_t length;
108             if (isRange(it, filter.acceptable.end(), length)) {
109                 std::set<uint16_t>::iterator last = it;
110                 for (size_t i = 0; i < length; ++i) {
111                     ++last;
112                 }
113                 out << "[" << *it << "," << *last << "]";
114                 it = last;
115             } else {
116                 out << *it;
117             }
118             needcomma = true;
119         }
120         out << " }";
121     }
122
123     return out;
124 }
125
126 const vbucket_state_t VBucket::ACTIVE =
127                      static_cast<vbucket_state_t>(htonl(vbucket_state_active));
128 const vbucket_state_t VBucket::REPLICA =
129                     static_cast<vbucket_state_t>(htonl(vbucket_state_replica));
130 const vbucket_state_t VBucket::PENDING =
131                     static_cast<vbucket_state_t>(htonl(vbucket_state_pending));
132 const vbucket_state_t VBucket::DEAD =
133                     static_cast<vbucket_state_t>(htonl(vbucket_state_dead));
134
135 VBucket::VBucket(id_type i,
136                  vbucket_state_t newState,
137                  EPStats& st,
138                  CheckpointConfig& chkConfig,
139                  int64_t lastSeqno,
140                  uint64_t lastSnapStart,
141                  uint64_t lastSnapEnd,
142                  std::unique_ptr<FailoverTable> table,
143                  std::shared_ptr<Callback<id_type> > flusherCb,
144                  std::unique_ptr<AbstractStoredValueFactory> valFact,
145                  NewSeqnoCallback newSeqnoCb,
146                  Configuration& config,
147                  item_eviction_policy_t evictionPolicy,
148                  vbucket_state_t initState,
149                  uint64_t purgeSeqno,
150                  uint64_t maxCas,
151                  const std::string& collectionsManifest)
152     : ht(st, std::move(valFact)),
153       checkpointManager(st,
154                         i,
155                         chkConfig,
156                         lastSeqno,
157                         lastSnapStart,
158                         lastSnapEnd,
159                         flusherCb),
160       failovers(std::move(table)),
161       opsCreate(0),
162       opsUpdate(0),
163       opsDelete(0),
164       opsReject(0),
165       dirtyQueueSize(0),
166       dirtyQueueMem(0),
167       dirtyQueueFill(0),
168       dirtyQueueDrain(0),
169       dirtyQueueAge(0),
170       dirtyQueuePendingWrites(0),
171       metaDataDisk(0),
172       numExpiredItems(0),
173       eviction(evictionPolicy),
174       stats(st),
175       persistenceSeqno(0),
176       numHpVBReqs(0),
177       id(i),
178       state(newState),
179       initialState(initState),
180       purge_seqno(purgeSeqno),
181       takeover_backed_up(false),
182       persisted_snapshot_start(lastSnapStart),
183       persisted_snapshot_end(lastSnapEnd),
184       rollbackItemCount(0),
185       hlc(maxCas,
186           std::chrono::microseconds(config.getHlcDriftAheadThresholdUs()),
187           std::chrono::microseconds(config.getHlcDriftBehindThresholdUs())),
188       statPrefix("vb_" + std::to_string(i)),
189       persistenceCheckpointId(0),
190       bucketCreation(false),
191       bucketDeletion(false),
192       newSeqnoCb(std::move(newSeqnoCb)),
193       manifest(collectionsManifest) {
194     if (config.getConflictResolutionType().compare("lww") == 0) {
195         conflictResolver.reset(new LastWriteWinsResolution());
196     } else {
197         conflictResolver.reset(new RevisionSeqnoResolution());
198     }
199
200     backfill.isBackfillPhase = false;
201     pendingOpsStart = 0;
202     stats.memOverhead->fetch_add(sizeof(VBucket)
203                                 + ht.memorySize() + sizeof(CheckpointManager));
204     LOG(EXTENSION_LOG_NOTICE,
205         "VBucket: created vbucket:%" PRIu16 " with state:%s "
206                 "initialState:%s "
207                 "lastSeqno:%" PRIu64 " "
208                 "lastSnapshot:{%" PRIu64 ",%" PRIu64 "} "
209                 "persisted_snapshot:{%" PRIu64 ",%" PRIu64 "} "
210                 "max_cas:%" PRIu64,
211         id, VBucket::toString(state), VBucket::toString(initialState),
212         lastSeqno, lastSnapStart, lastSnapEnd,
213         persisted_snapshot_start, persisted_snapshot_end,
214         getMaxCas());
215 }
216
217 VBucket::~VBucket() {
218     if (!pendingOps.empty()) {
219         LOG(EXTENSION_LOG_WARNING,
220             "~Vbucket(): vbucket:%" PRIu16 " has %ld pending ops",
221             id,
222             pendingOps.size());
223     }
224
225     stats.diskQueueSize.fetch_sub(dirtyQueueSize.load());
226     stats.vbBackfillQueueSize.fetch_sub(getBackfillSize());
227
228     // Clear out the bloomfilter(s)
229     clearFilter();
230
231     stats.memOverhead->fetch_sub(sizeof(VBucket) + ht.memorySize() +
232                                 sizeof(CheckpointManager));
233
234     LOG(EXTENSION_LOG_INFO, "Destroying vbucket %d\n", id);
235 }
236
237 void VBucket::fireAllOps(EventuallyPersistentEngine &engine,
238                          ENGINE_ERROR_CODE code) {
239     std::unique_lock<std::mutex> lh(pendingOpLock);
240
241     if (pendingOpsStart > 0) {
242         hrtime_t now = gethrtime();
243         if (now > pendingOpsStart) {
244             hrtime_t d = (now - pendingOpsStart) / 1000;
245             stats.pendingOpsHisto.add(d);
246             atomic_setIfBigger(stats.pendingOpsMaxDuration, d);
247         }
248     } else {
249         return;
250     }
251
252     pendingOpsStart = 0;
253     stats.pendingOps.fetch_sub(pendingOps.size());
254     atomic_setIfBigger(stats.pendingOpsMax, pendingOps.size());
255
256     while (!pendingOps.empty()) {
257         const void *pendingOperation = pendingOps.back();
258         pendingOps.pop_back();
259         // We don't want to hold the pendingOpLock when
260         // calling notifyIOComplete.
261         lh.unlock();
262         engine.notifyIOComplete(pendingOperation, code);
263         lh.lock();
264     }
265
266     LOG(EXTENSION_LOG_INFO,
267         "Fired pendings ops for vbucket %" PRIu16 " in state %s\n",
268         id, VBucket::toString(state));
269 }
270
271 void VBucket::fireAllOps(EventuallyPersistentEngine &engine) {
272
273     if (state == vbucket_state_active) {
274         fireAllOps(engine, ENGINE_SUCCESS);
275     } else if (state == vbucket_state_pending) {
276         // Nothing
277     } else {
278         fireAllOps(engine, ENGINE_NOT_MY_VBUCKET);
279     }
280 }
281
282 void VBucket::setState(vbucket_state_t to) {
283     vbucket_state_t oldstate;
284     {
285         WriterLockHolder wlh(stateLock);
286         oldstate = state;
287
288         if (to == vbucket_state_active &&
289             checkpointManager.getOpenCheckpointId() < 2) {
290             checkpointManager.setOpenCheckpointId(2);
291         }
292
293         LOG(EXTENSION_LOG_NOTICE,
294             "VBucket::setState: transitioning vbucket:%" PRIu16 " from:%s to:%s",
295             id, VBucket::toString(oldstate), VBucket::toString(to));
296
297         state = to;
298     }
299 }
300
301 vbucket_state VBucket::getVBucketState() const {
302      auto persisted_range = getPersistedSnapshot();
303
304      return vbucket_state{getState(),
305                           getPersistenceCheckpointId(), 0, getHighSeqno(),
306                           getPurgeSeqno(),
307                           persisted_range.start, persisted_range.end,
308                           getMaxCas(), failovers->toJSON()};
309 }
310
311
312
313 void VBucket::doStatsForQueueing(const Item& qi, size_t itemBytes)
314 {
315     ++dirtyQueueSize;
316     dirtyQueueMem.fetch_add(sizeof(Item));
317     ++dirtyQueueFill;
318     dirtyQueueAge.fetch_add(qi.getQueuedTime());
319     dirtyQueuePendingWrites.fetch_add(itemBytes);
320 }
321
322 void VBucket::doStatsForFlushing(const Item& qi, size_t itemBytes) {
323     --dirtyQueueSize;
324     decrDirtyQueueMem(sizeof(Item));
325     ++dirtyQueueDrain;
326     decrDirtyQueueAge(qi.getQueuedTime());
327     decrDirtyQueuePendingWrites(itemBytes);
328 }
329
330 void VBucket::incrMetaDataDisk(const Item& qi) {
331     metaDataDisk.fetch_add(qi.getKey().size() + sizeof(ItemMetaData));
332 }
333
334 void VBucket::decrMetaDataDisk(const Item& qi) {
335     // assume couchstore remove approx this much data from disk
336     metaDataDisk.fetch_sub((qi.getKey().size() + sizeof(ItemMetaData)));
337 }
338
339 void VBucket::resetStats() {
340     opsCreate.store(0);
341     opsUpdate.store(0);
342     opsDelete.store(0);
343     opsReject.store(0);
344
345     stats.diskQueueSize.fetch_sub(dirtyQueueSize.exchange(0));
346     dirtyQueueMem.store(0);
347     dirtyQueueFill.store(0);
348     dirtyQueueAge.store(0);
349     dirtyQueuePendingWrites.store(0);
350     dirtyQueueDrain.store(0);
351
352     hlc.resetStats();
353 }
354
355 template <typename T>
356 void VBucket::addStat(const char *nm, const T &val, ADD_STAT add_stat,
357                       const void *c) {
358     std::string stat = statPrefix;
359     if (nm != NULL) {
360         add_prefixed_stat(statPrefix, nm, val, add_stat, c);
361     } else {
362         add_casted_stat(statPrefix.data(), val, add_stat, c);
363     }
364 }
365
366 void VBucket::handlePreExpiry(StoredValue& v) {
367     value_t value = v.getValue();
368     if (value) {
369         std::unique_ptr<Item> itm(v.toItem(false, id));
370         item_info itm_info;
371         EventuallyPersistentEngine* engine = ObjectRegistry::getCurrentEngine();
372         itm_info = itm->toItemInfo(failovers->getLatestUUID());
373         value_t new_val(Blob::Copy(*value));
374         itm->setValue(new_val);
375
376         SERVER_HANDLE_V1* sapi = engine->getServerApi();
377         /* TODO: In order to minimize allocations, the callback needs to
378          * allocate an item whose value size will be exactly the size of the
379          * value after pre-expiry is performed.
380          */
381         if (sapi->document->pre_expiry(itm_info)) {
382             char* extMeta = const_cast<char *>(v.getValue()->getExtMeta());
383             Item new_item(v.getKey(), v.getFlags(), v.getExptime(),
384                           itm_info.value[0].iov_base, itm_info.value[0].iov_len,
385                           reinterpret_cast<uint8_t*>(extMeta),
386                           v.getValue()->getExtLen(), v.getCas(),
387                           v.getBySeqno(), id, v.getRevSeqno(),
388                           v.getNRUValue());
389
390             new_item.setDeleted();
391             v.setValue(new_item, ht);
392         }
393     }
394 }
395
396 size_t VBucket::getNumNonResidentItems() const {
397     if (eviction == VALUE_ONLY) {
398         return ht.getNumInMemoryNonResItems();
399     } else {
400         size_t num_items = ht.getNumItems();
401         size_t num_res_items = ht.getNumInMemoryItems() -
402                                ht.getNumInMemoryNonResItems();
403         return num_items > num_res_items ? (num_items - num_res_items) : 0;
404     }
405 }
406
407
408 uint64_t VBucket::getPersistenceCheckpointId() const {
409     return persistenceCheckpointId.load();
410 }
411
412 void VBucket::setPersistenceCheckpointId(uint64_t checkpointId) {
413     persistenceCheckpointId.store(checkpointId);
414 }
415
416 void VBucket::markDirty(const DocKey& key) {
417     auto hbl = ht.getLockedBucket(key);
418     StoredValue* v = ht.unlocked_find(
419             key, hbl.getBucketNum(), WantsDeleted::Yes, TrackReference::Yes);
420     if (v) {
421         v->markDirty();
422     } else {
423         LOG(EXTENSION_LOG_WARNING, "markDirty: Error marking dirty, a key "
424             "is missing from vb:%" PRIu16, id);
425     }
426 }
427
428 bool VBucket::isResidentRatioUnderThreshold(float threshold) {
429     if (eviction != FULL_EVICTION) {
430         throw std::invalid_argument("VBucket::isResidentRatioUnderThreshold: "
431                 "policy (which is " + std::to_string(eviction) +
432                 ") must be FULL_EVICTION");
433     }
434     size_t num_items = getNumItems();
435     size_t num_non_resident_items = getNumNonResidentItems();
436     if (threshold >= ((float)(num_items - num_non_resident_items) /
437                                                                 num_items)) {
438         return true;
439     } else {
440         return false;
441     }
442 }
443
444 void VBucket::createFilter(size_t key_count, double probability) {
445     // Create the actual bloom filter upon vbucket creation during
446     // scenarios:
447     //      - Bucket creation
448     //      - Rebalance
449     LockHolder lh(bfMutex);
450     if (bFilter == nullptr && tempFilter == nullptr) {
451         bFilter = std::make_unique<BloomFilter>(key_count, probability,
452                                         BFILTER_ENABLED);
453     } else {
454         LOG(EXTENSION_LOG_WARNING, "(vb %" PRIu16 ") Bloom filter / Temp filter"
455             " already exist!", id);
456     }
457 }
458
459 void VBucket::initTempFilter(size_t key_count, double probability) {
460     // Create a temp bloom filter with status as COMPACTING,
461     // if the main filter is found to exist, set its state to
462     // COMPACTING as well.
463     LockHolder lh(bfMutex);
464     tempFilter = std::make_unique<BloomFilter>(key_count, probability,
465                                      BFILTER_COMPACTING);
466     if (bFilter) {
467         bFilter->setStatus(BFILTER_COMPACTING);
468     }
469 }
470
471 void VBucket::addToFilter(const DocKey& key) {
472     LockHolder lh(bfMutex);
473     if (bFilter) {
474         bFilter->addKey(key);
475     }
476
477     // If the temp bloom filter is not found to be NULL,
478     // it means that compaction is running on the particular
479     // vbucket. Therefore add the key to the temp filter as
480     // well, as once compaction completes the temp filter
481     // will replace the main bloom filter.
482     if (tempFilter) {
483         tempFilter->addKey(key);
484     }
485 }
486
487 bool VBucket::maybeKeyExistsInFilter(const DocKey& key) {
488     LockHolder lh(bfMutex);
489     if (bFilter) {
490         return bFilter->maybeKeyExists(key);
491     } else {
492         // If filter doesn't exist, allow the BgFetch to go through.
493         return true;
494     }
495 }
496
497 bool VBucket::isTempFilterAvailable() {
498     LockHolder lh(bfMutex);
499     if (tempFilter &&
500         (tempFilter->getStatus() == BFILTER_COMPACTING ||
501          tempFilter->getStatus() == BFILTER_ENABLED)) {
502         return true;
503     } else {
504         return false;
505     }
506 }
507
508 void VBucket::addToTempFilter(const DocKey& key) {
509     // Keys will be added to only the temp filter during
510     // compaction.
511     LockHolder lh(bfMutex);
512     if (tempFilter) {
513         tempFilter->addKey(key);
514     }
515 }
516
517 void VBucket::swapFilter() {
518     // Delete the main bloom filter and replace it with
519     // the temp filter that was populated during compaction,
520     // only if the temp filter's state is found to be either at
521     // COMPACTING or ENABLED (if in the case the user enables
522     // bloomfilters for some reason while compaction was running).
523     // Otherwise, it indicates that the filter's state was
524     // possibly disabled during compaction, therefore clear out
525     // the temp filter. If it gets enabled at some point, a new
526     // bloom filter will be made available after the next
527     // compaction.
528
529     LockHolder lh(bfMutex);
530     if (tempFilter) {
531         bFilter.reset();
532
533         if (tempFilter->getStatus() == BFILTER_COMPACTING ||
534              tempFilter->getStatus() == BFILTER_ENABLED) {
535             bFilter = std::move(tempFilter);
536             bFilter->setStatus(BFILTER_ENABLED);
537         }
538         tempFilter.reset();
539     }
540 }
541
542 void VBucket::clearFilter() {
543     LockHolder lh(bfMutex);
544     bFilter.reset();
545     tempFilter.reset();
546 }
547
548 void VBucket::setFilterStatus(bfilter_status_t to) {
549     LockHolder lh(bfMutex);
550     if (bFilter) {
551         bFilter->setStatus(to);
552     }
553     if (tempFilter) {
554         tempFilter->setStatus(to);
555     }
556 }
557
558 std::string VBucket::getFilterStatusString() {
559     LockHolder lh(bfMutex);
560     if (bFilter) {
561         return bFilter->getStatusString();
562     } else if (tempFilter) {
563         return tempFilter->getStatusString();
564     } else {
565         return "DOESN'T EXIST";
566     }
567 }
568
569 size_t VBucket::getFilterSize() {
570     LockHolder lh(bfMutex);
571     if (bFilter) {
572         return bFilter->getFilterSize();
573     } else {
574         return 0;
575     }
576 }
577
578 size_t VBucket::getNumOfKeysInFilter() {
579     LockHolder lh(bfMutex);
580     if (bFilter) {
581         return bFilter->getNumOfKeysInFilter();
582     } else {
583         return 0;
584     }
585 }
586
587 VBNotifyCtx VBucket::queueDirty(
588         StoredValue& v,
589         const GenerateBySeqno generateBySeqno,
590         const GenerateCas generateCas,
591         const bool isBackfillItem,
592         PreLinkDocumentContext* preLinkDocumentContext) {
593     VBNotifyCtx notifyCtx;
594
595     queued_item qi(v.toItem(false, getId()));
596
597     if (isBackfillItem) {
598         queueBackfillItem(qi, generateBySeqno);
599         notifyCtx.notifyFlusher = true;
600         /* During backfill on a TAP receiver we need to update the snapshot
601          range in the checkpoint. Has to be done here because in case of TAP
602          backfill, above, we use vb.queueBackfillItem() instead of
603          vb.checkpointManager.queueDirty() */
604         if (generateBySeqno == GenerateBySeqno::Yes) {
605             checkpointManager.resetSnapshotRange();
606         }
607     } else {
608         notifyCtx.notifyFlusher =
609                 checkpointManager.queueDirty(*this,
610                                              qi,
611                                              generateBySeqno,
612                                              generateCas,
613                                              preLinkDocumentContext);
614         notifyCtx.notifyReplication = true;
615         if (GenerateCas::Yes == generateCas) {
616             v.setCas(qi->getCas());
617         }
618     }
619
620     v.setBySeqno(qi->getBySeqno());
621     notifyCtx.bySeqno = qi->getBySeqno();
622
623     return notifyCtx;
624 }
625
626 StoredValue* VBucket::fetchValidValue(HashTable::HashBucketLock& hbl,
627                                       const DocKey& key,
628                                       WantsDeleted wantsDeleted,
629                                       TrackReference trackReference,
630                                       QueueExpired queueExpired) {
631     if (!hbl.getHTLock()) {
632         throw std::logic_error(
633                 "Hash bucket lock not held in "
634                 "VBucket::fetchValidValue() for hash bucket: " +
635                 std::to_string(hbl.getBucketNum()) + "for key: " +
636                 std::string(reinterpret_cast<const char*>(key.data()),
637                             key.size()));
638     }
639     StoredValue* v = ht.unlocked_find(
640             key, hbl.getBucketNum(), wantsDeleted, trackReference);
641     if (v && !v->isDeleted() && !v->isTempItem()) {
642         // In the deleted case, we ignore expiration time.
643         if (v->isExpired(ep_real_time())) {
644             if (getState() != vbucket_state_active) {
645                 return wantsDeleted == WantsDeleted::Yes ? v : NULL;
646             }
647
648             // queueDirty only allowed on active VB
649             if (queueExpired == QueueExpired::Yes &&
650                 getState() == vbucket_state_active) {
651                 incExpirationStat(ExpireBy::Access);
652                 handlePreExpiry(*v);
653                 VBNotifyCtx notifyCtx;
654                 std::tie(std::ignore, v, notifyCtx) =
655                         processExpiredItem(hbl, *v);
656                 notifyNewSeqno(notifyCtx);
657             }
658             return wantsDeleted == WantsDeleted::Yes ? v : NULL;
659         }
660     }
661     return v;
662 }
663
664 void VBucket::incExpirationStat(const ExpireBy source) {
665     switch (source) {
666     case ExpireBy::Pager:
667         ++stats.expired_pager;
668         break;
669     case ExpireBy::Compactor:
670         ++stats.expired_compactor;
671         break;
672     case ExpireBy::Access:
673         ++stats.expired_access;
674         break;
675     }
676     ++numExpiredItems;
677 }
678
679 MutationStatus VBucket::setFromInternal(Item& itm) {
680     return ht.set(itm);
681 }
682
683 ENGINE_ERROR_CODE VBucket::set(Item& itm,
684                                const void* cookie,
685                                EventuallyPersistentEngine& engine,
686                                const int bgFetchDelay) {
687     bool cas_op = (itm.getCas() != 0);
688     auto hbl = ht.getLockedBucket(itm.getKey());
689     StoredValue* v = ht.unlocked_find(itm.getKey(),
690                                       hbl.getBucketNum(),
691                                       WantsDeleted::Yes,
692                                       TrackReference::No);
693     if (v && v->isLocked(ep_current_time()) &&
694         (getState() == vbucket_state_replica ||
695          getState() == vbucket_state_pending)) {
696         v->unlock();
697     }
698
699     bool maybeKeyExists = true;
700     // If we didn't find a valid item, check Bloomfilter's prediction if in
701     // full eviction policy and for a CAS operation.
702     if ((v == nullptr || v->isTempInitialItem()) &&
703         (eviction == FULL_EVICTION) && (itm.getCas() != 0)) {
704         // Check Bloomfilter's prediction
705         if (!maybeKeyExistsInFilter(itm.getKey())) {
706             maybeKeyExists = false;
707         }
708     }
709
710     PreLinkDocumentContext preLinkDocumentContext(engine, cookie, &itm);
711     VBQueueItemCtx queueItmCtx(GenerateBySeqno::Yes,
712                                GenerateCas::Yes,
713                                TrackCasDrift::No,
714                                /*isBackfillItem*/ false,
715                                &preLinkDocumentContext);
716
717     MutationStatus status;
718     VBNotifyCtx notifyCtx;
719     std::tie(status, notifyCtx) = processSet(hbl,
720                                              v,
721                                              itm,
722                                              itm.getCas(),
723                                              /*allowExisting*/ true,
724                                              /*hashMetaData*/ false,
725                                              &queueItmCtx,
726                                              maybeKeyExists);
727
728     ENGINE_ERROR_CODE ret = ENGINE_SUCCESS;
729     switch (status) {
730     case MutationStatus::NoMem:
731         ret = ENGINE_ENOMEM;
732         break;
733     case MutationStatus::InvalidCas:
734         ret = ENGINE_KEY_EEXISTS;
735         break;
736     case MutationStatus::IsLocked:
737         ret = ENGINE_LOCKED;
738         break;
739     case MutationStatus::NotFound:
740         if (cas_op) {
741             ret = ENGINE_KEY_ENOENT;
742             break;
743         }
744     // FALLTHROUGH
745     case MutationStatus::WasDirty:
746     // Even if the item was dirty, push it into the vbucket's open
747     // checkpoint.
748     case MutationStatus::WasClean:
749         notifyNewSeqno(notifyCtx);
750
751         itm.setBySeqno(v->getBySeqno());
752         itm.setCas(v->getCas());
753         break;
754     case MutationStatus::NeedBgFetch: { // CAS operation with non-resident item
755         // +
756         // full eviction.
757         if (v) {
758             // temp item is already created. Simply schedule a bg fetch job
759             hbl.getHTLock().unlock();
760             bgFetch(itm.getKey(), cookie, engine, bgFetchDelay, true);
761             return ENGINE_EWOULDBLOCK;
762         }
763         ret = addTempItemAndBGFetch(
764                 hbl, itm.getKey(), cookie, engine, bgFetchDelay, true);
765         break;
766     }
767     }
768
769     return ret;
770 }
771
772 ENGINE_ERROR_CODE VBucket::replace(Item& itm,
773                                    const void* cookie,
774                                    EventuallyPersistentEngine& engine,
775                                    const int bgFetchDelay) {
776     auto hbl = ht.getLockedBucket(itm.getKey());
777     StoredValue* v = ht.unlocked_find(itm.getKey(),
778                                       hbl.getBucketNum(),
779                                       WantsDeleted::Yes,
780                                       TrackReference::No);
781     if (v) {
782         if (v->isDeleted() || v->isTempDeletedItem() ||
783             v->isTempNonExistentItem()) {
784             return ENGINE_KEY_ENOENT;
785         }
786
787         MutationStatus mtype;
788         VBNotifyCtx notifyCtx;
789         if (eviction == FULL_EVICTION && v->isTempInitialItem()) {
790             mtype = MutationStatus::NeedBgFetch;
791         } else {
792             PreLinkDocumentContext preLinkDocumentContext(engine, cookie, &itm);
793             VBQueueItemCtx queueItmCtx(GenerateBySeqno::Yes,
794                                        GenerateCas::Yes,
795                                        TrackCasDrift::No,
796                                        /*isBackfillItem*/ false,
797                                        &preLinkDocumentContext);
798             std::tie(mtype, notifyCtx) = processSet(hbl,
799                                                     v,
800                                                     itm,
801                                                     0,
802                                                     /*allowExisting*/ true,
803                                                     /*hasMetaData*/ false,
804                                                     &queueItmCtx);
805         }
806
807         ENGINE_ERROR_CODE ret = ENGINE_SUCCESS;
808         switch (mtype) {
809         case MutationStatus::NoMem:
810             ret = ENGINE_ENOMEM;
811             break;
812         case MutationStatus::IsLocked:
813             ret = ENGINE_LOCKED;
814             break;
815         case MutationStatus::InvalidCas:
816         case MutationStatus::NotFound:
817             ret = ENGINE_NOT_STORED;
818             break;
819         // FALLTHROUGH
820         case MutationStatus::WasDirty:
821         // Even if the item was dirty, push it into the vbucket's open
822         // checkpoint.
823         case MutationStatus::WasClean:
824             notifyNewSeqno(notifyCtx);
825
826             itm.setBySeqno(v->getBySeqno());
827             itm.setCas(v->getCas());
828             break;
829         case MutationStatus::NeedBgFetch: {
830             // temp item is already created. Simply schedule a bg fetch job
831             hbl.getHTLock().unlock();
832             bgFetch(itm.getKey(), cookie, engine, bgFetchDelay, true);
833             ret = ENGINE_EWOULDBLOCK;
834             break;
835         }
836         }
837
838         return ret;
839     } else {
840         if (eviction == VALUE_ONLY) {
841             return ENGINE_KEY_ENOENT;
842         }
843
844         if (maybeKeyExistsInFilter(itm.getKey())) {
845             return addTempItemAndBGFetch(
846                     hbl, itm.getKey(), cookie, engine, bgFetchDelay, false);
847         } else {
848             // As bloomfilter predicted that item surely doesn't exist
849             // on disk, return ENOENT for replace().
850             return ENGINE_KEY_ENOENT;
851         }
852     }
853 }
854
855 ENGINE_ERROR_CODE VBucket::addBackfillItem(Item& itm,
856                                            const GenerateBySeqno genBySeqno) {
857     auto hbl = ht.getLockedBucket(itm.getKey());
858     StoredValue* v = ht.unlocked_find(itm.getKey(),
859                                       hbl.getBucketNum(),
860                                       WantsDeleted::Yes,
861                                       TrackReference::No);
862
863     // Note that this function is only called on replica or pending vbuckets.
864     if (v && v->isLocked(ep_current_time())) {
865         v->unlock();
866     }
867
868     VBQueueItemCtx queueItmCtx(genBySeqno,
869                                GenerateCas::No,
870                                TrackCasDrift::No,
871                                /*isBackfillItem*/ true,
872                                nullptr /* No pre link should happen */);
873     MutationStatus status;
874     VBNotifyCtx notifyCtx;
875     std::tie(status, notifyCtx) = processSet(hbl,
876                                              v,
877                                              itm,
878                                              0,
879                                              /*allowExisting*/ true,
880                                              /*hasMetaData*/ true,
881                                              &queueItmCtx);
882
883     ENGINE_ERROR_CODE ret = ENGINE_SUCCESS;
884     switch (status) {
885     case MutationStatus::NoMem:
886         ret = ENGINE_ENOMEM;
887         break;
888     case MutationStatus::InvalidCas:
889     case MutationStatus::IsLocked:
890         ret = ENGINE_KEY_EEXISTS;
891         break;
892     case MutationStatus::WasDirty:
893     // FALLTHROUGH, to ensure the bySeqno for the hashTable item is
894     // set correctly, and also the sequence numbers are ordered correctly.
895     // (MB-14003)
896     case MutationStatus::NotFound:
897     // FALLTHROUGH
898     case MutationStatus::WasClean: {
899         setMaxCas(v->getCas());
900         // we unlock ht lock here because we want to avoid potential lock
901         // inversions arising from notifyNewSeqno() call
902         hbl.getHTLock().unlock();
903         notifyNewSeqno(notifyCtx);
904     } break;
905     case MutationStatus::NeedBgFetch:
906         throw std::logic_error(
907                 "VBucket::addBackfillItem: "
908                 "SET on a non-active vbucket should not require a "
909                 "bg_metadata_fetch.");
910     }
911
912     return ret;
913 }
914
915 ENGINE_ERROR_CODE VBucket::setWithMeta(Item& itm,
916                                        const uint64_t cas,
917                                        uint64_t* seqno,
918                                        const void* cookie,
919                                        EventuallyPersistentEngine& engine,
920                                        const int bgFetchDelay,
921                                        const bool force,
922                                        const bool allowExisting,
923                                        const GenerateBySeqno genBySeqno,
924                                        const GenerateCas genCas,
925                                        const bool isReplication) {
926     auto hbl = ht.getLockedBucket(itm.getKey());
927     StoredValue* v = ht.unlocked_find(itm.getKey(),
928                                       hbl.getBucketNum(),
929                                       WantsDeleted::Yes,
930                                       TrackReference::No);
931
932     bool maybeKeyExists = true;
933     if (!force) {
934         if (v) {
935             if (v->isTempInitialItem()) {
936                 bgFetch(itm.getKey(), cookie, engine, bgFetchDelay, true);
937                 return ENGINE_EWOULDBLOCK;
938             }
939
940             if (!(conflictResolver->resolve(*v,
941                                             itm.getMetaData(),
942                                             itm.getDataType(),
943                                             itm.isDeleted()))) {
944                 ++stats.numOpsSetMetaResolutionFailed;
945                 return ENGINE_KEY_EEXISTS;
946             }
947         } else {
948             if (maybeKeyExistsInFilter(itm.getKey())) {
949                 return addTempItemAndBGFetch(hbl,
950                                              itm.getKey(),
951                                              cookie,
952                                              engine,
953                                              bgFetchDelay,
954                                              true,
955                                              isReplication);
956             } else {
957                 maybeKeyExists = false;
958             }
959         }
960     } else {
961         if (eviction == FULL_EVICTION) {
962             // Check Bloomfilter's prediction
963             if (!maybeKeyExistsInFilter(itm.getKey())) {
964                 maybeKeyExists = false;
965             }
966         }
967     }
968
969     if (v && v->isLocked(ep_current_time()) &&
970         (getState() == vbucket_state_replica ||
971          getState() == vbucket_state_pending)) {
972         v->unlock();
973     }
974
975     VBQueueItemCtx queueItmCtx(genBySeqno,
976                                genCas,
977                                TrackCasDrift::Yes,
978                                /*isBackfillItem*/ false,
979                                nullptr /* No pre link step needed */);
980     MutationStatus status;
981     VBNotifyCtx notifyCtx;
982     std::tie(status, notifyCtx) = processSet(hbl,
983                                              v,
984                                              itm,
985                                              cas,
986                                              allowExisting,
987                                              true,
988                                              &queueItmCtx,
989                                              maybeKeyExists,
990                                              isReplication);
991
992     ENGINE_ERROR_CODE ret = ENGINE_SUCCESS;
993     switch (status) {
994     case MutationStatus::NoMem:
995         ret = ENGINE_ENOMEM;
996         break;
997     case MutationStatus::InvalidCas:
998         ret = ENGINE_KEY_EEXISTS;
999         break;
1000     case MutationStatus::IsLocked:
1001         ret = ENGINE_LOCKED;
1002         break;
1003     case MutationStatus::WasDirty:
1004     case MutationStatus::WasClean: {
1005         if (seqno) {
1006             *seqno = static_cast<uint64_t>(v->getBySeqno());
1007         }
1008         // we unlock ht lock here because we want to avoid potential lock
1009         // inversions arising from notifyNewSeqno() call
1010         hbl.getHTLock().unlock();
1011         notifyNewSeqno(notifyCtx);
1012     } break;
1013     case MutationStatus::NotFound:
1014         ret = ENGINE_KEY_ENOENT;
1015         break;
1016     case MutationStatus::NeedBgFetch: { // CAS operation with non-resident item
1017         // + full eviction.
1018         if (v) { // temp item is already created. Simply schedule a
1019             hbl.getHTLock().unlock(); // bg fetch job.
1020             bgFetch(itm.getKey(), cookie, engine, bgFetchDelay, true);
1021             return ENGINE_EWOULDBLOCK;
1022         }
1023         ret = addTempItemAndBGFetch(hbl,
1024                                     itm.getKey(),
1025                                     cookie,
1026                                     engine,
1027                                     bgFetchDelay,
1028                                     true,
1029                                     isReplication);
1030     }
1031     }
1032
1033     return ret;
1034 }
1035
1036 ENGINE_ERROR_CODE VBucket::deleteItem(const DocKey& key,
1037                                       uint64_t& cas,
1038                                       const void* cookie,
1039                                       EventuallyPersistentEngine& engine,
1040                                       const int bgFetchDelay,
1041                                       ItemMetaData* itemMeta,
1042                                       mutation_descr_t* mutInfo) {
1043     auto hbl = ht.getLockedBucket(key);
1044     StoredValue* v = ht.unlocked_find(
1045             key, hbl.getBucketNum(), WantsDeleted::Yes, TrackReference::No);
1046
1047     if (!v || v->isDeleted() || v->isTempItem()) {
1048         if (eviction == VALUE_ONLY) {
1049             return ENGINE_KEY_ENOENT;
1050         } else { // Full eviction.
1051             if (!v) { // Item might be evicted from cache.
1052                 if (maybeKeyExistsInFilter(key)) {
1053                     return addTempItemAndBGFetch(
1054                             hbl, key, cookie, engine, bgFetchDelay, true);
1055                 } else {
1056                     // As bloomfilter predicted that item surely doesn't
1057                     // exist on disk, return ENOENT for deleteItem().
1058                     return ENGINE_KEY_ENOENT;
1059                 }
1060             } else if (v->isTempInitialItem()) {
1061                 hbl.getHTLock().unlock();
1062                 bgFetch(key, cookie, engine, bgFetchDelay, true);
1063                 return ENGINE_EWOULDBLOCK;
1064             } else { // Non-existent or deleted key.
1065                 if (v->isTempNonExistentItem() || v->isTempDeletedItem()) {
1066                     // Delete a temp non-existent item to ensure that
1067                     // if a delete were issued over an item that doesn't
1068                     // exist, then we don't preserve a temp item.
1069                     deleteStoredValue(hbl, *v);
1070                 }
1071                 return ENGINE_KEY_ENOENT;
1072             }
1073         }
1074     }
1075
1076     if (v->isLocked(ep_current_time()) &&
1077         (getState() == vbucket_state_replica ||
1078          getState() == vbucket_state_pending)) {
1079         v->unlock();
1080     }
1081
1082     if (itemMeta != nullptr) {
1083         itemMeta->cas = v->getCas();
1084     }
1085
1086     MutationStatus delrv;
1087     VBNotifyCtx notifyCtx;
1088     if (v->isExpired(ep_real_time())) {
1089         std::tie(delrv, v, notifyCtx) = processExpiredItem(hbl, *v);
1090     } else {
1091         ItemMetaData metadata;
1092         metadata.revSeqno = v->getRevSeqno() + 1;
1093         std::tie(delrv, v, notifyCtx) =
1094                 processSoftDelete(hbl,
1095                                   *v,
1096                                   cas,
1097                                   metadata,
1098                                   VBQueueItemCtx(GenerateBySeqno::Yes,
1099                                                  GenerateCas::Yes,
1100                                                  TrackCasDrift::No,
1101                                                  /*isBackfillItem*/ false,
1102                                                  nullptr /* no pre link */),
1103                                   /*use_meta*/ false,
1104                                   /*bySeqno*/ v->getBySeqno());
1105     }
1106
1107     uint64_t seqno = 0;
1108     ENGINE_ERROR_CODE ret = ENGINE_SUCCESS;
1109     switch (delrv) {
1110     case MutationStatus::NoMem:
1111         ret = ENGINE_ENOMEM;
1112         break;
1113     case MutationStatus::InvalidCas:
1114         ret = ENGINE_KEY_EEXISTS;
1115         break;
1116     case MutationStatus::IsLocked:
1117         ret = ENGINE_LOCKED_TMPFAIL;
1118         break;
1119     case MutationStatus::NotFound:
1120         ret = ENGINE_KEY_ENOENT;
1121     /* Fallthrough:
1122      * A NotFound return value at this point indicates that the
1123      * item has expired. But, a deletion still needs to be queued
1124      * for the item in order to persist it.
1125      */
1126     case MutationStatus::WasClean:
1127     case MutationStatus::WasDirty:
1128         if (itemMeta != nullptr) {
1129             itemMeta->revSeqno = v->getRevSeqno();
1130             itemMeta->flags = v->getFlags();
1131             itemMeta->exptime = v->getExptime();
1132         }
1133
1134         notifyNewSeqno(notifyCtx);
1135         seqno = static_cast<uint64_t>(v->getBySeqno());
1136         cas = v->getCas();
1137
1138         if (delrv != MutationStatus::NotFound) {
1139             if (mutInfo) {
1140                 mutInfo->seqno = seqno;
1141                 mutInfo->vbucket_uuid = failovers->getLatestUUID();
1142             }
1143             if (itemMeta != nullptr) {
1144                 itemMeta->cas = v->getCas();
1145             }
1146         }
1147         break;
1148     case MutationStatus::NeedBgFetch:
1149         // We already figured out if a bg fetch is requred for a full-evicted
1150         // item above.
1151         throw std::logic_error(
1152                 "VBucket::deleteItem: "
1153                 "Unexpected NEEDS_BG_FETCH from processSoftDelete");
1154     }
1155     return ret;
1156 }
1157
1158 ENGINE_ERROR_CODE VBucket::deleteWithMeta(const DocKey& key,
1159                                           uint64_t& cas,
1160                                           uint64_t* seqno,
1161                                           const void* cookie,
1162                                           EventuallyPersistentEngine& engine,
1163                                           const int bgFetchDelay,
1164                                           const bool force,
1165                                           const ItemMetaData& itemMeta,
1166                                           const bool backfill,
1167                                           const GenerateBySeqno genBySeqno,
1168                                           const GenerateCas generateCas,
1169                                           const uint64_t bySeqno,
1170                                           const bool isReplication) {
1171     auto hbl = ht.getLockedBucket(key);
1172     StoredValue* v = ht.unlocked_find(
1173             key, hbl.getBucketNum(), WantsDeleted::Yes, TrackReference::No);
1174     if (!force) { // Need conflict resolution.
1175         if (v) {
1176             if (v->isTempInitialItem()) {
1177                 bgFetch(key, cookie, engine, bgFetchDelay, true);
1178                 return ENGINE_EWOULDBLOCK;
1179             }
1180
1181             if (!(conflictResolver->resolve(*v,
1182                                             itemMeta,
1183                                             PROTOCOL_BINARY_RAW_BYTES,
1184                                             true))) {
1185                 ++stats.numOpsDelMetaResolutionFailed;
1186                 return ENGINE_KEY_EEXISTS;
1187             }
1188         } else {
1189             // Item is 1) deleted or not existent in the value eviction case OR
1190             // 2) deleted or evicted in the full eviction.
1191             if (maybeKeyExistsInFilter(key)) {
1192                 return addTempItemAndBGFetch(hbl,
1193                                              key,
1194                                              cookie,
1195                                              engine,
1196                                              bgFetchDelay,
1197                                              true,
1198                                              isReplication);
1199             } else {
1200                 // Even though bloomfilter predicted that item doesn't exist
1201                 // on disk, we must put this delete on disk if the cas is valid.
1202                 AddStatus rv = addTempStoredValue(hbl, key, isReplication);
1203                 if (rv == AddStatus::NoMem) {
1204                     return ENGINE_ENOMEM;
1205                 }
1206                 v = ht.unlocked_find(key,
1207                                      hbl.getBucketNum(),
1208                                      WantsDeleted::Yes,
1209                                      TrackReference::No);
1210                 v->setDeleted();
1211             }
1212         }
1213     } else {
1214         if (!v) {
1215             // We should always try to persist a delete here.
1216             AddStatus rv = addTempStoredValue(hbl, key, isReplication);
1217             if (rv == AddStatus::NoMem) {
1218                 return ENGINE_ENOMEM;
1219             }
1220             v = ht.unlocked_find(key,
1221                                  hbl.getBucketNum(),
1222                                  WantsDeleted::Yes,
1223                                  TrackReference::No);
1224             v->setDeleted();
1225             v->setCas(cas);
1226         } else if (v->isTempInitialItem()) {
1227             v->setDeleted();
1228             v->setCas(cas);
1229         }
1230     }
1231
1232     if (v && v->isLocked(ep_current_time()) &&
1233         (getState() == vbucket_state_replica ||
1234          getState() == vbucket_state_pending)) {
1235         v->unlock();
1236     }
1237
1238     MutationStatus delrv;
1239     VBNotifyCtx notifyCtx;
1240     if (!v) {
1241         if (eviction == FULL_EVICTION) {
1242             delrv = MutationStatus::NeedBgFetch;
1243         } else {
1244             delrv = MutationStatus::NotFound;
1245         }
1246     } else {
1247         VBQueueItemCtx queueItmCtx(genBySeqno,
1248                                    generateCas,
1249                                    TrackCasDrift::Yes,
1250                                    backfill,
1251                                    nullptr /* No pre link step needed */);
1252
1253         // system xattrs must remain
1254         std::unique_ptr<Item> itm;
1255         if (mcbp::datatype::is_xattr(v->getDatatype()) &&
1256             (itm = pruneXattrDocument(*v, itemMeta))) {
1257             std::tie(v, delrv, notifyCtx) =
1258                     updateStoredValue(hbl, *v, *itm, &queueItmCtx);
1259         } else {
1260             std::tie(delrv, v, notifyCtx) = processSoftDelete(hbl,
1261                                                               *v,
1262                                                               cas,
1263                                                               itemMeta,
1264                                                               queueItmCtx,
1265                                                               /*use_meta*/ true,
1266                                                               bySeqno);
1267         }
1268     }
1269     cas = v ? v->getCas() : 0;
1270
1271     switch (delrv) {
1272     case MutationStatus::NoMem:
1273         return ENGINE_ENOMEM;
1274     case MutationStatus::InvalidCas:
1275         return ENGINE_KEY_EEXISTS;
1276     case MutationStatus::IsLocked:
1277         return ENGINE_LOCKED_TMPFAIL;
1278     case MutationStatus::NotFound:
1279         return ENGINE_KEY_ENOENT;
1280     case MutationStatus::WasDirty:
1281     case MutationStatus::WasClean: {
1282         if (seqno) {
1283             *seqno = static_cast<uint64_t>(v->getBySeqno());
1284         }
1285         // we unlock ht lock here because we want to avoid potential lock
1286         // inversions arising from notifyNewSeqno() call
1287         hbl.getHTLock().unlock();
1288         notifyNewSeqno(notifyCtx);
1289         break;
1290     }
1291     case MutationStatus::NeedBgFetch:
1292         hbl.getHTLock().unlock();
1293         bgFetch(key, cookie, engine, bgFetchDelay, true);
1294         return ENGINE_EWOULDBLOCK;
1295     }
1296     return ENGINE_SUCCESS;
1297 }
1298
1299 void VBucket::deleteExpiredItem(const DocKey& key,
1300                                 time_t startTime,
1301                                 uint64_t revSeqno,
1302                                 ExpireBy source) {
1303     auto hbl = ht.getLockedBucket(key);
1304     StoredValue* v = ht.unlocked_find(
1305             key, hbl.getBucketNum(), WantsDeleted::Yes, TrackReference::No);
1306     if (v) {
1307         if (v->isTempNonExistentItem() || v->isTempDeletedItem()) {
1308             // This is a temporary item whose background fetch for metadata
1309             // has completed.
1310             bool deleted = deleteStoredValue(hbl, *v);
1311             if (!deleted) {
1312                 throw std::logic_error(
1313                         "VBucket::deleteExpiredItem: "
1314                         "Failed to delete seqno:" +
1315                         std::to_string(v->getBySeqno()) + " from bucket " +
1316                         std::to_string(hbl.getBucketNum()));
1317             }
1318         } else if (v->isExpired(startTime) && !v->isDeleted()) {
1319             handlePreExpiry(*v);
1320             VBNotifyCtx notifyCtx;
1321             std::tie(std::ignore, std::ignore, notifyCtx) =
1322                     processExpiredItem(hbl, *v);
1323             // we unlock ht lock here because we want to avoid potential lock
1324             // inversions arising from notifyNewSeqno() call
1325             hbl.getHTLock().unlock();
1326             notifyNewSeqno(notifyCtx);
1327         }
1328     } else {
1329         if (eviction == FULL_EVICTION) {
1330             // Create a temp item and delete and push it
1331             // into the checkpoint queue, only if the bloomfilter
1332             // predicts that the item may exist on disk.
1333             if (maybeKeyExistsInFilter(key)) {
1334                 AddStatus rv = addTempStoredValue(hbl, key);
1335                 if (rv == AddStatus::NoMem) {
1336                     return;
1337                 }
1338                 v = ht.unlocked_find(key,
1339                                      hbl.getBucketNum(),
1340                                      WantsDeleted::Yes,
1341                                      TrackReference::No);
1342                 v->setDeleted();
1343                 v->setRevSeqno(revSeqno);
1344                 VBNotifyCtx notifyCtx;
1345                 std::tie(std::ignore, std::ignore, notifyCtx) =
1346                         processExpiredItem(hbl, *v);
1347                 // we unlock ht lock here because we want to avoid potential
1348                 // lock inversions arising from notifyNewSeqno() call
1349                 hbl.getHTLock().unlock();
1350                 notifyNewSeqno(notifyCtx);
1351             }
1352         }
1353     }
1354     incExpirationStat(source);
1355 }
1356
1357 ENGINE_ERROR_CODE VBucket::add(Item& itm,
1358                                const void* cookie,
1359                                EventuallyPersistentEngine& engine,
1360                                const int bgFetchDelay) {
1361     auto hbl = ht.getLockedBucket(itm.getKey());
1362     StoredValue* v = ht.unlocked_find(itm.getKey(),
1363                                       hbl.getBucketNum(),
1364                                       WantsDeleted::Yes,
1365                                       TrackReference::No);
1366
1367     bool maybeKeyExists = true;
1368     if ((v == nullptr || v->isTempInitialItem()) &&
1369         (eviction == FULL_EVICTION)) {
1370         // Check bloomfilter's prediction
1371         if (!maybeKeyExistsInFilter(itm.getKey())) {
1372             maybeKeyExists = false;
1373         }
1374     }
1375
1376     PreLinkDocumentContext preLinkDocumentContext(engine, cookie, &itm);
1377     VBQueueItemCtx queueItmCtx(GenerateBySeqno::Yes,
1378                                GenerateCas::Yes,
1379                                TrackCasDrift::No,
1380                                /*isBackfillItem*/ false,
1381                                &preLinkDocumentContext);
1382     AddStatus status;
1383     VBNotifyCtx notifyCtx;
1384     std::tie(status, notifyCtx) =
1385             processAdd(hbl, v, itm, maybeKeyExists, false, &queueItmCtx);
1386
1387     switch (status) {
1388     case AddStatus::NoMem:
1389         return ENGINE_ENOMEM;
1390     case AddStatus::Exists:
1391         return ENGINE_NOT_STORED;
1392     case AddStatus::AddTmpAndBgFetch:
1393         return addTempItemAndBGFetch(
1394                 hbl, itm.getKey(), cookie, engine, bgFetchDelay, true);
1395     case AddStatus::BgFetch:
1396         hbl.getHTLock().unlock();
1397         bgFetch(itm.getKey(), cookie, engine, bgFetchDelay, true);
1398         return ENGINE_EWOULDBLOCK;
1399     case AddStatus::Success:
1400     case AddStatus::UnDel:
1401         notifyNewSeqno(notifyCtx);
1402         itm.setBySeqno(v->getBySeqno());
1403         itm.setCas(v->getCas());
1404         break;
1405     }
1406     return ENGINE_SUCCESS;
1407 }
1408
1409 GetValue VBucket::getAndUpdateTtl(const DocKey& key,
1410                                   const void* cookie,
1411                                   EventuallyPersistentEngine& engine,
1412                                   int bgFetchDelay,
1413                                   time_t exptime) {
1414     auto hbl = ht.getLockedBucket(key);
1415     StoredValue* v = fetchValidValue(hbl,
1416                                      key,
1417                                      WantsDeleted::Yes,
1418                                      TrackReference::Yes,
1419                                      QueueExpired::Yes);
1420
1421     if (v) {
1422         if (v->isDeleted() || v->isTempDeletedItem() ||
1423             v->isTempNonExistentItem()) {
1424             return {};
1425         }
1426
1427         if (!v->isResident()) {
1428             bgFetch(key, cookie, engine, bgFetchDelay);
1429             return GetValue(nullptr, ENGINE_EWOULDBLOCK, v->getBySeqno());
1430         }
1431         if (v->isLocked(ep_current_time())) {
1432             return GetValue(nullptr, ENGINE_KEY_EEXISTS, 0);
1433         }
1434
1435         const bool exptime_mutated = exptime != v->getExptime();
1436         if (exptime_mutated) {
1437             v->markDirty();
1438             v->setExptime(exptime);
1439             v->setRevSeqno(v->getRevSeqno() + 1);
1440         }
1441
1442         GetValue rv(
1443                 v->toItem(v->isLocked(ep_current_time()), getId()).release(),
1444                 ENGINE_SUCCESS,
1445                 v->getBySeqno());
1446
1447         if (exptime_mutated) {
1448             VBNotifyCtx notifyCtx = queueDirty(*v);
1449             rv.getValue()->setCas(v->getCas());
1450             // we unlock ht lock here because we want to avoid potential lock
1451             // inversions arising from notifyNewSeqno() call
1452             hbl.getHTLock().unlock();
1453             notifyNewSeqno(notifyCtx);
1454         }
1455
1456         return rv;
1457     } else {
1458         if (eviction == VALUE_ONLY) {
1459             return {};
1460         } else {
1461             if (maybeKeyExistsInFilter(key)) {
1462                 ENGINE_ERROR_CODE ec = addTempItemAndBGFetch(
1463                         hbl, key, cookie, engine, bgFetchDelay, false);
1464                 return GetValue(NULL, ec, -1, true);
1465             } else {
1466                 // As bloomfilter predicted that item surely doesn't exist
1467                 // on disk, return ENOENT for getAndUpdateTtl().
1468                 return {};
1469             }
1470         }
1471     }
1472 }
1473
1474 MutationStatus VBucket::insertFromWarmup(Item& itm,
1475                                          bool eject,
1476                                          bool keyMetaDataOnly) {
1477     if (!StoredValue::hasAvailableSpace(stats, itm)) {
1478         return MutationStatus::NoMem;
1479     }
1480
1481     auto hbl = ht.getLockedBucket(itm.getKey());
1482     StoredValue* v = ht.unlocked_find(itm.getKey(),
1483                                       hbl.getBucketNum(),
1484                                       WantsDeleted::Yes,
1485                                       TrackReference::No);
1486
1487     if (v == NULL) {
1488         v = addNewStoredValue(hbl, itm, /*queueItmCtx*/ nullptr).first;
1489         if (keyMetaDataOnly) {
1490             v->markNotResident();
1491             /* For now ht stats are updated from outside ht. This seems to be
1492                a better option for now than passing a flag to
1493                addNewStoredValue() just for this func */
1494             ++(ht.numNonResidentItems);
1495         }
1496         /* For now ht stats are updated from outside ht. This seems to be
1497            a better option for now than passing a flag to
1498            addNewStoredValue() just for this func.
1499            We need to decrNumTotalItems because ht.numTotalItems is already
1500            set by warmup when it estimated the item count from disk */
1501         ht.decrNumTotalItems();
1502         v->setNewCacheItem(false);
1503     } else {
1504         if (keyMetaDataOnly) {
1505             // We don't have a better error code ;)
1506             return MutationStatus::InvalidCas;
1507         }
1508
1509         // Verify that the CAS isn't changed
1510         if (v->getCas() != itm.getCas()) {
1511             if (v->getCas() == 0) {
1512                 v->setCas(itm.getCas());
1513                 v->setFlags(itm.getFlags());
1514                 v->setExptime(itm.getExptime());
1515                 v->setRevSeqno(itm.getRevSeqno());
1516             } else {
1517                 return MutationStatus::InvalidCas;
1518             }
1519         }
1520         updateStoredValue(hbl, *v, itm, /*queueItmCtx*/ nullptr);
1521     }
1522
1523     v->markClean();
1524
1525     if (eject && !keyMetaDataOnly) {
1526         ht.unlocked_ejectItem(v, eviction);
1527     }
1528
1529     return MutationStatus::NotFound;
1530 }
1531
1532 GetValue VBucket::getInternal(const DocKey& key,
1533                               const void* cookie,
1534                               EventuallyPersistentEngine& engine,
1535                               int bgFetchDelay,
1536                               get_options_t options,
1537                               bool diskFlushAll) {
1538     const TrackReference trackReference = (options & TRACK_REFERENCE)
1539                                                   ? TrackReference::Yes
1540                                                   : TrackReference::No;
1541     const bool getDeletedValue = (options & GET_DELETED_VALUE);
1542     auto hbl = ht.getLockedBucket(key);
1543     StoredValue* v = fetchValidValue(
1544             hbl, key, WantsDeleted::Yes, trackReference, QueueExpired::Yes);
1545     if (v) {
1546         if (v->isDeleted() && !getDeletedValue) {
1547             return GetValue();
1548         }
1549         if (v->isTempDeletedItem() || v->isTempNonExistentItem()) {
1550             // Delete a temp non-existent item to ensure that
1551             // if the get were issued over an item that doesn't
1552             // exist, then we dont preserve a temp item.
1553             if (options & DELETE_TEMP) {
1554                 deleteStoredValue(hbl, *v);
1555             }
1556             return GetValue();
1557         }
1558
1559         // If the value is not resident, wait for it...
1560         if (!v->isResident()) {
1561             return getInternalNonResident(
1562                     key, cookie, engine, bgFetchDelay, options, *v);
1563         }
1564
1565         // Should we hide (return -1) for the items' CAS?
1566         const bool hide_cas =
1567                 (options & HIDE_LOCKED_CAS) && v->isLocked(ep_current_time());
1568         return GetValue(v->toItem(hide_cas, getId()).release(),
1569                         ENGINE_SUCCESS,
1570                         v->getBySeqno(),
1571                         false,
1572                         v->getNRUValue());
1573     } else {
1574         if (!getDeletedValue && (eviction == VALUE_ONLY || diskFlushAll)) {
1575             return GetValue();
1576         }
1577
1578         if (maybeKeyExistsInFilter(key)) {
1579             ENGINE_ERROR_CODE ec = ENGINE_EWOULDBLOCK;
1580             if (options &
1581                 QUEUE_BG_FETCH) { // Full eviction and need a bg fetch.
1582                 ec = addTempItemAndBGFetch(
1583                         hbl, key, cookie, engine, bgFetchDelay, false);
1584             }
1585             return GetValue(NULL, ec, -1, true);
1586         } else {
1587             // As bloomfilter predicted that item surely doesn't exist
1588             // on disk, return ENOENT, for getInternal().
1589             return GetValue();
1590         }
1591     }
1592 }
1593
1594 ENGINE_ERROR_CODE VBucket::getMetaData(const DocKey& key,
1595                                        const void* cookie,
1596                                        EventuallyPersistentEngine& engine,
1597                                        int bgFetchDelay,
1598                                        bool fetchDatatype,
1599                                        ItemMetaData& metadata,
1600                                        uint32_t& deleted,
1601                                        uint8_t& datatype) {
1602     deleted = 0;
1603     auto hbl = ht.getLockedBucket(key);
1604     StoredValue* v = ht.unlocked_find(
1605             key, hbl.getBucketNum(), WantsDeleted::Yes, TrackReference::No);
1606
1607     if (v) {
1608         stats.numOpsGetMeta++;
1609         if (v->isTempInitialItem() || (fetchDatatype && !v->isResident())) {
1610             // Need bg meta fetch.
1611             bgFetch(key, cookie, engine, bgFetchDelay, !fetchDatatype);
1612             return ENGINE_EWOULDBLOCK;
1613         } else if (v->isTempNonExistentItem()) {
1614             metadata.cas = v->getCas();
1615             return ENGINE_KEY_ENOENT;
1616         } else {
1617             if (v->isTempDeletedItem() || v->isDeleted() ||
1618                 v->isExpired(ep_real_time())) {
1619                 deleted |= GET_META_ITEM_DELETED_FLAG;
1620             }
1621
1622             if (v->isLocked(ep_current_time())) {
1623                 metadata.cas = static_cast<uint64_t>(-1);
1624             } else {
1625                 metadata.cas = v->getCas();
1626             }
1627             metadata.flags = v->getFlags();
1628             metadata.exptime = v->getExptime();
1629             metadata.revSeqno = v->getRevSeqno();
1630
1631             if (fetchDatatype) {
1632                 value_t value = v->getValue();
1633                 if (value) {
1634                     datatype = value->getDataType();
1635                 }
1636             }
1637
1638             return ENGINE_SUCCESS;
1639         }
1640     } else {
1641         // The key wasn't found. However, this may be because it was previously
1642         // deleted or evicted with the full eviction strategy.
1643         // So, add a temporary item corresponding to the key to the hash table
1644         // and schedule a background fetch for its metadata from the persistent
1645         // store. The item's state will be updated after the fetch completes.
1646         //
1647         // Schedule this bgFetch only if the key is predicted to be may-be
1648         // existent on disk by the bloomfilter.
1649
1650         if (maybeKeyExistsInFilter(key)) {
1651             return addTempItemAndBGFetch(
1652                     hbl, key, cookie, engine, bgFetchDelay, !fetchDatatype);
1653         } else {
1654             stats.numOpsGetMeta++;
1655             return ENGINE_KEY_ENOENT;
1656         }
1657     }
1658 }
1659
1660 ENGINE_ERROR_CODE VBucket::getKeyStats(const DocKey& key,
1661                                        const void* cookie,
1662                                        EventuallyPersistentEngine& engine,
1663                                        int bgFetchDelay,
1664                                        struct key_stats& kstats,
1665                                        WantsDeleted wantsDeleted) {
1666     auto hbl = ht.getLockedBucket(key);
1667     StoredValue* v = fetchValidValue(hbl,
1668                                      key,
1669                                      WantsDeleted::Yes,
1670                                      TrackReference::Yes,
1671                                      QueueExpired::Yes);
1672
1673     if (v) {
1674         if ((v->isDeleted() && wantsDeleted == WantsDeleted::No) ||
1675             v->isTempNonExistentItem() || v->isTempDeletedItem()) {
1676             return ENGINE_KEY_ENOENT;
1677         }
1678         if (eviction == FULL_EVICTION && v->isTempInitialItem()) {
1679             hbl.getHTLock().unlock();
1680             bgFetch(key, cookie, engine, bgFetchDelay, true);
1681             return ENGINE_EWOULDBLOCK;
1682         }
1683         kstats.logically_deleted = v->isDeleted();
1684         kstats.dirty = v->isDirty();
1685         kstats.exptime = v->getExptime();
1686         kstats.flags = v->getFlags();
1687         kstats.cas = v->getCas();
1688         kstats.vb_state = getState();
1689         return ENGINE_SUCCESS;
1690     } else {
1691         if (eviction == VALUE_ONLY) {
1692             return ENGINE_KEY_ENOENT;
1693         } else {
1694             if (maybeKeyExistsInFilter(key)) {
1695                 return addTempItemAndBGFetch(
1696                         hbl, key, cookie, engine, bgFetchDelay, true);
1697             } else {
1698                 // If bgFetch were false, or bloomfilter predicted that
1699                 // item surely doesn't exist on disk, return ENOENT for
1700                 // getKeyStats().
1701                 return ENGINE_KEY_ENOENT;
1702             }
1703         }
1704     }
1705 }
1706
1707 GetValue VBucket::getLocked(const DocKey& key,
1708                             rel_time_t currentTime,
1709                             uint32_t lockTimeout,
1710                             const void* cookie,
1711                             EventuallyPersistentEngine& engine,
1712                             int bgFetchDelay) {
1713     auto hbl = ht.getLockedBucket(key);
1714     StoredValue* v = fetchValidValue(hbl,
1715                                      key,
1716                                      WantsDeleted::Yes,
1717                                      TrackReference::Yes,
1718                                      QueueExpired::Yes);
1719
1720     if (v) {
1721         if (v->isDeleted() || v->isTempNonExistentItem() ||
1722             v->isTempDeletedItem()) {
1723             return GetValue(NULL, ENGINE_KEY_ENOENT);
1724         }
1725
1726         // if v is locked return error
1727         if (v->isLocked(currentTime)) {
1728             return GetValue(NULL, ENGINE_TMPFAIL);
1729         }
1730
1731         // If the value is not resident, wait for it...
1732         if (!v->isResident()) {
1733             if (cookie) {
1734                 bgFetch(key, cookie, engine, bgFetchDelay);
1735             }
1736             return GetValue(NULL, ENGINE_EWOULDBLOCK, -1, true);
1737         }
1738
1739         // acquire lock and increment cas value
1740         v->lock(currentTime + lockTimeout);
1741
1742         auto it = v->toItem(false, getId());
1743         it->setCas(nextHLCCas());
1744         v->setCas(it->getCas());
1745
1746         return GetValue(it.release());
1747
1748     } else {
1749         // No value found in the hashtable.
1750         switch (eviction) {
1751         case VALUE_ONLY:
1752             return GetValue(NULL, ENGINE_KEY_ENOENT);
1753
1754         case FULL_EVICTION:
1755             if (maybeKeyExistsInFilter(key)) {
1756                 ENGINE_ERROR_CODE ec = addTempItemAndBGFetch(
1757                         hbl, key, cookie, engine, bgFetchDelay, false);
1758                 return GetValue(NULL, ec, -1, true);
1759             } else {
1760                 // As bloomfilter predicted that item surely doesn't exist
1761                 // on disk, return ENOENT for getLocked().
1762                 return GetValue(NULL, ENGINE_KEY_ENOENT);
1763             }
1764         }
1765         return GetValue(); // just to prevent compiler warning
1766     }
1767 }
1768
1769 void VBucket::deletedOnDiskCbk(const Item& queuedItem, bool deleted) {
1770     auto hbl = ht.getLockedBucket(queuedItem.getKey());
1771     StoredValue* v = fetchValidValue(hbl,
1772                                      queuedItem.getKey(),
1773                                      WantsDeleted::Yes,
1774                                      TrackReference::No,
1775                                      QueueExpired::Yes);
1776     // Delete the item in the hash table iff:
1777     //  1. Item is existent in hashtable, and deleted flag is true
1778     //  2. rev seqno of queued item matches rev seqno of hash table item
1779     if (v && v->isDeleted() && (queuedItem.getRevSeqno() == v->getRevSeqno())) {
1780         bool isDeleted = deleteStoredValue(hbl, *v);
1781         if (!isDeleted) {
1782             throw std::logic_error(
1783                     "deletedOnDiskCbk:callback: "
1784                     "Failed to delete key with seqno:" +
1785                     std::to_string(v->getBySeqno()) + "' from bucket " +
1786                     std::to_string(hbl.getBucketNum()));
1787         }
1788
1789         /**
1790          * Deleted items are to be added to the bloomfilter,
1791          * in either eviction policy.
1792          */
1793         addToFilter(queuedItem.getKey());
1794     }
1795
1796     if (deleted) {
1797         ++stats.totalPersisted;
1798         ++opsDelete;
1799     }
1800     doStatsForFlushing(queuedItem, queuedItem.size());
1801     --stats.diskQueueSize;
1802     decrMetaDataDisk(queuedItem);
1803 }
1804
1805 bool VBucket::deleteKey(const DocKey& key) {
1806     auto hbl = ht.getLockedBucket(key);
1807     StoredValue* v = ht.unlocked_find(
1808             key, hbl.getBucketNum(), WantsDeleted::Yes, TrackReference::No);
1809     if (!v) {
1810         return false;
1811     }
1812     return deleteStoredValue(hbl, *v);
1813 }
1814
1815 void VBucket::postProcessRollback(const RollbackResult& rollbackResult,
1816                                   uint64_t prevHighSeqno) {
1817     failovers->pruneEntries(rollbackResult.highSeqno);
1818     checkpointManager.clear(*this, rollbackResult.highSeqno);
1819     setPersistedSnapshot(rollbackResult.snapStartSeqno,
1820                          rollbackResult.snapEndSeqno);
1821     incrRollbackItemCount(prevHighSeqno - rollbackResult.highSeqno);
1822     setBackfillPhase(false);
1823 }
1824
1825 void VBucket::dump() const {
1826     std::cerr << "VBucket[" << this << "] with state: " << toString(getState())
1827               << " numItems:" << getNumItems()
1828               << " numNonResident:" << getNumNonResidentItems()
1829               << " ht: " << std::endl << "  " << ht << std::endl
1830               << "]" << std::endl;
1831 }
1832
1833 void VBucket::_addStats(bool details, ADD_STAT add_stat, const void* c) {
1834     addStat(NULL, toString(state), add_stat, c);
1835     if (details) {
1836         size_t numItems = getNumItems();
1837         size_t tempItems = getNumTempItems();
1838         addStat("num_items", numItems, add_stat, c);
1839         addStat("num_temp_items", tempItems, add_stat, c);
1840         addStat("num_non_resident", getNumNonResidentItems(), add_stat, c);
1841         addStat("ht_memory", ht.memorySize(), add_stat, c);
1842         addStat("ht_item_memory", ht.getItemMemory(), add_stat, c);
1843         addStat("ht_cache_size", ht.cacheSize.load(), add_stat, c);
1844         addStat("num_ejects", ht.getNumEjects(), add_stat, c);
1845         addStat("ops_create", opsCreate.load(), add_stat, c);
1846         addStat("ops_update", opsUpdate.load(), add_stat, c);
1847         addStat("ops_delete", opsDelete.load(), add_stat, c);
1848         addStat("ops_reject", opsReject.load(), add_stat, c);
1849         addStat("queue_size", dirtyQueueSize.load(), add_stat, c);
1850         addStat("backfill_queue_size", getBackfillSize(), add_stat, c);
1851         addStat("queue_memory", dirtyQueueMem.load(), add_stat, c);
1852         addStat("queue_fill", dirtyQueueFill.load(), add_stat, c);
1853         addStat("queue_drain", dirtyQueueDrain.load(), add_stat, c);
1854         addStat("queue_age", getQueueAge(), add_stat, c);
1855         addStat("pending_writes", dirtyQueuePendingWrites.load(), add_stat, c);
1856
1857         addStat("high_seqno", getHighSeqno(), add_stat, c);
1858         addStat("uuid", failovers->getLatestUUID(), add_stat, c);
1859         addStat("purge_seqno", getPurgeSeqno(), add_stat, c);
1860         addStat("bloom_filter", getFilterStatusString().data(),
1861                 add_stat, c);
1862         addStat("bloom_filter_size", getFilterSize(), add_stat, c);
1863         addStat("bloom_filter_key_count", getNumOfKeysInFilter(), add_stat, c);
1864         addStat("rollback_item_count", getRollbackItemCount(), add_stat, c);
1865         addStat("hp_vb_req_size", getHighPriorityChkSize(), add_stat, c);
1866         hlc.addStats(statPrefix, add_stat, c);
1867     }
1868 }
1869
1870 void VBucket::decrDirtyQueueMem(size_t decrementBy)
1871 {
1872     size_t oldVal, newVal;
1873     do {
1874         oldVal = dirtyQueueMem.load(std::memory_order_relaxed);
1875         if (oldVal < decrementBy) {
1876             newVal = 0;
1877         } else {
1878             newVal = oldVal - decrementBy;
1879         }
1880     } while (!dirtyQueueMem.compare_exchange_strong(oldVal, newVal));
1881 }
1882
1883 void VBucket::decrDirtyQueueAge(uint32_t decrementBy)
1884 {
1885     uint64_t oldVal, newVal;
1886     do {
1887         oldVal = dirtyQueueAge.load(std::memory_order_relaxed);
1888         if (oldVal < decrementBy) {
1889             newVal = 0;
1890         } else {
1891             newVal = oldVal - decrementBy;
1892         }
1893     } while (!dirtyQueueAge.compare_exchange_strong(oldVal, newVal));
1894 }
1895
1896 void VBucket::decrDirtyQueuePendingWrites(size_t decrementBy)
1897 {
1898     size_t oldVal, newVal;
1899     do {
1900         oldVal = dirtyQueuePendingWrites.load(std::memory_order_relaxed);
1901         if (oldVal < decrementBy) {
1902             newVal = 0;
1903         } else {
1904             newVal = oldVal - decrementBy;
1905         }
1906     } while (!dirtyQueuePendingWrites.compare_exchange_strong(oldVal, newVal));
1907 }
1908
1909 std::pair<MutationStatus, VBNotifyCtx> VBucket::processSet(
1910         const HashTable::HashBucketLock& hbl,
1911         StoredValue*& v,
1912         Item& itm,
1913         uint64_t cas,
1914         bool allowExisting,
1915         bool hasMetaData,
1916         const VBQueueItemCtx* queueItmCtx,
1917         bool maybeKeyExists,
1918         bool isReplication) {
1919     if (!hbl.getHTLock()) {
1920         throw std::invalid_argument(
1921                 "VBucket::processSet: htLock not held for "
1922                 "VBucket " +
1923                 std::to_string(getId()));
1924     }
1925
1926     if (!StoredValue::hasAvailableSpace(stats, itm, isReplication)) {
1927         return {MutationStatus::NoMem, VBNotifyCtx()};
1928     }
1929
1930     if (cas && eviction == FULL_EVICTION && maybeKeyExists) {
1931         if (!v || v->isTempInitialItem()) {
1932             return {MutationStatus::NeedBgFetch, VBNotifyCtx()};
1933         }
1934     }
1935
1936     /*
1937      * prior to checking for the lock, we should check if this object
1938      * has expired. If so, then check if CAS value has been provided
1939      * for this set op. In this case the operation should be denied since
1940      * a cas operation for a key that doesn't exist is not a very cool
1941      * thing to do. See MB 3252
1942      */
1943     if (v && v->isExpired(ep_real_time()) && !hasMetaData) {
1944         if (v->isLocked(ep_current_time())) {
1945             v->unlock();
1946         }
1947         if (cas) {
1948             /* item has expired and cas value provided. Deny ! */
1949             return {MutationStatus::NotFound, VBNotifyCtx()};
1950         }
1951     }
1952
1953     if (v) {
1954         if (!allowExisting && !v->isTempItem()) {
1955             return {MutationStatus::InvalidCas, VBNotifyCtx()};
1956         }
1957         if (v->isLocked(ep_current_time())) {
1958             /*
1959              * item is locked, deny if there is cas value mismatch
1960              * or no cas value is provided by the user
1961              */
1962             if (cas != v->getCas()) {
1963                 return {MutationStatus::IsLocked, VBNotifyCtx()};
1964             }
1965             /* allow operation*/
1966             v->unlock();
1967         } else if (cas && cas != v->getCas()) {
1968             if (v->isTempNonExistentItem()) {
1969                 // This is a temporary item which marks a key as non-existent;
1970                 // therefore specifying a non-matching CAS should be exposed
1971                 // as item not existing.
1972                 return {MutationStatus::NotFound, VBNotifyCtx()};
1973             }
1974             if ((v->isTempDeletedItem() || v->isDeleted()) && !itm.isDeleted()) {
1975                 // Existing item is deleted, and we are not replacing it with
1976                 // a (different) deleted value - return not existing.
1977                 return {MutationStatus::NotFound, VBNotifyCtx()};
1978             }
1979             // None of the above special cases; the existing item cannot be
1980             // modified with the specified CAS.
1981             return {MutationStatus::InvalidCas, VBNotifyCtx()};
1982         }
1983         if (!hasMetaData) {
1984             itm.setRevSeqno(v->getRevSeqno() + 1);
1985             /* MB-23530: We must ensure that a replace operation (i.e.
1986              * set with a CAS) /fails/ if the old document is deleted; it
1987              * logically "doesn't exist". However, if the new value is deleted
1988              * this op is a /delete/ with a CAS and we must permit a
1989              * deleted -> deleted transition for Deleted Bodies.
1990              */
1991             if (cas && (v->isDeleted() || v->isTempDeletedItem()) &&
1992                 !itm.isDeleted()) {
1993                 return {MutationStatus::NotFound, VBNotifyCtx()};
1994             }
1995         }
1996
1997         MutationStatus status;
1998         VBNotifyCtx notifyCtx;
1999         std::tie(v, status, notifyCtx) =
2000                 updateStoredValue(hbl, *v, itm, queueItmCtx);
2001         return {status, notifyCtx};
2002     } else if (cas != 0) {
2003         return {MutationStatus::NotFound, VBNotifyCtx()};
2004     } else {
2005         VBNotifyCtx notifyCtx;
2006         std::tie(v, notifyCtx) = addNewStoredValue(hbl, itm, queueItmCtx);
2007         if (!hasMetaData) {
2008             updateRevSeqNoOfNewStoredValue(*v);
2009             itm.setRevSeqno(v->getRevSeqno());
2010         }
2011         return {MutationStatus::WasClean, notifyCtx};
2012     }
2013 }
2014
2015 std::pair<AddStatus, VBNotifyCtx> VBucket::processAdd(
2016         const HashTable::HashBucketLock& hbl,
2017         StoredValue*& v,
2018         Item& itm,
2019         bool maybeKeyExists,
2020         bool isReplication,
2021         const VBQueueItemCtx* queueItmCtx) {
2022     if (!hbl.getHTLock()) {
2023         throw std::invalid_argument(
2024                 "VBucket::processAdd: htLock not held for "
2025                 "VBucket " +
2026                 std::to_string(getId()));
2027     }
2028
2029     if (v && !v->isDeleted() && !v->isExpired(ep_real_time()) &&
2030         !v->isTempItem()) {
2031         return {AddStatus::Exists, VBNotifyCtx()};
2032     }
2033     if (!StoredValue::hasAvailableSpace(stats, itm, isReplication)) {
2034         return {AddStatus::NoMem, VBNotifyCtx()};
2035     }
2036
2037     std::pair<AddStatus, VBNotifyCtx> rv = {AddStatus::Success, VBNotifyCtx()};
2038
2039     if (v) {
2040         if (v->isTempInitialItem() && eviction == FULL_EVICTION &&
2041             maybeKeyExists) {
2042             // Need to figure out if an item exists on disk
2043             return {AddStatus::BgFetch, VBNotifyCtx()};
2044         }
2045
2046         rv.first = (v->isDeleted() || v->isExpired(ep_real_time()))
2047                            ? AddStatus::UnDel
2048                            : AddStatus::Success;
2049
2050         if (v->isTempDeletedItem()) {
2051             itm.setRevSeqno(v->getRevSeqno() + 1);
2052         } else {
2053             itm.setRevSeqno(ht.getMaxDeletedRevSeqno() + 1);
2054         }
2055
2056         if (!v->isTempItem()) {
2057             itm.setRevSeqno(v->getRevSeqno() + 1);
2058         }
2059
2060         std::tie(v, std::ignore, rv.second) =
2061                 updateStoredValue(hbl, *v, itm, queueItmCtx);
2062     } else {
2063         if (itm.getBySeqno() != StoredValue::state_temp_init) {
2064             if (eviction == FULL_EVICTION && maybeKeyExists) {
2065                 return {AddStatus::AddTmpAndBgFetch, VBNotifyCtx()};
2066             }
2067         }
2068         std::tie(v, rv.second) = addNewStoredValue(hbl, itm, queueItmCtx);
2069         updateRevSeqNoOfNewStoredValue(*v);
2070         itm.setRevSeqno(v->getRevSeqno());
2071         if (v->isTempItem()) {
2072             rv.first = AddStatus::BgFetch;
2073         }
2074     }
2075
2076     if (v->isTempItem()) {
2077         v->setNRUValue(MAX_NRU_VALUE);
2078     }
2079     return rv;
2080 }
2081
2082 std::tuple<MutationStatus, StoredValue*, VBNotifyCtx>
2083 VBucket::processSoftDelete(const HashTable::HashBucketLock& hbl,
2084                            StoredValue& v,
2085                            uint64_t cas,
2086                            const ItemMetaData& metadata,
2087                            const VBQueueItemCtx& queueItmCtx,
2088                            bool use_meta,
2089                            uint64_t bySeqno) {
2090     if (v.isTempInitialItem() && eviction == FULL_EVICTION) {
2091         return std::make_tuple(MutationStatus::NeedBgFetch, &v, VBNotifyCtx());
2092     }
2093
2094     if (v.isLocked(ep_current_time())) {
2095         if (cas != v.getCas()) {
2096             return std::make_tuple(MutationStatus::IsLocked, &v, VBNotifyCtx());
2097         }
2098         v.unlock();
2099     }
2100
2101     if (cas != 0 && cas != v.getCas()) {
2102         return std::make_tuple(MutationStatus::InvalidCas, &v, VBNotifyCtx());
2103     }
2104
2105     /* allow operation */
2106     v.unlock();
2107
2108     MutationStatus rv =
2109             v.isDirty() ? MutationStatus::WasDirty : MutationStatus::WasClean;
2110
2111     if (use_meta) {
2112         v.setCas(metadata.cas);
2113         v.setFlags(metadata.flags);
2114         v.setExptime(metadata.exptime);
2115     }
2116
2117     v.setRevSeqno(metadata.revSeqno);
2118     VBNotifyCtx notifyCtx;
2119     StoredValue* newSv;
2120     std::tie(newSv, notifyCtx) =
2121             softDeleteStoredValue(hbl,
2122                                   v,
2123                                   /*onlyMarkDeleted*/ false,
2124                                   queueItmCtx,
2125                                   bySeqno);
2126     ht.updateMaxDeletedRevSeqno(metadata.revSeqno);
2127     return std::make_tuple(rv, newSv, notifyCtx);
2128 }
2129
2130 std::tuple<MutationStatus, StoredValue*, VBNotifyCtx>
2131 VBucket::processExpiredItem(const HashTable::HashBucketLock& hbl,
2132                             StoredValue& v) {
2133     if (!hbl.getHTLock()) {
2134         throw std::invalid_argument(
2135                 "VBucket::processExpiredItem: htLock not held for VBucket " +
2136                 std::to_string(getId()));
2137     }
2138
2139     if (v.isTempInitialItem() && eviction == FULL_EVICTION) {
2140         return std::make_tuple(MutationStatus::NeedBgFetch,
2141                                &v,
2142                                queueDirty(v,
2143                                           GenerateBySeqno::Yes,
2144                                           GenerateCas::Yes,
2145                                           /*isBackfillItem*/ false));
2146     }
2147
2148     /* If the datatype is XATTR, mark the item as deleted
2149      * but don't delete the value as system xattrs can
2150      * still be queried by mobile clients even after
2151      * deletion.
2152      * TODO: The current implementation is inefficient
2153      * but functionally correct and for performance reasons
2154      * only the system xattrs need to be stored.
2155      */
2156     value_t value = v.getValue();
2157     bool onlyMarkDeleted =
2158             value && mcbp::datatype::is_xattr(value->getDataType());
2159     v.setRevSeqno(v.getRevSeqno() + 1);
2160     VBNotifyCtx notifyCtx;
2161     StoredValue* newSv;
2162     std::tie(newSv, notifyCtx) =
2163             softDeleteStoredValue(hbl,
2164                                   v,
2165                                   onlyMarkDeleted,
2166                                   VBQueueItemCtx(GenerateBySeqno::Yes,
2167                                                  GenerateCas::Yes,
2168                                                  TrackCasDrift::No,
2169                                                  /*isBackfillItem*/ false,
2170                                                  nullptr /* no pre link */),
2171                                   v.getBySeqno());
2172     ht.updateMaxDeletedRevSeqno(newSv->getRevSeqno() + 1);
2173     return std::make_tuple(MutationStatus::NotFound, newSv, notifyCtx);
2174 }
2175
2176 bool VBucket::deleteStoredValue(const HashTable::HashBucketLock& hbl,
2177                                 StoredValue& v) {
2178     if (!v.isDeleted() && v.isLocked(ep_current_time())) {
2179         return false;
2180     }
2181
2182     /* StoredValue deleted here. If any other in-memory data structures are
2183        using the StoredValue intrusively then they must have handled the delete
2184        by this point */
2185     ht.unlocked_del(hbl, v.getKey());
2186     return true;
2187 }
2188
2189 AddStatus VBucket::addTempStoredValue(const HashTable::HashBucketLock& hbl,
2190                                       const DocKey& key,
2191                                       bool isReplication) {
2192     uint8_t ext_meta[EXT_META_LEN] = {PROTOCOL_BINARY_RAW_BYTES};
2193     static_assert(sizeof(ext_meta) == 1,
2194                   "VBucket::addTempStoredValue(): expected "
2195                   "EXT_META_LEN to be 1");
2196     Item itm(key,
2197              /*flags*/ 0,
2198              /*exp*/ 0,
2199              /*data*/ NULL,
2200              /*size*/ 0,
2201              ext_meta,
2202              sizeof(ext_meta),
2203              0,
2204              StoredValue::state_temp_init);
2205
2206     /* if a temp item for a possibly deleted, set it non-resident by resetting
2207        the value cuz normally a new item added is considered resident which
2208        does not apply for temp item. */
2209     StoredValue* v = nullptr;
2210     return processAdd(hbl, v, itm, true, isReplication, nullptr).first;
2211 }
2212
2213 void VBucket::notifyNewSeqno(const VBNotifyCtx& notifyCtx) {
2214     if (newSeqnoCb) {
2215         newSeqnoCb->callback(getId(), notifyCtx);
2216     }
2217 }
2218
2219 /*
2220  * Queue the item to the checkpoint and return the seqno the item was
2221  * allocated.
2222  */
2223 int64_t VBucket::queueItem(Item* item, OptionalSeqno seqno) {
2224     item->setVBucketId(id);
2225     queued_item qi(item);
2226     checkpointManager.queueDirty(
2227             *this,
2228             qi,
2229             seqno ? GenerateBySeqno::No : GenerateBySeqno::Yes,
2230             GenerateCas::Yes,
2231             nullptr /* No pre link step as this is for system events */);
2232     VBNotifyCtx notifyCtx;
2233     // If the seqno is initialized, skip replication notification
2234     notifyCtx.notifyReplication = !seqno.is_initialized();
2235     notifyCtx.notifyFlusher = true;
2236     notifyCtx.bySeqno = qi->getBySeqno();
2237     notifyNewSeqno(notifyCtx);
2238     return qi->getBySeqno();
2239 }
2240
2241 VBNotifyCtx VBucket::queueDirty(StoredValue& v,
2242                                 const VBQueueItemCtx& queueItmCtx) {
2243     if (queueItmCtx.trackCasDrift == TrackCasDrift::Yes) {
2244         setMaxCasAndTrackDrift(v.getCas());
2245     }
2246     return queueDirty(v,
2247                       queueItmCtx.genBySeqno,
2248                       queueItmCtx.genCas,
2249                       queueItmCtx.isBackfillItem,
2250                       queueItmCtx.preLinkDocumentContext);
2251 }
2252
2253 void VBucket::updateRevSeqNoOfNewStoredValue(StoredValue& v) {
2254     /**
2255      * Possibly, this item is being recreated. Conservatively assign it
2256      * a seqno that is greater than the greatest seqno of all deleted
2257      * items seen so far.
2258      */
2259     uint64_t seqno = ht.getMaxDeletedRevSeqno();
2260     if (!v.isTempItem()) {
2261         ++seqno;
2262     }
2263     v.setRevSeqno(seqno);
2264 }
2265
2266 void VBucket::addHighPriorityVBEntry(uint64_t seqnoOrChkId,
2267                                      const void* cookie,
2268                                      HighPriorityVBNotify reqType) {
2269     std::unique_lock<std::mutex> lh(hpVBReqsMutex);
2270     hpVBReqs.push_back(HighPriorityVBEntry(cookie, seqnoOrChkId, reqType));
2271     numHpVBReqs.store(hpVBReqs.size());
2272
2273     LOG(EXTENSION_LOG_NOTICE,
2274         "Added high priority async request %s "
2275         "for vb:%" PRIu16 ", Check for:%" PRIu64 ", "
2276         "Persisted upto:%" PRIu64 ", cookie:%p",
2277         to_string(reqType).c_str(),
2278         getId(),
2279         seqnoOrChkId,
2280         getPersistenceSeqno(),
2281         cookie);
2282 }
2283
2284 std::map<const void*, ENGINE_ERROR_CODE> VBucket::getHighPriorityNotifications(
2285         EventuallyPersistentEngine& engine,
2286         uint64_t idNum,
2287         HighPriorityVBNotify notifyType) {
2288     std::unique_lock<std::mutex> lh(hpVBReqsMutex);
2289     std::map<const void*, ENGINE_ERROR_CODE> toNotify;
2290
2291     auto entry = hpVBReqs.begin();
2292
2293     while (entry != hpVBReqs.end()) {
2294         if (notifyType != entry->reqType) {
2295             ++entry;
2296             continue;
2297         }
2298
2299         std::string logStr(to_string(notifyType));
2300
2301         hrtime_t wall_time(gethrtime() - entry->start);
2302         size_t spent = wall_time / 1000000000;
2303         if (entry->id <= idNum) {
2304             toNotify[entry->cookie] = ENGINE_SUCCESS;
2305             stats.chkPersistenceHisto.add(wall_time / 1000);
2306             adjustCheckpointFlushTimeout(wall_time / 1000000000);
2307             LOG(EXTENSION_LOG_NOTICE,
2308                 "Notified the completion of %s "
2309                 "for vbucket %" PRIu16 ", Check for: %" PRIu64
2310                 ", "
2311                 "Persisted upto: %" PRIu64 ", cookie %p",
2312                 logStr.c_str(),
2313                 getId(),
2314                 entry->id,
2315                 idNum,
2316                 entry->cookie);
2317             entry = hpVBReqs.erase(entry);
2318         } else if (spent > getCheckpointFlushTimeout()) {
2319             adjustCheckpointFlushTimeout(spent);
2320             engine.storeEngineSpecific(entry->cookie, NULL);
2321             toNotify[entry->cookie] = ENGINE_TMPFAIL;
2322             LOG(EXTENSION_LOG_WARNING,
2323                 "Notified the timeout on %s "
2324                 "for vbucket %" PRIu16 ", Check for: %" PRIu64
2325                 ", "
2326                 "Persisted upto: %" PRIu64 ", cookie %p",
2327                 logStr.c_str(),
2328                 getId(),
2329                 entry->id,
2330                 idNum,
2331                 entry->cookie);
2332             entry = hpVBReqs.erase(entry);
2333         } else {
2334             ++entry;
2335         }
2336     }
2337     numHpVBReqs.store(hpVBReqs.size());
2338     return toNotify;
2339 }
2340
2341 std::map<const void*, ENGINE_ERROR_CODE> VBucket::tmpFailAndGetAllHpNotifies(
2342         EventuallyPersistentEngine& engine) {
2343     std::map<const void*, ENGINE_ERROR_CODE> toNotify;
2344
2345     LockHolder lh(hpVBReqsMutex);
2346
2347     for (auto& entry : hpVBReqs) {
2348         toNotify[entry.cookie] = ENGINE_TMPFAIL;
2349         engine.storeEngineSpecific(entry.cookie, NULL);
2350     }
2351     hpVBReqs.clear();
2352
2353     return toNotify;
2354 }
2355
2356 void VBucket::adjustCheckpointFlushTimeout(size_t wall_time) {
2357     size_t middle = (MIN_CHK_FLUSH_TIMEOUT + MAX_CHK_FLUSH_TIMEOUT) / 2;
2358
2359     if (wall_time <= MIN_CHK_FLUSH_TIMEOUT) {
2360         chkFlushTimeout = MIN_CHK_FLUSH_TIMEOUT;
2361     } else if (wall_time <= middle) {
2362         chkFlushTimeout = middle;
2363     } else {
2364         chkFlushTimeout = MAX_CHK_FLUSH_TIMEOUT;
2365     }
2366 }
2367
2368 size_t VBucket::getCheckpointFlushTimeout() {
2369     return chkFlushTimeout;
2370 }
2371
2372 std::unique_ptr<Item> VBucket::pruneXattrDocument(
2373         StoredValue& v, const ItemMetaData& itemMeta) {
2374     // Need to take a copy of the value, prune it, and add it back
2375
2376     // Create work-space document
2377     std::vector<uint8_t> workspace(v.getValue()->vlength());
2378     std::copy_n(v.getValue()->getData(),
2379                 v.getValue()->vlength(),
2380                 workspace.begin());
2381
2382     // Now attach to the XATTRs in the document
2383     auto sz = cb::xattr::get_body_offset(
2384             {reinterpret_cast<char*>(workspace.data()), workspace.size()});
2385
2386     cb::xattr::Blob xattr({workspace.data(), sz});
2387     xattr.prune_user_keys();
2388
2389     auto prunedXattrs = xattr.finalize();
2390
2391     if (prunedXattrs.size()) {
2392         // Something remains - Create a Blob and copy-in just the XATTRs
2393         auto newValue =
2394                 Blob::New(reinterpret_cast<const char*>(prunedXattrs.data()),
2395                           prunedXattrs.size(),
2396                           const_cast<uint8_t*>(reinterpret_cast<const uint8_t*>(
2397                                   v.getValue()->getExtMeta())),
2398                           v.getValue()->getExtLen());
2399
2400         return std::make_unique<Item>(v.getKey(),
2401                                       itemMeta.flags,
2402                                       itemMeta.exptime,
2403                                       newValue,
2404                                       itemMeta.cas,
2405                                       v.getBySeqno(),
2406                                       getId(),
2407                                       itemMeta.revSeqno);
2408     } else {
2409         return {};
2410     }
2411 }