MB-24055: Reduce HashTable::defaultNumBuckets from 1531 to 3
[ep-engine.git] / src / vbucket.cc
1 /* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2 /*
3  *     Copyright 2015 Couchbase, Inc
4  *
5  *   Licensed under the Apache License, Version 2.0 (the "License");
6  *   you may not use this file except in compliance with the License.
7  *   You may obtain a copy of the License at
8  *
9  *       http://www.apache.org/licenses/LICENSE-2.0
10  *
11  *   Unless required by applicable law or agreed to in writing, software
12  *   distributed under the License is distributed on an "AS IS" BASIS,
13  *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  *   See the License for the specific language governing permissions and
15  *   limitations under the License.
16  */
17
18 #include "config.h"
19
20 #include <functional>
21 #include <list>
22 #include <set>
23 #include <string>
24 #include <vector>
25
26 #include "atomic.h"
27 #include "bgfetcher.h"
28 #include "conflict_resolution.h"
29 #include "ep_engine.h"
30 #include "ep_types.h"
31 #include "failover-table.h"
32 #include "flusher.h"
33 #include "pre_link_document_context.h"
34
35 #define STATWRITER_NAMESPACE vbucket
36 #include "statwriter.h"
37 #undef STATWRITER_NAMESPACE
38 #include "stored_value_factories.h"
39
40 #include "vbucket.h"
41
42 #include <xattr/blob.h>
43 #include <xattr/utils.h>
44
45 /* Macros */
46 const size_t MIN_CHK_FLUSH_TIMEOUT = 10; // 10 sec.
47 const size_t MAX_CHK_FLUSH_TIMEOUT = 30; // 30 sec.
48
49 /* Statics definitions */
50 std::atomic<size_t> VBucket::chkFlushTimeout(MIN_CHK_FLUSH_TIMEOUT);
51
52 VBucketFilter VBucketFilter::filter_diff(const VBucketFilter &other) const {
53     std::vector<uint16_t> tmp(acceptable.size() + other.size());
54     std::vector<uint16_t>::iterator end;
55     end = std::set_symmetric_difference(acceptable.begin(),
56                                         acceptable.end(),
57                                         other.acceptable.begin(),
58                                         other.acceptable.end(),
59                                         tmp.begin());
60     return VBucketFilter(std::vector<uint16_t>(tmp.begin(), end));
61 }
62
63 VBucketFilter VBucketFilter::filter_intersection(const VBucketFilter &other)
64                                                                         const {
65     std::vector<uint16_t> tmp(acceptable.size() + other.size());
66     std::vector<uint16_t>::iterator end;
67
68     end = std::set_intersection(acceptable.begin(), acceptable.end(),
69                                 other.acceptable.begin(),
70                                 other.acceptable.end(),
71                                 tmp.begin());
72     return VBucketFilter(std::vector<uint16_t>(tmp.begin(), end));
73 }
74
/**
 * Detect a run of consecutive values starting at `it`.
 *
 * `it` must be dereferenceable (not equal to `end`).
 *
 * @param it     first element of the candidate run
 * @param end    end of the set being scanned
 * @param length [out] number of steps from `it` to the last element of the
 *               run (run size minus one)
 * @return true when the run holds at least three consecutive values
 */
static bool isRange(std::set<uint16_t>::const_iterator it,
                    const std::set<uint16_t>::const_iterator &end,
                    size_t &length)
{
    const uint16_t first = *it;
    size_t runSize = 0;
    while (it != end && first + runSize == *it) {
        ++runSize;
        ++it;
    }

    length = runSize - 1;
    return length > 1;
}
90
91 std::ostream& operator <<(std::ostream &out, const VBucketFilter &filter)
92 {
93     std::set<uint16_t>::const_iterator it;
94
95     if (filter.acceptable.empty()) {
96         out << "{ empty }";
97     } else {
98         bool needcomma = false;
99         out << "{ ";
100         for (it = filter.acceptable.begin();
101              it != filter.acceptable.end();
102              ++it) {
103             if (needcomma) {
104                 out << ", ";
105             }
106
107             size_t length;
108             if (isRange(it, filter.acceptable.end(), length)) {
109                 std::set<uint16_t>::iterator last = it;
110                 for (size_t i = 0; i < length; ++i) {
111                     ++last;
112                 }
113                 out << "[" << *it << "," << *last << "]";
114                 it = last;
115             } else {
116                 out << *it;
117             }
118             needcomma = true;
119         }
120         out << " }";
121     }
122
123     return out;
124 }
125
126 const vbucket_state_t VBucket::ACTIVE =
127                      static_cast<vbucket_state_t>(htonl(vbucket_state_active));
128 const vbucket_state_t VBucket::REPLICA =
129                     static_cast<vbucket_state_t>(htonl(vbucket_state_replica));
130 const vbucket_state_t VBucket::PENDING =
131                     static_cast<vbucket_state_t>(htonl(vbucket_state_pending));
132 const vbucket_state_t VBucket::DEAD =
133                     static_cast<vbucket_state_t>(htonl(vbucket_state_dead));
134
135 VBucket::VBucket(id_type i,
136                  vbucket_state_t newState,
137                  EPStats& st,
138                  CheckpointConfig& chkConfig,
139                  int64_t lastSeqno,
140                  uint64_t lastSnapStart,
141                  uint64_t lastSnapEnd,
142                  std::unique_ptr<FailoverTable> table,
143                  std::shared_ptr<Callback<id_type> > flusherCb,
144                  std::unique_ptr<AbstractStoredValueFactory> valFact,
145                  NewSeqnoCallback newSeqnoCb,
146                  Configuration& config,
147                  item_eviction_policy_t evictionPolicy,
148                  vbucket_state_t initState,
149                  uint64_t purgeSeqno,
150                  uint64_t maxCas,
151                  const std::string& collectionsManifest)
152     : ht(st, std::move(valFact)),
153       checkpointManager(st,
154                         i,
155                         chkConfig,
156                         lastSeqno,
157                         lastSnapStart,
158                         lastSnapEnd,
159                         flusherCb),
160       failovers(std::move(table)),
161       opsCreate(0),
162       opsUpdate(0),
163       opsDelete(0),
164       opsReject(0),
165       dirtyQueueSize(0),
166       dirtyQueueMem(0),
167       dirtyQueueFill(0),
168       dirtyQueueDrain(0),
169       dirtyQueueAge(0),
170       dirtyQueuePendingWrites(0),
171       metaDataDisk(0),
172       numExpiredItems(0),
173       eviction(evictionPolicy),
174       stats(st),
175       persistenceSeqno(0),
176       numHpVBReqs(0),
177       id(i),
178       state(newState),
179       initialState(initState),
180       purge_seqno(purgeSeqno),
181       takeover_backed_up(false),
182       persisted_snapshot_start(lastSnapStart),
183       persisted_snapshot_end(lastSnapEnd),
184       rollbackItemCount(0),
185       hlc(maxCas,
186           std::chrono::microseconds(config.getHlcDriftAheadThresholdUs()),
187           std::chrono::microseconds(config.getHlcDriftBehindThresholdUs())),
188       statPrefix("vb_" + std::to_string(i)),
189       persistenceCheckpointId(0),
190       bucketCreation(false),
191       bucketDeletion(false),
192       newSeqnoCb(std::move(newSeqnoCb)),
193       manifest(collectionsManifest) {
194     if (config.getConflictResolutionType().compare("lww") == 0) {
195         conflictResolver.reset(new LastWriteWinsResolution());
196     } else {
197         conflictResolver.reset(new RevisionSeqnoResolution());
198     }
199
200     backfill.isBackfillPhase = false;
201     pendingOpsStart = 0;
202     stats.memOverhead->fetch_add(sizeof(VBucket)
203                                 + ht.memorySize() + sizeof(CheckpointManager));
204     LOG(EXTENSION_LOG_NOTICE,
205         "VBucket: created vbucket:%" PRIu16 " with state:%s "
206                 "initialState:%s "
207                 "lastSeqno:%" PRIu64 " "
208                 "lastSnapshot:{%" PRIu64 ",%" PRIu64 "} "
209                 "persisted_snapshot:{%" PRIu64 ",%" PRIu64 "} "
210                 "max_cas:%" PRIu64,
211         id, VBucket::toString(state), VBucket::toString(initialState),
212         lastSeqno, lastSnapStart, lastSnapEnd,
213         persisted_snapshot_start, persisted_snapshot_end,
214         getMaxCas());
215 }
216
217 VBucket::~VBucket() {
218     if (!pendingOps.empty()) {
219         LOG(EXTENSION_LOG_WARNING,
220             "~Vbucket(): vbucket:%" PRIu16 " has %ld pending ops",
221             id,
222             pendingOps.size());
223     }
224
225     stats.diskQueueSize.fetch_sub(dirtyQueueSize.load());
226     stats.vbBackfillQueueSize.fetch_sub(getBackfillSize());
227
228     // Clear out the bloomfilter(s)
229     clearFilter();
230
231     stats.memOverhead->fetch_sub(sizeof(VBucket) + ht.memorySize() +
232                                 sizeof(CheckpointManager));
233
234     LOG(EXTENSION_LOG_INFO, "Destroying vbucket %d\n", id);
235 }
236
237 void VBucket::fireAllOps(EventuallyPersistentEngine &engine,
238                          ENGINE_ERROR_CODE code) {
239     std::unique_lock<std::mutex> lh(pendingOpLock);
240
241     if (pendingOpsStart > 0) {
242         hrtime_t now = gethrtime();
243         if (now > pendingOpsStart) {
244             hrtime_t d = (now - pendingOpsStart) / 1000;
245             stats.pendingOpsHisto.add(d);
246             atomic_setIfBigger(stats.pendingOpsMaxDuration, d);
247         }
248     } else {
249         return;
250     }
251
252     pendingOpsStart = 0;
253     stats.pendingOps.fetch_sub(pendingOps.size());
254     atomic_setIfBigger(stats.pendingOpsMax, pendingOps.size());
255
256     while (!pendingOps.empty()) {
257         const void *pendingOperation = pendingOps.back();
258         pendingOps.pop_back();
259         // We don't want to hold the pendingOpLock when
260         // calling notifyIOComplete.
261         lh.unlock();
262         engine.notifyIOComplete(pendingOperation, code);
263         lh.lock();
264     }
265
266     LOG(EXTENSION_LOG_INFO,
267         "Fired pendings ops for vbucket %" PRIu16 " in state %s\n",
268         id, VBucket::toString(state));
269 }
270
271 void VBucket::fireAllOps(EventuallyPersistentEngine &engine) {
272
273     if (state == vbucket_state_active) {
274         fireAllOps(engine, ENGINE_SUCCESS);
275     } else if (state == vbucket_state_pending) {
276         // Nothing
277     } else {
278         fireAllOps(engine, ENGINE_NOT_MY_VBUCKET);
279     }
280 }
281
282 void VBucket::setState(vbucket_state_t to) {
283     vbucket_state_t oldstate;
284     {
285         WriterLockHolder wlh(stateLock);
286         oldstate = state;
287
288         if (to == vbucket_state_active &&
289             checkpointManager.getOpenCheckpointId() < 2) {
290             checkpointManager.setOpenCheckpointId(2);
291         }
292
293         LOG(EXTENSION_LOG_NOTICE,
294             "VBucket::setState: transitioning vbucket:%" PRIu16 " from:%s to:%s",
295             id, VBucket::toString(oldstate), VBucket::toString(to));
296
297         state = to;
298     }
299 }
300
301 vbucket_state VBucket::getVBucketState() const {
302      auto persisted_range = getPersistedSnapshot();
303
304      return vbucket_state{getState(),
305                           getPersistenceCheckpointId(), 0, getHighSeqno(),
306                           getPurgeSeqno(),
307                           persisted_range.start, persisted_range.end,
308                           getMaxCas(), failovers->toJSON()};
309 }
310
311
312
313 void VBucket::doStatsForQueueing(const Item& qi, size_t itemBytes)
314 {
315     ++dirtyQueueSize;
316     dirtyQueueMem.fetch_add(sizeof(Item));
317     ++dirtyQueueFill;
318     dirtyQueueAge.fetch_add(qi.getQueuedTime());
319     dirtyQueuePendingWrites.fetch_add(itemBytes);
320 }
321
322 void VBucket::doStatsForFlushing(const Item& qi, size_t itemBytes) {
323     --dirtyQueueSize;
324     decrDirtyQueueMem(sizeof(Item));
325     ++dirtyQueueDrain;
326     decrDirtyQueueAge(qi.getQueuedTime());
327     decrDirtyQueuePendingWrites(itemBytes);
328 }
329
330 void VBucket::incrMetaDataDisk(const Item& qi) {
331     metaDataDisk.fetch_add(qi.getKey().size() + sizeof(ItemMetaData));
332 }
333
334 void VBucket::decrMetaDataDisk(const Item& qi) {
335     // assume couchstore remove approx this much data from disk
336     metaDataDisk.fetch_sub((qi.getKey().size() + sizeof(ItemMetaData)));
337 }
338
339 void VBucket::resetStats() {
340     opsCreate.store(0);
341     opsUpdate.store(0);
342     opsDelete.store(0);
343     opsReject.store(0);
344
345     stats.diskQueueSize.fetch_sub(dirtyQueueSize.exchange(0));
346     dirtyQueueMem.store(0);
347     dirtyQueueFill.store(0);
348     dirtyQueueAge.store(0);
349     dirtyQueuePendingWrites.store(0);
350     dirtyQueueDrain.store(0);
351
352     hlc.resetStats();
353 }
354
355 template <typename T>
356 void VBucket::addStat(const char *nm, const T &val, ADD_STAT add_stat,
357                       const void *c) {
358     std::string stat = statPrefix;
359     if (nm != NULL) {
360         add_prefixed_stat(statPrefix, nm, val, add_stat, c);
361     } else {
362         add_casted_stat(statPrefix.data(), val, add_stat, c);
363     }
364 }
365
366 void VBucket::handlePreExpiry(StoredValue& v) {
367     value_t value = v.getValue();
368     if (value) {
369         std::unique_ptr<Item> itm(v.toItem(false, id));
370         item_info itm_info;
371         EventuallyPersistentEngine* engine = ObjectRegistry::getCurrentEngine();
372         itm_info = itm->toItemInfo(failovers->getLatestUUID());
373         value_t new_val(Blob::Copy(*value));
374         itm->setValue(new_val);
375
376         SERVER_HANDLE_V1* sapi = engine->getServerApi();
377         /* TODO: In order to minimize allocations, the callback needs to
378          * allocate an item whose value size will be exactly the size of the
379          * value after pre-expiry is performed.
380          */
381         if (sapi->document->pre_expiry(itm_info)) {
382             char* extMeta = const_cast<char *>(v.getValue()->getExtMeta());
383             Item new_item(v.getKey(), v.getFlags(), v.getExptime(),
384                           itm_info.value[0].iov_base, itm_info.value[0].iov_len,
385                           reinterpret_cast<uint8_t*>(extMeta),
386                           v.getValue()->getExtLen(), v.getCas(),
387                           v.getBySeqno(), id, v.getRevSeqno(),
388                           v.getNRUValue());
389
390             new_item.setDeleted();
391             v.setValue(new_item, ht);
392         }
393     }
394 }
395
396 size_t VBucket::getNumNonResidentItems() const {
397     if (eviction == VALUE_ONLY) {
398         return ht.getNumInMemoryNonResItems();
399     } else {
400         size_t num_items = ht.getNumItems();
401         size_t num_res_items = ht.getNumInMemoryItems() -
402                                ht.getNumInMemoryNonResItems();
403         return num_items > num_res_items ? (num_items - num_res_items) : 0;
404     }
405 }
406
407
408 uint64_t VBucket::getPersistenceCheckpointId() const {
409     return persistenceCheckpointId.load();
410 }
411
412 void VBucket::setPersistenceCheckpointId(uint64_t checkpointId) {
413     persistenceCheckpointId.store(checkpointId);
414 }
415
416 void VBucket::markDirty(const DocKey& key) {
417     auto hbl = ht.getLockedBucket(key);
418     StoredValue* v = ht.unlocked_find(
419             key, hbl.getBucketNum(), WantsDeleted::Yes, TrackReference::Yes);
420     if (v) {
421         v->markDirty();
422     } else {
423         LOG(EXTENSION_LOG_WARNING, "markDirty: Error marking dirty, a key "
424             "is missing from vb:%" PRIu16, id);
425     }
426 }
427
428 bool VBucket::isResidentRatioUnderThreshold(float threshold) {
429     if (eviction != FULL_EVICTION) {
430         throw std::invalid_argument("VBucket::isResidentRatioUnderThreshold: "
431                 "policy (which is " + std::to_string(eviction) +
432                 ") must be FULL_EVICTION");
433     }
434     size_t num_items = getNumItems();
435     size_t num_non_resident_items = getNumNonResidentItems();
436     if (threshold >= ((float)(num_items - num_non_resident_items) /
437                                                                 num_items)) {
438         return true;
439     } else {
440         return false;
441     }
442 }
443
444 void VBucket::createFilter(size_t key_count, double probability) {
445     // Create the actual bloom filter upon vbucket creation during
446     // scenarios:
447     //      - Bucket creation
448     //      - Rebalance
449     LockHolder lh(bfMutex);
450     if (bFilter == nullptr && tempFilter == nullptr) {
451         bFilter = std::make_unique<BloomFilter>(key_count, probability,
452                                         BFILTER_ENABLED);
453     } else {
454         LOG(EXTENSION_LOG_WARNING, "(vb %" PRIu16 ") Bloom filter / Temp filter"
455             " already exist!", id);
456     }
457 }
458
459 void VBucket::initTempFilter(size_t key_count, double probability) {
460     // Create a temp bloom filter with status as COMPACTING,
461     // if the main filter is found to exist, set its state to
462     // COMPACTING as well.
463     LockHolder lh(bfMutex);
464     tempFilter = std::make_unique<BloomFilter>(key_count, probability,
465                                      BFILTER_COMPACTING);
466     if (bFilter) {
467         bFilter->setStatus(BFILTER_COMPACTING);
468     }
469 }
470
471 void VBucket::addToFilter(const DocKey& key) {
472     LockHolder lh(bfMutex);
473     if (bFilter) {
474         bFilter->addKey(key);
475     }
476
477     // If the temp bloom filter is not found to be NULL,
478     // it means that compaction is running on the particular
479     // vbucket. Therefore add the key to the temp filter as
480     // well, as once compaction completes the temp filter
481     // will replace the main bloom filter.
482     if (tempFilter) {
483         tempFilter->addKey(key);
484     }
485 }
486
487 bool VBucket::maybeKeyExistsInFilter(const DocKey& key) {
488     LockHolder lh(bfMutex);
489     if (bFilter) {
490         return bFilter->maybeKeyExists(key);
491     } else {
492         // If filter doesn't exist, allow the BgFetch to go through.
493         return true;
494     }
495 }
496
497 bool VBucket::isTempFilterAvailable() {
498     LockHolder lh(bfMutex);
499     if (tempFilter &&
500         (tempFilter->getStatus() == BFILTER_COMPACTING ||
501          tempFilter->getStatus() == BFILTER_ENABLED)) {
502         return true;
503     } else {
504         return false;
505     }
506 }
507
508 void VBucket::addToTempFilter(const DocKey& key) {
509     // Keys will be added to only the temp filter during
510     // compaction.
511     LockHolder lh(bfMutex);
512     if (tempFilter) {
513         tempFilter->addKey(key);
514     }
515 }
516
517 void VBucket::swapFilter() {
518     // Delete the main bloom filter and replace it with
519     // the temp filter that was populated during compaction,
520     // only if the temp filter's state is found to be either at
521     // COMPACTING or ENABLED (if in the case the user enables
522     // bloomfilters for some reason while compaction was running).
523     // Otherwise, it indicates that the filter's state was
524     // possibly disabled during compaction, therefore clear out
525     // the temp filter. If it gets enabled at some point, a new
526     // bloom filter will be made available after the next
527     // compaction.
528
529     LockHolder lh(bfMutex);
530     if (tempFilter) {
531         bFilter.reset();
532
533         if (tempFilter->getStatus() == BFILTER_COMPACTING ||
534              tempFilter->getStatus() == BFILTER_ENABLED) {
535             bFilter = std::move(tempFilter);
536             bFilter->setStatus(BFILTER_ENABLED);
537         }
538         tempFilter.reset();
539     }
540 }
541
542 void VBucket::clearFilter() {
543     LockHolder lh(bfMutex);
544     bFilter.reset();
545     tempFilter.reset();
546 }
547
548 void VBucket::setFilterStatus(bfilter_status_t to) {
549     LockHolder lh(bfMutex);
550     if (bFilter) {
551         bFilter->setStatus(to);
552     }
553     if (tempFilter) {
554         tempFilter->setStatus(to);
555     }
556 }
557
558 std::string VBucket::getFilterStatusString() {
559     LockHolder lh(bfMutex);
560     if (bFilter) {
561         return bFilter->getStatusString();
562     } else if (tempFilter) {
563         return tempFilter->getStatusString();
564     } else {
565         return "DOESN'T EXIST";
566     }
567 }
568
569 size_t VBucket::getFilterSize() {
570     LockHolder lh(bfMutex);
571     if (bFilter) {
572         return bFilter->getFilterSize();
573     } else {
574         return 0;
575     }
576 }
577
578 size_t VBucket::getNumOfKeysInFilter() {
579     LockHolder lh(bfMutex);
580     if (bFilter) {
581         return bFilter->getNumOfKeysInFilter();
582     } else {
583         return 0;
584     }
585 }
586
587 VBNotifyCtx VBucket::queueDirty(
588         StoredValue& v,
589         const GenerateBySeqno generateBySeqno,
590         const GenerateCas generateCas,
591         const bool isBackfillItem,
592         PreLinkDocumentContext* preLinkDocumentContext) {
593     VBNotifyCtx notifyCtx;
594
595     queued_item qi(v.toItem(false, getId()));
596
597     if (isBackfillItem) {
598         queueBackfillItem(qi, generateBySeqno);
599         notifyCtx.notifyFlusher = true;
600         /* During backfill on a TAP receiver we need to update the snapshot
601          range in the checkpoint. Has to be done here because in case of TAP
602          backfill, above, we use vb.queueBackfillItem() instead of
603          vb.checkpointManager.queueDirty() */
604         if (generateBySeqno == GenerateBySeqno::Yes) {
605             checkpointManager.resetSnapshotRange();
606         }
607     } else {
608         notifyCtx.notifyFlusher =
609                 checkpointManager.queueDirty(*this,
610                                              qi,
611                                              generateBySeqno,
612                                              generateCas,
613                                              preLinkDocumentContext);
614         notifyCtx.notifyReplication = true;
615         if (GenerateCas::Yes == generateCas) {
616             v.setCas(qi->getCas());
617         }
618     }
619
620     v.setBySeqno(qi->getBySeqno());
621     notifyCtx.bySeqno = qi->getBySeqno();
622
623     return notifyCtx;
624 }
625
626 StoredValue* VBucket::fetchValidValue(HashTable::HashBucketLock& hbl,
627                                       const DocKey& key,
628                                       WantsDeleted wantsDeleted,
629                                       TrackReference trackReference,
630                                       QueueExpired queueExpired) {
631     if (!hbl.getHTLock()) {
632         throw std::logic_error(
633                 "Hash bucket lock not held in "
634                 "VBucket::fetchValidValue() for hash bucket: " +
635                 std::to_string(hbl.getBucketNum()) + "for key: " +
636                 std::string(reinterpret_cast<const char*>(key.data()),
637                             key.size()));
638     }
639     StoredValue* v = ht.unlocked_find(
640             key, hbl.getBucketNum(), wantsDeleted, trackReference);
641     if (v && !v->isDeleted() && !v->isTempItem()) {
642         // In the deleted case, we ignore expiration time.
643         if (v->isExpired(ep_real_time())) {
644             if (getState() != vbucket_state_active) {
645                 return wantsDeleted == WantsDeleted::Yes ? v : NULL;
646             }
647
648             // queueDirty only allowed on active VB
649             if (queueExpired == QueueExpired::Yes &&
650                 getState() == vbucket_state_active) {
651                 incExpirationStat(ExpireBy::Access);
652                 handlePreExpiry(*v);
653                 VBNotifyCtx notifyCtx;
654                 std::tie(std::ignore, v, notifyCtx) =
655                         processExpiredItem(hbl, *v);
656                 notifyNewSeqno(notifyCtx);
657             }
658             return wantsDeleted == WantsDeleted::Yes ? v : NULL;
659         }
660     }
661     return v;
662 }
663
664 void VBucket::incExpirationStat(const ExpireBy source) {
665     switch (source) {
666     case ExpireBy::Pager:
667         ++stats.expired_pager;
668         break;
669     case ExpireBy::Compactor:
670         ++stats.expired_compactor;
671         break;
672     case ExpireBy::Access:
673         ++stats.expired_access;
674         break;
675     }
676     ++numExpiredItems;
677 }
678
679 MutationStatus VBucket::setFromInternal(Item& itm) {
680     return ht.set(itm);
681 }
682
683 ENGINE_ERROR_CODE VBucket::set(Item& itm,
684                                const void* cookie,
685                                EventuallyPersistentEngine& engine,
686                                const int bgFetchDelay) {
687     bool cas_op = (itm.getCas() != 0);
688     auto hbl = ht.getLockedBucket(itm.getKey());
689     StoredValue* v = ht.unlocked_find(itm.getKey(),
690                                       hbl.getBucketNum(),
691                                       WantsDeleted::Yes,
692                                       TrackReference::No);
693     if (v && v->isLocked(ep_current_time()) &&
694         (getState() == vbucket_state_replica ||
695          getState() == vbucket_state_pending)) {
696         v->unlock();
697     }
698
699     bool maybeKeyExists = true;
700     // If we didn't find a valid item, check Bloomfilter's prediction if in
701     // full eviction policy and for a CAS operation.
702     if ((v == nullptr || v->isTempInitialItem()) &&
703         (eviction == FULL_EVICTION) && (itm.getCas() != 0)) {
704         // Check Bloomfilter's prediction
705         if (!maybeKeyExistsInFilter(itm.getKey())) {
706             maybeKeyExists = false;
707         }
708     }
709
710     PreLinkDocumentContext preLinkDocumentContext(engine, cookie, &itm);
711     VBQueueItemCtx queueItmCtx(GenerateBySeqno::Yes,
712                                GenerateCas::Yes,
713                                TrackCasDrift::No,
714                                /*isBackfillItem*/ false,
715                                &preLinkDocumentContext);
716
717     MutationStatus status;
718     VBNotifyCtx notifyCtx;
719     std::tie(status, notifyCtx) = processSet(hbl,
720                                              v,
721                                              itm,
722                                              itm.getCas(),
723                                              /*allowExisting*/ true,
724                                              /*hashMetaData*/ false,
725                                              &queueItmCtx,
726                                              maybeKeyExists);
727
728     ENGINE_ERROR_CODE ret = ENGINE_SUCCESS;
729     switch (status) {
730     case MutationStatus::NoMem:
731         ret = ENGINE_ENOMEM;
732         break;
733     case MutationStatus::InvalidCas:
734         ret = ENGINE_KEY_EEXISTS;
735         break;
736     case MutationStatus::IsLocked:
737         ret = ENGINE_LOCKED;
738         break;
739     case MutationStatus::NotFound:
740         if (cas_op) {
741             ret = ENGINE_KEY_ENOENT;
742             break;
743         }
744     // FALLTHROUGH
745     case MutationStatus::WasDirty:
746     // Even if the item was dirty, push it into the vbucket's open
747     // checkpoint.
748     case MutationStatus::WasClean:
749         notifyNewSeqno(notifyCtx);
750
751         itm.setBySeqno(v->getBySeqno());
752         itm.setCas(v->getCas());
753         break;
754     case MutationStatus::NeedBgFetch: { // CAS operation with non-resident item
755         // +
756         // full eviction.
757         if (v) {
758             // temp item is already created. Simply schedule a bg fetch job
759             hbl.getHTLock().unlock();
760             bgFetch(itm.getKey(), cookie, engine, bgFetchDelay, true);
761             return ENGINE_EWOULDBLOCK;
762         }
763         ret = addTempItemAndBGFetch(
764                 hbl, itm.getKey(), cookie, engine, bgFetchDelay, true);
765         break;
766     }
767     }
768
769     return ret;
770 }
771
772 ENGINE_ERROR_CODE VBucket::replace(Item& itm,
773                                    const void* cookie,
774                                    EventuallyPersistentEngine& engine,
775                                    const int bgFetchDelay) {
776     auto hbl = ht.getLockedBucket(itm.getKey());
777     StoredValue* v = ht.unlocked_find(itm.getKey(),
778                                       hbl.getBucketNum(),
779                                       WantsDeleted::Yes,
780                                       TrackReference::No);
781     if (v) {
782         if (v->isDeleted() || v->isTempDeletedItem() ||
783             v->isTempNonExistentItem()) {
784             return ENGINE_KEY_ENOENT;
785         }
786
787         MutationStatus mtype;
788         VBNotifyCtx notifyCtx;
789         if (eviction == FULL_EVICTION && v->isTempInitialItem()) {
790             mtype = MutationStatus::NeedBgFetch;
791         } else {
792             PreLinkDocumentContext preLinkDocumentContext(engine, cookie, &itm);
793             VBQueueItemCtx queueItmCtx(GenerateBySeqno::Yes,
794                                        GenerateCas::Yes,
795                                        TrackCasDrift::No,
796                                        /*isBackfillItem*/ false,
797                                        &preLinkDocumentContext);
798             std::tie(mtype, notifyCtx) = processSet(hbl,
799                                                     v,
800                                                     itm,
801                                                     0,
802                                                     /*allowExisting*/ true,
803                                                     /*hasMetaData*/ false,
804                                                     &queueItmCtx);
805         }
806
807         ENGINE_ERROR_CODE ret = ENGINE_SUCCESS;
808         switch (mtype) {
809         case MutationStatus::NoMem:
810             ret = ENGINE_ENOMEM;
811             break;
812         case MutationStatus::IsLocked:
813             ret = ENGINE_LOCKED;
814             break;
815         case MutationStatus::InvalidCas:
816         case MutationStatus::NotFound:
817             ret = ENGINE_NOT_STORED;
818             break;
819         // FALLTHROUGH
820         case MutationStatus::WasDirty:
821         // Even if the item was dirty, push it into the vbucket's open
822         // checkpoint.
823         case MutationStatus::WasClean:
824             notifyNewSeqno(notifyCtx);
825
826             itm.setBySeqno(v->getBySeqno());
827             itm.setCas(v->getCas());
828             break;
829         case MutationStatus::NeedBgFetch: {
830             // temp item is already created. Simply schedule a bg fetch job
831             hbl.getHTLock().unlock();
832             bgFetch(itm.getKey(), cookie, engine, bgFetchDelay, true);
833             ret = ENGINE_EWOULDBLOCK;
834             break;
835         }
836         }
837
838         return ret;
839     } else {
840         if (eviction == VALUE_ONLY) {
841             return ENGINE_KEY_ENOENT;
842         }
843
844         if (maybeKeyExistsInFilter(itm.getKey())) {
845             return addTempItemAndBGFetch(
846                     hbl, itm.getKey(), cookie, engine, bgFetchDelay, false);
847         } else {
848             // As bloomfilter predicted that item surely doesn't exist
849             // on disk, return ENOENT for replace().
850             return ENGINE_KEY_ENOENT;
851         }
852     }
853 }
854
855 ENGINE_ERROR_CODE VBucket::addBackfillItem(Item& itm,
856                                            const GenerateBySeqno genBySeqno) {
857     auto hbl = ht.getLockedBucket(itm.getKey());
858     StoredValue* v = ht.unlocked_find(itm.getKey(),
859                                       hbl.getBucketNum(),
860                                       WantsDeleted::Yes,
861                                       TrackReference::No);
862
863     // Note that this function is only called on replica or pending vbuckets.
864     if (v && v->isLocked(ep_current_time())) {
865         v->unlock();
866     }
867
868     VBQueueItemCtx queueItmCtx(genBySeqno,
869                                GenerateCas::No,
870                                TrackCasDrift::No,
871                                /*isBackfillItem*/ true,
872                                nullptr /* No pre link should happen */);
873     MutationStatus status;
874     VBNotifyCtx notifyCtx;
875     std::tie(status, notifyCtx) = processSet(hbl,
876                                              v,
877                                              itm,
878                                              0,
879                                              /*allowExisting*/ true,
880                                              /*hasMetaData*/ true,
881                                              &queueItmCtx);
882
883     ENGINE_ERROR_CODE ret = ENGINE_SUCCESS;
884     switch (status) {
885     case MutationStatus::NoMem:
886         ret = ENGINE_ENOMEM;
887         break;
888     case MutationStatus::InvalidCas:
889     case MutationStatus::IsLocked:
890         ret = ENGINE_KEY_EEXISTS;
891         break;
892     case MutationStatus::WasDirty:
893     // FALLTHROUGH, to ensure the bySeqno for the hashTable item is
894     // set correctly, and also the sequence numbers are ordered correctly.
895     // (MB-14003)
896     case MutationStatus::NotFound:
897     // FALLTHROUGH
898     case MutationStatus::WasClean: {
899         setMaxCas(v->getCas());
900         // we unlock ht lock here because we want to avoid potential lock
901         // inversions arising from notifyNewSeqno() call
902         hbl.getHTLock().unlock();
903         notifyNewSeqno(notifyCtx);
904     } break;
905     case MutationStatus::NeedBgFetch:
906         throw std::logic_error(
907                 "VBucket::addBackfillItem: "
908                 "SET on a non-active vbucket should not require a "
909                 "bg_metadata_fetch.");
910     }
911
912     return ret;
913 }
914
915 ENGINE_ERROR_CODE VBucket::setWithMeta(Item& itm,
916                                        const uint64_t cas,
917                                        uint64_t* seqno,
918                                        const void* cookie,
919                                        EventuallyPersistentEngine& engine,
920                                        const int bgFetchDelay,
921                                        const bool force,
922                                        const bool allowExisting,
923                                        const GenerateBySeqno genBySeqno,
924                                        const GenerateCas genCas,
925                                        const bool isReplication) {
926     auto hbl = ht.getLockedBucket(itm.getKey());
927     StoredValue* v = ht.unlocked_find(itm.getKey(),
928                                       hbl.getBucketNum(),
929                                       WantsDeleted::Yes,
930                                       TrackReference::No);
931
932     bool maybeKeyExists = true;
933     if (!force) {
934         if (v) {
935             if (v->isTempInitialItem()) {
936                 bgFetch(itm.getKey(), cookie, engine, bgFetchDelay, true);
937                 return ENGINE_EWOULDBLOCK;
938             }
939
940             if (!(conflictResolver->resolve(*v,
941                                             itm.getMetaData(),
942                                             itm.getDataType(),
943                                             itm.isDeleted()))) {
944                 ++stats.numOpsSetMetaResolutionFailed;
945                 return ENGINE_KEY_EEXISTS;
946             }
947         } else {
948             if (maybeKeyExistsInFilter(itm.getKey())) {
949                 return addTempItemAndBGFetch(hbl,
950                                              itm.getKey(),
951                                              cookie,
952                                              engine,
953                                              bgFetchDelay,
954                                              true,
955                                              isReplication);
956             } else {
957                 maybeKeyExists = false;
958             }
959         }
960     } else {
961         if (eviction == FULL_EVICTION) {
962             // Check Bloomfilter's prediction
963             if (!maybeKeyExistsInFilter(itm.getKey())) {
964                 maybeKeyExists = false;
965             }
966         }
967     }
968
969     if (v && v->isLocked(ep_current_time()) &&
970         (getState() == vbucket_state_replica ||
971          getState() == vbucket_state_pending)) {
972         v->unlock();
973     }
974
975     VBQueueItemCtx queueItmCtx(genBySeqno,
976                                genCas,
977                                TrackCasDrift::Yes,
978                                /*isBackfillItem*/ false,
979                                nullptr /* No pre link step needed */);
980     MutationStatus status;
981     VBNotifyCtx notifyCtx;
982     std::tie(status, notifyCtx) = processSet(hbl,
983                                              v,
984                                              itm,
985                                              cas,
986                                              allowExisting,
987                                              true,
988                                              &queueItmCtx,
989                                              maybeKeyExists,
990                                              isReplication);
991
992     ENGINE_ERROR_CODE ret = ENGINE_SUCCESS;
993     switch (status) {
994     case MutationStatus::NoMem:
995         ret = ENGINE_ENOMEM;
996         break;
997     case MutationStatus::InvalidCas:
998         ret = ENGINE_KEY_EEXISTS;
999         break;
1000     case MutationStatus::IsLocked:
1001         ret = ENGINE_LOCKED;
1002         break;
1003     case MutationStatus::WasDirty:
1004     case MutationStatus::WasClean: {
1005         if (seqno) {
1006             *seqno = static_cast<uint64_t>(v->getBySeqno());
1007         }
1008         // we unlock ht lock here because we want to avoid potential lock
1009         // inversions arising from notifyNewSeqno() call
1010         hbl.getHTLock().unlock();
1011         notifyNewSeqno(notifyCtx);
1012     } break;
1013     case MutationStatus::NotFound:
1014         ret = ENGINE_KEY_ENOENT;
1015         break;
1016     case MutationStatus::NeedBgFetch: { // CAS operation with non-resident item
1017         // + full eviction.
1018         if (v) { // temp item is already created. Simply schedule a
1019             hbl.getHTLock().unlock(); // bg fetch job.
1020             bgFetch(itm.getKey(), cookie, engine, bgFetchDelay, true);
1021             return ENGINE_EWOULDBLOCK;
1022         }
1023         ret = addTempItemAndBGFetch(hbl,
1024                                     itm.getKey(),
1025                                     cookie,
1026                                     engine,
1027                                     bgFetchDelay,
1028                                     true,
1029                                     isReplication);
1030     }
1031     }
1032
1033     return ret;
1034 }
1035
/**
 * Soft-delete the in-memory value for @p key. If that value has already
 * expired, it is expired instead.
 *
 * A missing, deleted or temp value yields ENGINE_KEY_ENOENT under
 * VALUE_ONLY eviction. Under full eviction a missing value triggers a
 * background metadata fetch (ENGINE_EWOULDBLOCK) when the bloom filter says
 * the key may exist on disk. A temp-initial value triggers a plain
 * background fetch.
 *
 * @param key          key to delete.
 * @param cas          in: CAS forwarded to processSoftDelete(); out: CAS of
 *                     the value after the delete/expiry (set only on the
 *                     WasClean / WasDirty / NotFound paths).
 * @param cookie       opaque cookie passed through to background fetches.
 * @param engine       engine passed through to background fetches.
 * @param bgFetchDelay delay passed through to background fetches.
 * @param itemMeta     optional. Receives the pre-delete CAS. Also receives
 *                     revSeqno/flags/exptime after the delete or expiry.
 *                     Its cas is overwritten with the new CAS unless the
 *                     result was NotFound.
 * @param mutInfo      optional; receives seqno and vbucket UUID unless the
 *                     result was NotFound.
 * @return ENGINE_SUCCESS, ENGINE_KEY_ENOENT, ENGINE_KEY_EEXISTS (InvalidCas),
 *         ENGINE_LOCKED_TMPFAIL, ENGINE_ENOMEM or ENGINE_EWOULDBLOCK.
 * @throws std::logic_error if processSoftDelete() reports NeedBgFetch.
 */
ENGINE_ERROR_CODE VBucket::deleteItem(const DocKey& key,
                                      uint64_t& cas,
                                      const void* cookie,
                                      EventuallyPersistentEngine& engine,
                                      const int bgFetchDelay,
                                      ItemMetaData* itemMeta,
                                      mutation_descr_t* mutInfo) {
    auto hbl = ht.getLockedBucket(key);
    StoredValue* v = ht.unlocked_find(
            key, hbl.getBucketNum(), WantsDeleted::Yes, TrackReference::No);

    // No live value in memory: decide between ENOENT and a background fetch.
    if (!v || v->isDeleted() || v->isTempItem()) {
        if (eviction == VALUE_ONLY) {
            return ENGINE_KEY_ENOENT;
        } else { // Full eviction.
            if (!v) { // Item might be evicted from cache.
                if (maybeKeyExistsInFilter(key)) {
                    return addTempItemAndBGFetch(
                            hbl, key, cookie, engine, bgFetchDelay, true);
                } else {
                    // As bloomfilter predicted that item surely doesn't
                    // exist on disk, return ENOENT for deleteItem().
                    return ENGINE_KEY_ENOENT;
                }
            } else if (v->isTempInitialItem()) {
                // The HT lock is released before scheduling the fetch.
                hbl.getHTLock().unlock();
                bgFetch(key, cookie, engine, bgFetchDelay, true);
                return ENGINE_EWOULDBLOCK;
            } else { // Non-existent or deleted key.
                if (v->isTempNonExistentItem() || v->isTempDeletedItem()) {
                    // Delete a temp non-existent item to ensure that
                    // if a delete were issued over an item that doesn't
                    // exist, then we don't preserve a temp item.
                    deleteStoredValue(hbl, *v);
                }
                return ENGINE_KEY_ENOENT;
            }
        }
    }

    // On replica/pending vbuckets a lock on the value must not block the
    // delete.
    if (v->isLocked(ep_current_time()) &&
        (getState() == vbucket_state_replica ||
         getState() == vbucket_state_pending)) {
        v->unlock();
    }

    // Report the CAS the value had before it is deleted.
    if (itemMeta != nullptr) {
        itemMeta->cas = v->getCas();
    }

    MutationStatus delrv;
    VBNotifyCtx notifyCtx;
    if (v->isExpired(ep_real_time())) {
        std::tie(delrv, v, notifyCtx) = processExpiredItem(hbl, *v);
    } else {
        // A delete bumps the revision sequence number of the document.
        ItemMetaData metadata;
        metadata.revSeqno = v->getRevSeqno() + 1;
        std::tie(delrv, v, notifyCtx) =
                processSoftDelete(hbl,
                                  *v,
                                  cas,
                                  metadata,
                                  VBQueueItemCtx(GenerateBySeqno::Yes,
                                                 GenerateCas::Yes,
                                                 TrackCasDrift::No,
                                                 /*isBackfillItem*/ false,
                                                 nullptr /* no pre link */),
                                  /*use_meta*/ false,
                                  /*bySeqno*/ v->getBySeqno());
    }

    uint64_t seqno = 0;
    ENGINE_ERROR_CODE ret = ENGINE_SUCCESS;
    switch (delrv) {
    case MutationStatus::NoMem:
        ret = ENGINE_ENOMEM;
        break;
    case MutationStatus::InvalidCas:
        ret = ENGINE_KEY_EEXISTS;
        break;
    case MutationStatus::IsLocked:
        ret = ENGINE_LOCKED_TMPFAIL;
        break;
    case MutationStatus::NotFound:
        ret = ENGINE_KEY_ENOENT;
    /* Fallthrough:
     * A NotFound return value at this point indicates that the
     * item has expired. But, a deletion still needs to be queued
     * for the item in order to persist it.
     */
    case MutationStatus::WasClean:
    case MutationStatus::WasDirty:
        if (itemMeta != nullptr) {
            itemMeta->revSeqno = v->getRevSeqno();
            itemMeta->flags = v->getFlags();
            itemMeta->exptime = v->getExptime();
        }

        // NOTE(review): unlike setWithMeta()/deleteWithMeta(), the HT lock
        // is still held across notifyNewSeqno() here; v is read afterwards,
        // so any change must capture its fields before unlocking.
        notifyNewSeqno(notifyCtx);
        seqno = static_cast<uint64_t>(v->getBySeqno());
        cas = v->getCas();

        // mutInfo / the final CAS are only reported for a real delete, not
        // for the expiry (NotFound) case.
        if (delrv != MutationStatus::NotFound) {
            if (mutInfo) {
                mutInfo->seqno = seqno;
                mutInfo->vbucket_uuid = failovers->getLatestUUID();
            }
            if (itemMeta != nullptr) {
                itemMeta->cas = v->getCas();
            }
        }
        break;
    case MutationStatus::NeedBgFetch:
        // We already figured out if a bg fetch is requred for a full-evicted
        // item above.
        throw std::logic_error(
                "VBucket::deleteItem: "
                "Unexpected NEEDS_BG_FETCH from processSoftDelete");
    }
    return ret;
}
1157
1158 ENGINE_ERROR_CODE VBucket::deleteWithMeta(const DocKey& key,
1159                                           uint64_t& cas,
1160                                           uint64_t* seqno,
1161                                           const void* cookie,
1162                                           EventuallyPersistentEngine& engine,
1163                                           const int bgFetchDelay,
1164                                           const bool force,
1165                                           const ItemMetaData& itemMeta,
1166                                           const bool backfill,
1167                                           const GenerateBySeqno genBySeqno,
1168                                           const GenerateCas generateCas,
1169                                           const uint64_t bySeqno,
1170                                           const bool isReplication) {
1171     auto hbl = ht.getLockedBucket(key);
1172     StoredValue* v = ht.unlocked_find(
1173             key, hbl.getBucketNum(), WantsDeleted::Yes, TrackReference::No);
1174     if (!force) { // Need conflict resolution.
1175         if (v) {
1176             if (v->isTempInitialItem()) {
1177                 bgFetch(key, cookie, engine, bgFetchDelay, true);
1178                 return ENGINE_EWOULDBLOCK;
1179             }
1180
1181             if (!(conflictResolver->resolve(*v,
1182                                             itemMeta,
1183                                             PROTOCOL_BINARY_RAW_BYTES,
1184                                             true))) {
1185                 ++stats.numOpsDelMetaResolutionFailed;
1186                 return ENGINE_KEY_EEXISTS;
1187             }
1188         } else {
1189             // Item is 1) deleted or not existent in the value eviction case OR
1190             // 2) deleted or evicted in the full eviction.
1191             if (maybeKeyExistsInFilter(key)) {
1192                 return addTempItemAndBGFetch(hbl,
1193                                              key,
1194                                              cookie,
1195                                              engine,
1196                                              bgFetchDelay,
1197                                              true,
1198                                              isReplication);
1199             } else {
1200                 // Even though bloomfilter predicted that item doesn't exist
1201                 // on disk, we must put this delete on disk if the cas is valid.
1202                 AddStatus rv = addTempStoredValue(hbl, key, isReplication);
1203                 if (rv == AddStatus::NoMem) {
1204                     return ENGINE_ENOMEM;
1205                 }
1206                 v = ht.unlocked_find(key,
1207                                      hbl.getBucketNum(),
1208                                      WantsDeleted::Yes,
1209                                      TrackReference::No);
1210                 v->setDeleted();
1211             }
1212         }
1213     } else {
1214         if (!v) {
1215             // We should always try to persist a delete here.
1216             AddStatus rv = addTempStoredValue(hbl, key, isReplication);
1217             if (rv == AddStatus::NoMem) {
1218                 return ENGINE_ENOMEM;
1219             }
1220             v = ht.unlocked_find(key,
1221                                  hbl.getBucketNum(),
1222                                  WantsDeleted::Yes,
1223                                  TrackReference::No);
1224             v->setDeleted();
1225             v->setCas(cas);
1226         } else if (v->isTempInitialItem()) {
1227             v->setDeleted();
1228             v->setCas(cas);
1229         }
1230     }
1231
1232     if (v && v->isLocked(ep_current_time()) &&
1233         (getState() == vbucket_state_replica ||
1234          getState() == vbucket_state_pending)) {
1235         v->unlock();
1236     }
1237
1238     MutationStatus delrv;
1239     VBNotifyCtx notifyCtx;
1240     if (!v) {
1241         if (eviction == FULL_EVICTION) {
1242             delrv = MutationStatus::NeedBgFetch;
1243         } else {
1244             delrv = MutationStatus::NotFound;
1245         }
1246     } else {
1247         VBQueueItemCtx queueItmCtx(genBySeqno,
1248                                    generateCas,
1249                                    TrackCasDrift::Yes,
1250                                    backfill,
1251                                    nullptr /* No pre link step needed */);
1252
1253         // system xattrs must remain
1254         std::unique_ptr<Item> itm;
1255         if (mcbp::datatype::is_xattr(v->getDatatype()) &&
1256             (itm = pruneXattrDocument(*v, itemMeta))) {
1257             std::tie(v, delrv, notifyCtx) =
1258                     updateStoredValue(hbl, *v, *itm, &queueItmCtx);
1259         } else {
1260             std::tie(delrv, v, notifyCtx) = processSoftDelete(hbl,
1261                                                               *v,
1262                                                               cas,
1263                                                               itemMeta,
1264                                                               queueItmCtx,
1265                                                               /*use_meta*/ true,
1266                                                               bySeqno);
1267         }
1268     }
1269     cas = v ? v->getCas() : 0;
1270
1271     switch (delrv) {
1272     case MutationStatus::NoMem:
1273         return ENGINE_ENOMEM;
1274     case MutationStatus::InvalidCas:
1275         return ENGINE_KEY_EEXISTS;
1276     case MutationStatus::IsLocked:
1277         return ENGINE_LOCKED_TMPFAIL;
1278     case MutationStatus::NotFound:
1279         return ENGINE_KEY_ENOENT;
1280     case MutationStatus::WasDirty:
1281     case MutationStatus::WasClean: {
1282         if (seqno) {
1283             *seqno = static_cast<uint64_t>(v->getBySeqno());
1284         }
1285         // we unlock ht lock here because we want to avoid potential lock
1286         // inversions arising from notifyNewSeqno() call
1287         hbl.getHTLock().unlock();
1288         notifyNewSeqno(notifyCtx);
1289         break;
1290     }
1291     case MutationStatus::NeedBgFetch:
1292         hbl.getHTLock().unlock();
1293         bgFetch(key, cookie, engine, bgFetchDelay, true);
1294         return ENGINE_EWOULDBLOCK;
1295     }
1296     return ENGINE_SUCCESS;
1297 }
1298
1299 void VBucket::deleteExpiredItem(const DocKey& key,
1300                                 time_t startTime,
1301                                 uint64_t revSeqno,
1302                                 ExpireBy source) {
1303     auto hbl = ht.getLockedBucket(key);
1304     StoredValue* v = ht.unlocked_find(
1305             key, hbl.getBucketNum(), WantsDeleted::Yes, TrackReference::No);
1306     if (v) {
1307         if (v->isTempNonExistentItem() || v->isTempDeletedItem()) {
1308             // This is a temporary item whose background fetch for metadata
1309             // has completed.
1310             bool deleted = deleteStoredValue(hbl, *v);
1311             if (!deleted) {
1312                 throw std::logic_error(
1313                         "VBucket::deleteExpiredItem: "
1314                         "Failed to delete seqno:" +
1315                         std::to_string(v->getBySeqno()) + " from bucket " +
1316                         std::to_string(hbl.getBucketNum()));
1317             }
1318         } else if (v->isExpired(startTime) && !v->isDeleted()) {
1319             handlePreExpiry(*v);
1320             VBNotifyCtx notifyCtx;
1321             std::tie(std::ignore, std::ignore, notifyCtx) =
1322                     processExpiredItem(hbl, *v);
1323             // we unlock ht lock here because we want to avoid potential lock
1324             // inversions arising from notifyNewSeqno() call
1325             hbl.getHTLock().unlock();
1326             notifyNewSeqno(notifyCtx);
1327         }
1328     } else {
1329         if (eviction == FULL_EVICTION) {
1330             // Create a temp item and delete and push it
1331             // into the checkpoint queue, only if the bloomfilter
1332             // predicts that the item may exist on disk.
1333             if (maybeKeyExistsInFilter(key)) {
1334                 AddStatus rv = addTempStoredValue(hbl, key);
1335                 if (rv == AddStatus::NoMem) {
1336                     return;
1337                 }
1338                 v = ht.unlocked_find(key,
1339                                      hbl.getBucketNum(),
1340                                      WantsDeleted::Yes,
1341                                      TrackReference::No);
1342                 v->setDeleted();
1343                 v->setRevSeqno(revSeqno);
1344                 VBNotifyCtx notifyCtx;
1345                 std::tie(std::ignore, std::ignore, notifyCtx) =
1346                         processExpiredItem(hbl, *v);
1347                 // we unlock ht lock here because we want to avoid potential
1348                 // lock inversions arising from notifyNewSeqno() call
1349                 hbl.getHTLock().unlock();
1350                 notifyNewSeqno(notifyCtx);
1351             }
1352         }
1353     }
1354     incExpirationStat(source);
1355 }
1356
1357 ENGINE_ERROR_CODE VBucket::add(Item& itm,
1358                                const void* cookie,
1359                                EventuallyPersistentEngine& engine,
1360                                const int bgFetchDelay) {
1361     auto hbl = ht.getLockedBucket(itm.getKey());
1362     StoredValue* v = ht.unlocked_find(itm.getKey(),
1363                                       hbl.getBucketNum(),
1364                                       WantsDeleted::Yes,
1365                                       TrackReference::No);
1366
1367     bool maybeKeyExists = true;
1368     if ((v == nullptr || v->isTempInitialItem()) &&
1369         (eviction == FULL_EVICTION)) {
1370         // Check bloomfilter's prediction
1371         if (!maybeKeyExistsInFilter(itm.getKey())) {
1372             maybeKeyExists = false;
1373         }
1374     }
1375
1376     PreLinkDocumentContext preLinkDocumentContext(engine, cookie, &itm);
1377     VBQueueItemCtx queueItmCtx(GenerateBySeqno::Yes,
1378                                GenerateCas::Yes,
1379                                TrackCasDrift::No,
1380                                /*isBackfillItem*/ false,
1381                                &preLinkDocumentContext);
1382     AddStatus status;
1383     VBNotifyCtx notifyCtx;
1384     std::tie(status, notifyCtx) =
1385             processAdd(hbl, v, itm, maybeKeyExists, false, &queueItmCtx);
1386
1387     switch (status) {
1388     case AddStatus::NoMem:
1389         return ENGINE_ENOMEM;
1390     case AddStatus::Exists:
1391         return ENGINE_NOT_STORED;
1392     case AddStatus::AddTmpAndBgFetch:
1393         return addTempItemAndBGFetch(
1394                 hbl, itm.getKey(), cookie, engine, bgFetchDelay, true);
1395     case AddStatus::BgFetch:
1396         hbl.getHTLock().unlock();
1397         bgFetch(itm.getKey(), cookie, engine, bgFetchDelay, true);
1398         return ENGINE_EWOULDBLOCK;
1399     case AddStatus::Success:
1400     case AddStatus::UnDel:
1401         notifyNewSeqno(notifyCtx);
1402         itm.setBySeqno(v->getBySeqno());
1403         itm.setCas(v->getCas());
1404         break;
1405     }
1406     return ENGINE_SUCCESS;
1407 }
1408
1409 GetValue VBucket::getAndUpdateTtl(const DocKey& key,
1410                                   const void* cookie,
1411                                   EventuallyPersistentEngine& engine,
1412                                   int bgFetchDelay,
1413                                   time_t exptime) {
1414     auto hbl = ht.getLockedBucket(key);
1415     StoredValue* v = fetchValidValue(hbl,
1416                                      key,
1417                                      WantsDeleted::Yes,
1418                                      TrackReference::Yes,
1419                                      QueueExpired::Yes);
1420
1421     if (v) {
1422         if (v->isDeleted() || v->isTempDeletedItem() ||
1423             v->isTempNonExistentItem()) {
1424             return {};
1425         }
1426
1427         if (!v->isResident()) {
1428             bgFetch(key, cookie, engine, bgFetchDelay);
1429             return GetValue(nullptr, ENGINE_EWOULDBLOCK, v->getBySeqno());
1430         }
1431         if (v->isLocked(ep_current_time())) {
1432             return GetValue(nullptr, ENGINE_KEY_EEXISTS, 0);
1433         }
1434
1435         const bool exptime_mutated = exptime != v->getExptime();
1436         if (exptime_mutated) {
1437             v->markDirty();
1438             v->setExptime(exptime);
1439             v->setRevSeqno(v->getRevSeqno() + 1);
1440         }
1441
1442         GetValue rv(
1443                 v->toItem(v->isLocked(ep_current_time()), getId()).release(),
1444                 ENGINE_SUCCESS,
1445                 v->getBySeqno());
1446
1447         if (exptime_mutated) {
1448             VBNotifyCtx notifyCtx = queueDirty(*v);
1449             rv.getValue()->setCas(v->getCas());
1450             // we unlock ht lock here because we want to avoid potential lock
1451             // inversions arising from notifyNewSeqno() call
1452             hbl.getHTLock().unlock();
1453             notifyNewSeqno(notifyCtx);
1454         }
1455
1456         return rv;
1457     } else {
1458         if (eviction == VALUE_ONLY) {
1459             return {};
1460         } else {
1461             if (maybeKeyExistsInFilter(key)) {
1462                 ENGINE_ERROR_CODE ec = addTempItemAndBGFetch(
1463                         hbl, key, cookie, engine, bgFetchDelay, false);
1464                 return GetValue(NULL, ec, -1, true);
1465             } else {
1466                 // As bloomfilter predicted that item surely doesn't exist
1467                 // on disk, return ENOENT for getAndUpdateTtl().
1468                 return {};
1469             }
1470         }
1471     }
1472 }
1473
1474 MutationStatus VBucket::insertFromWarmup(Item& itm,
1475                                          bool eject,
1476                                          bool keyMetaDataOnly) {
1477     if (!StoredValue::hasAvailableSpace(stats, itm)) {
1478         return MutationStatus::NoMem;
1479     }
1480
1481     auto hbl = ht.getLockedBucket(itm.getKey());
1482     StoredValue* v = ht.unlocked_find(itm.getKey(),
1483                                       hbl.getBucketNum(),
1484                                       WantsDeleted::Yes,
1485                                       TrackReference::No);
1486
1487     if (v == NULL) {
1488         v = addNewStoredValue(hbl, itm, /*queueItmCtx*/ nullptr).first;
1489         if (keyMetaDataOnly) {
1490             v->markNotResident();
1491             /* For now ht stats are updated from outside ht. This seems to be
1492                a better option for now than passing a flag to
1493                addNewStoredValue() just for this func */
1494             ++(ht.numNonResidentItems);
1495         }
1496         /* For now ht stats are updated from outside ht. This seems to be
1497            a better option for now than passing a flag to
1498            addNewStoredValue() just for this func.
1499            We need to decrNumTotalItems because ht.numTotalItems is already
1500            set by warmup when it estimated the item count from disk */
1501         ht.decrNumTotalItems();
1502         v->setNewCacheItem(false);
1503     } else {
1504         if (keyMetaDataOnly) {
1505             // We don't have a better error code ;)
1506             return MutationStatus::InvalidCas;
1507         }
1508
1509         // Verify that the CAS isn't changed
1510         if (v->getCas() != itm.getCas()) {
1511             if (v->getCas() == 0) {
1512                 v->setCas(itm.getCas());
1513                 v->setFlags(itm.getFlags());
1514                 v->setExptime(itm.getExptime());
1515                 v->setRevSeqno(itm.getRevSeqno());
1516             } else {
1517                 return MutationStatus::InvalidCas;
1518             }
1519         }
1520         updateStoredValue(hbl, *v, itm, /*queueItmCtx*/ nullptr);
1521     }
1522
1523     v->markClean();
1524
1525     if (eject && !keyMetaDataOnly) {
1526         ht.unlocked_ejectItem(v, eviction);
1527     }
1528
1529     return MutationStatus::NotFound;
1530 }
1531
1532 GetValue VBucket::getInternal(const DocKey& key,
1533                               const void* cookie,
1534                               EventuallyPersistentEngine& engine,
1535                               int bgFetchDelay,
1536                               get_options_t options,
1537                               bool diskFlushAll) {
1538     const TrackReference trackReference = (options & TRACK_REFERENCE)
1539                                                   ? TrackReference::Yes
1540                                                   : TrackReference::No;
1541     const bool getDeletedValue = (options & GET_DELETED_VALUE);
1542     auto hbl = ht.getLockedBucket(key);
1543     StoredValue* v = fetchValidValue(
1544             hbl, key, WantsDeleted::Yes, trackReference, QueueExpired::Yes);
1545     if (v) {
1546         if (v->isDeleted() && !getDeletedValue) {
1547             return GetValue();
1548         }
1549         if (v->isTempDeletedItem() || v->isTempNonExistentItem()) {
1550             // Delete a temp non-existent item to ensure that
1551             // if the get were issued over an item that doesn't
1552             // exist, then we dont preserve a temp item.
1553             if (options & DELETE_TEMP) {
1554                 deleteStoredValue(hbl, *v);
1555             }
1556             return GetValue();
1557         }
1558
1559         // If the value is not resident, wait for it...
1560         if (!v->isResident()) {
1561             return getInternalNonResident(
1562                     key, cookie, engine, bgFetchDelay, options, *v);
1563         }
1564
1565         // Should we hide (return -1) for the items' CAS?
1566         const bool hide_cas =
1567                 (options & HIDE_LOCKED_CAS) && v->isLocked(ep_current_time());
1568         return GetValue(v->toItem(hide_cas, getId()).release(),
1569                         ENGINE_SUCCESS,
1570                         v->getBySeqno(),
1571                         false,
1572                         v->getNRUValue());
1573     } else {
1574         if (!getDeletedValue && (eviction == VALUE_ONLY || diskFlushAll)) {
1575             return GetValue();
1576         }
1577
1578         if (maybeKeyExistsInFilter(key)) {
1579             ENGINE_ERROR_CODE ec = ENGINE_EWOULDBLOCK;
1580             if (options &
1581                 QUEUE_BG_FETCH) { // Full eviction and need a bg fetch.
1582                 ec = addTempItemAndBGFetch(
1583                         hbl, key, cookie, engine, bgFetchDelay, false);
1584             }
1585             return GetValue(NULL, ec, -1, true);
1586         } else {
1587             // As bloomfilter predicted that item surely doesn't exist
1588             // on disk, return ENOENT, for getInternal().
1589             return GetValue();
1590         }
1591     }
1592 }
1593
/**
 * Look up the metadata of `key` (CAS, flags, expiry, revSeqno, datatype).
 *
 * Uses a raw hash-table lookup (ht.unlocked_find) rather than
 * fetchValidValue(), so an expired item is not expired here; it is instead
 * reported with GET_META_ITEM_DELETED_FLAG set in `deleted`.
 *
 * @param key          document key to look up
 * @param cookie       connection cookie, forwarded to any background fetch
 * @param engine       engine instance, forwarded to any background fetch
 * @param bgFetchDelay delay forwarded to bgFetch/addTempItemAndBGFetch
 * @param metadata     [out] fully populated on ENGINE_SUCCESS; for a temp
 *                     non-existent item only `cas` is written
 * @param deleted      [out] reset to 0, then GET_META_ITEM_DELETED_FLAG is
 *                     OR'd in for deleted / temp-deleted / expired items
 * @param datatype     [out] populated on ENGINE_SUCCESS
 * @return ENGINE_SUCCESS, ENGINE_KEY_ENOENT, ENGINE_EWOULDBLOCK (a metadata
 *         background fetch was scheduled), or the result of
 *         addTempItemAndBGFetch().
 */
ENGINE_ERROR_CODE VBucket::getMetaData(const DocKey& key,
                                       const void* cookie,
                                       EventuallyPersistentEngine& engine,
                                       int bgFetchDelay,
                                       ItemMetaData& metadata,
                                       uint32_t& deleted,
                                       uint8_t& datatype) {
    deleted = 0;
    auto hbl = ht.getLockedBucket(key);
    StoredValue* v = ht.unlocked_find(
            key, hbl.getBucketNum(), WantsDeleted::Yes, TrackReference::No);

    if (v) {
        stats.numOpsGetMeta++;
        if (v->isTempInitialItem()) {
            // Need bg meta fetch. The trailing `true` presumably selects a
            // metadata-only fetch (TODO confirm against bgFetch()).
            bgFetch(key, cookie, engine, bgFetchDelay, true);
            return ENGINE_EWOULDBLOCK;
        } else if (v->isTempNonExistentItem()) {
            metadata.cas = v->getCas();
            return ENGINE_KEY_ENOENT;
        } else {
            if (v->isTempDeletedItem() || v->isDeleted() ||
                v->isExpired(ep_real_time())) {
                deleted |= GET_META_ITEM_DELETED_FLAG;
            }

            // A locked item hides its real CAS from callers.
            if (v->isLocked(ep_current_time())) {
                metadata.cas = static_cast<uint64_t>(-1);
            } else {
                metadata.cas = v->getCas();
            }
            metadata.flags = v->getFlags();
            metadata.exptime = v->getExptime();
            metadata.revSeqno = v->getRevSeqno();
            datatype = v->getDatatype();

            return ENGINE_SUCCESS;
        }
    } else {
        // The key wasn't found. However, this may be because it was previously
        // deleted or evicted with the full eviction strategy.
        // So, add a temporary item corresponding to the key to the hash table
        // and schedule a background fetch for its metadata from the persistent
        // store. The item's state will be updated after the fetch completes.
        //
        // Schedule this bgFetch only if the key is predicted to be may-be
        // existent on disk by the bloomfilter.
        //
        // NOTE(review): numOpsGetMeta is not bumped on the bgFetch path here;
        // presumably it is counted when the fetch completes - confirm.

        if (maybeKeyExistsInFilter(key)) {
            return addTempItemAndBGFetch(
                    hbl, key, cookie, engine, bgFetchDelay, true);
        } else {
            stats.numOpsGetMeta++;
            return ENGINE_KEY_ENOENT;
        }
    }
}
1652
/**
 * Fill `kstats` with per-key statistics for `key`.
 *
 * @param key          document key to inspect
 * @param cookie       connection cookie, forwarded to any background fetch
 * @param engine       engine instance, forwarded to any background fetch
 * @param bgFetchDelay delay forwarded to bgFetch/addTempItemAndBGFetch
 * @param kstats       [out] populated only on ENGINE_SUCCESS
 * @param wantsDeleted if WantsDeleted::No, a deleted item reports ENOENT;
 *                     temp deleted / temp non-existent items always do
 * @return ENGINE_SUCCESS, ENGINE_KEY_ENOENT, ENGINE_EWOULDBLOCK (metadata
 *         fetch scheduled), or the result of addTempItemAndBGFetch().
 */
ENGINE_ERROR_CODE VBucket::getKeyStats(const DocKey& key,
                                       const void* cookie,
                                       EventuallyPersistentEngine& engine,
                                       int bgFetchDelay,
                                       struct key_stats& kstats,
                                       WantsDeleted wantsDeleted) {
    auto hbl = ht.getLockedBucket(key);
    StoredValue* v = fetchValidValue(hbl,
                                     key,
                                     WantsDeleted::Yes,
                                     TrackReference::Yes,
                                     QueueExpired::Yes);

    if (v) {
        if ((v->isDeleted() && wantsDeleted == WantsDeleted::No) ||
            v->isTempNonExistentItem() || v->isTempDeletedItem()) {
            return ENGINE_KEY_ENOENT;
        }
        if (eviction == FULL_EVICTION && v->isTempInitialItem()) {
            // Release the hash-bucket lock before scheduling the fetch;
            // `v` must not be touched after this point.
            hbl.getHTLock().unlock();
            bgFetch(key, cookie, engine, bgFetchDelay, true);
            return ENGINE_EWOULDBLOCK;
        }
        // NOTE(review): under VALUE_ONLY a temp-initial item (if one can
        // exist) falls through and reports its stats - confirm intended.
        kstats.logically_deleted = v->isDeleted();
        kstats.dirty = v->isDirty();
        kstats.exptime = v->getExptime();
        kstats.flags = v->getFlags();
        kstats.cas = v->getCas();
        kstats.vb_state = getState();
        return ENGINE_SUCCESS;
    } else {
        // Not in memory: with value-only eviction the key cannot be on disk
        // without also being in the hash table, so it does not exist.
        if (eviction == VALUE_ONLY) {
            return ENGINE_KEY_ENOENT;
        } else {
            if (maybeKeyExistsInFilter(key)) {
                return addTempItemAndBGFetch(
                        hbl, key, cookie, engine, bgFetchDelay, true);
            } else {
                // If bgFetch were false, or bloomfilter predicted that
                // item surely doesn't exist on disk, return ENOENT for
                // getKeyStats().
                return ENGINE_KEY_ENOENT;
            }
        }
    }
}
1699
/**
 * Fetch `key` and take the document lock on it (GETL).
 *
 * On success the StoredValue is locked until `currentTime + lockTimeout`
 * and both the returned Item and the StoredValue receive a freshly
 * generated HLC CAS.
 *
 * @param key          document key
 * @param currentTime  time used for the lock check and lock expiry
 * @param lockTimeout  how long (relative to currentTime) to hold the lock
 * @param cookie       connection cookie; a non-resident value is only
 *                     bgFetched when this is non-null
 * @param engine       engine instance, forwarded to background fetches
 * @param bgFetchDelay delay forwarded to background fetches
 * @return GetValue carrying the item on success; otherwise a null item with
 *         ENGINE_KEY_ENOENT, ENGINE_TMPFAIL (already locked), or
 *         ENGINE_EWOULDBLOCK / the result of addTempItemAndBGFetch().
 */
GetValue VBucket::getLocked(const DocKey& key,
                            rel_time_t currentTime,
                            uint32_t lockTimeout,
                            const void* cookie,
                            EventuallyPersistentEngine& engine,
                            int bgFetchDelay) {
    auto hbl = ht.getLockedBucket(key);
    StoredValue* v = fetchValidValue(hbl,
                                     key,
                                     WantsDeleted::Yes,
                                     TrackReference::Yes,
                                     QueueExpired::Yes);

    if (v) {
        if (v->isDeleted() || v->isTempNonExistentItem() ||
            v->isTempDeletedItem()) {
            return GetValue(NULL, ENGINE_KEY_ENOENT);
        }

        // if v is locked return error
        if (v->isLocked(currentTime)) {
            return GetValue(NULL, ENGINE_TMPFAIL);
        }

        // If the value is not resident, wait for it...
        // EWOULDBLOCK is returned even when no cookie was given and hence no
        // fetch was scheduled.
        if (!v->isResident()) {
            if (cookie) {
                bgFetch(key, cookie, engine, bgFetchDelay);
            }
            return GetValue(NULL, ENGINE_EWOULDBLOCK, -1, true);
        }

        // acquire the lock and assign a fresh HLC CAS to both the returned
        // item and the stored value
        v->lock(currentTime + lockTimeout);

        auto it = v->toItem(false, getId());
        it->setCas(nextHLCCas());
        v->setCas(it->getCas());

        return GetValue(it.release());

    } else {
        // No value found in the hashtable.
        switch (eviction) {
        case VALUE_ONLY:
            return GetValue(NULL, ENGINE_KEY_ENOENT);

        case FULL_EVICTION:
            if (maybeKeyExistsInFilter(key)) {
                ENGINE_ERROR_CODE ec = addTempItemAndBGFetch(
                        hbl, key, cookie, engine, bgFetchDelay, false);
                return GetValue(NULL, ec, -1, true);
            } else {
                // As bloomfilter predicted that item surely doesn't exist
                // on disk, return ENOENT for getLocked().
                return GetValue(NULL, ENGINE_KEY_ENOENT);
            }
        }
        return GetValue(); // just to prevent compiler warning
    }
}
1761
1762 void VBucket::deletedOnDiskCbk(const Item& queuedItem, bool deleted) {
1763     auto hbl = ht.getLockedBucket(queuedItem.getKey());
1764     StoredValue* v = fetchValidValue(hbl,
1765                                      queuedItem.getKey(),
1766                                      WantsDeleted::Yes,
1767                                      TrackReference::No,
1768                                      QueueExpired::Yes);
1769     // Delete the item in the hash table iff:
1770     //  1. Item is existent in hashtable, and deleted flag is true
1771     //  2. rev seqno of queued item matches rev seqno of hash table item
1772     if (v && v->isDeleted() && (queuedItem.getRevSeqno() == v->getRevSeqno())) {
1773         bool isDeleted = deleteStoredValue(hbl, *v);
1774         if (!isDeleted) {
1775             throw std::logic_error(
1776                     "deletedOnDiskCbk:callback: "
1777                     "Failed to delete key with seqno:" +
1778                     std::to_string(v->getBySeqno()) + "' from bucket " +
1779                     std::to_string(hbl.getBucketNum()));
1780         }
1781
1782         /**
1783          * Deleted items are to be added to the bloomfilter,
1784          * in either eviction policy.
1785          */
1786         addToFilter(queuedItem.getKey());
1787     }
1788
1789     if (deleted) {
1790         ++stats.totalPersisted;
1791         ++opsDelete;
1792     }
1793     doStatsForFlushing(queuedItem, queuedItem.size());
1794     --stats.diskQueueSize;
1795     decrMetaDataDisk(queuedItem);
1796 }
1797
1798 bool VBucket::deleteKey(const DocKey& key) {
1799     auto hbl = ht.getLockedBucket(key);
1800     StoredValue* v = ht.unlocked_find(
1801             key, hbl.getBucketNum(), WantsDeleted::Yes, TrackReference::No);
1802     if (!v) {
1803         return false;
1804     }
1805     return deleteStoredValue(hbl, *v);
1806 }
1807
1808 void VBucket::postProcessRollback(const RollbackResult& rollbackResult,
1809                                   uint64_t prevHighSeqno) {
1810     failovers->pruneEntries(rollbackResult.highSeqno);
1811     checkpointManager.clear(*this, rollbackResult.highSeqno);
1812     setPersistedSnapshot(rollbackResult.snapStartSeqno,
1813                          rollbackResult.snapEndSeqno);
1814     incrRollbackItemCount(prevHighSeqno - rollbackResult.highSeqno);
1815     setBackfillPhase(false);
1816 }
1817
1818 void VBucket::dump() const {
1819     std::cerr << "VBucket[" << this << "] with state: " << toString(getState())
1820               << " numItems:" << getNumItems()
1821               << " numNonResident:" << getNumNonResidentItems()
1822               << " ht: " << std::endl << "  " << ht << std::endl
1823               << "]" << std::endl;
1824 }
1825
1826 void VBucket::_addStats(bool details, ADD_STAT add_stat, const void* c) {
1827     addStat(NULL, toString(state), add_stat, c);
1828     if (details) {
1829         size_t numItems = getNumItems();
1830         size_t tempItems = getNumTempItems();
1831         addStat("num_items", numItems, add_stat, c);
1832         addStat("num_temp_items", tempItems, add_stat, c);
1833         addStat("num_non_resident", getNumNonResidentItems(), add_stat, c);
1834         addStat("ht_memory", ht.memorySize(), add_stat, c);
1835         addStat("ht_item_memory", ht.getItemMemory(), add_stat, c);
1836         addStat("ht_cache_size", ht.cacheSize.load(), add_stat, c);
1837         addStat("ht_size", ht.getSize(), add_stat, c);
1838         addStat("num_ejects", ht.getNumEjects(), add_stat, c);
1839         addStat("ops_create", opsCreate.load(), add_stat, c);
1840         addStat("ops_update", opsUpdate.load(), add_stat, c);
1841         addStat("ops_delete", opsDelete.load(), add_stat, c);
1842         addStat("ops_reject", opsReject.load(), add_stat, c);
1843         addStat("queue_size", dirtyQueueSize.load(), add_stat, c);
1844         addStat("backfill_queue_size", getBackfillSize(), add_stat, c);
1845         addStat("queue_memory", dirtyQueueMem.load(), add_stat, c);
1846         addStat("queue_fill", dirtyQueueFill.load(), add_stat, c);
1847         addStat("queue_drain", dirtyQueueDrain.load(), add_stat, c);
1848         addStat("queue_age", getQueueAge(), add_stat, c);
1849         addStat("pending_writes", dirtyQueuePendingWrites.load(), add_stat, c);
1850
1851         addStat("high_seqno", getHighSeqno(), add_stat, c);
1852         addStat("uuid", failovers->getLatestUUID(), add_stat, c);
1853         addStat("purge_seqno", getPurgeSeqno(), add_stat, c);
1854         addStat("bloom_filter", getFilterStatusString().data(),
1855                 add_stat, c);
1856         addStat("bloom_filter_size", getFilterSize(), add_stat, c);
1857         addStat("bloom_filter_key_count", getNumOfKeysInFilter(), add_stat, c);
1858         addStat("rollback_item_count", getRollbackItemCount(), add_stat, c);
1859         addStat("hp_vb_req_size", getHighPriorityChkSize(), add_stat, c);
1860         hlc.addStats(statPrefix, add_stat, c);
1861     }
1862 }
1863
1864 void VBucket::decrDirtyQueueMem(size_t decrementBy)
1865 {
1866     size_t oldVal, newVal;
1867     do {
1868         oldVal = dirtyQueueMem.load(std::memory_order_relaxed);
1869         if (oldVal < decrementBy) {
1870             newVal = 0;
1871         } else {
1872             newVal = oldVal - decrementBy;
1873         }
1874     } while (!dirtyQueueMem.compare_exchange_strong(oldVal, newVal));
1875 }
1876
1877 void VBucket::decrDirtyQueueAge(uint32_t decrementBy)
1878 {
1879     uint64_t oldVal, newVal;
1880     do {
1881         oldVal = dirtyQueueAge.load(std::memory_order_relaxed);
1882         if (oldVal < decrementBy) {
1883             newVal = 0;
1884         } else {
1885             newVal = oldVal - decrementBy;
1886         }
1887     } while (!dirtyQueueAge.compare_exchange_strong(oldVal, newVal));
1888 }
1889
1890 void VBucket::decrDirtyQueuePendingWrites(size_t decrementBy)
1891 {
1892     size_t oldVal, newVal;
1893     do {
1894         oldVal = dirtyQueuePendingWrites.load(std::memory_order_relaxed);
1895         if (oldVal < decrementBy) {
1896             newVal = 0;
1897         } else {
1898             newVal = oldVal - decrementBy;
1899         }
1900     } while (!dirtyQueuePendingWrites.compare_exchange_strong(oldVal, newVal));
1901 }
1902
/**
 * Decide whether `itm` may be stored over the existing StoredValue `v`, or
 * as a new StoredValue when `v` is null, and apply it if so.
 *
 * Must be called with the hash-bucket lock held. Otherwise it throws
 * std::invalid_argument.
 *
 * @param hbl            lock for the hash bucket holding the key
 * @param v              [in/out] existing StoredValue or nullptr; on the
 *                       update/add paths it is set to the resulting value
 * @param itm            item to store; its revSeqno is assigned here unless
 *                       hasMetaData is true
 * @param cas            expected CAS, 0 for none
 * @param allowExisting  if false, an existing non-temp value yields
 *                       MutationStatus::InvalidCas
 * @param hasMetaData    true when `itm` carries its own metadata. Expiry
 *                       handling and revSeqno assignment are then skipped.
 * @param queueItmCtx    queueing options forwarded to update/add
 * @param maybeKeyExists bloom-filter hint. Under full eviction with a CAS,
 *                       a missing or temp-initial `v` requires a bgFetch.
 * @param isReplication  forwarded to the memory-availability check
 * @return mutation status and the notification context (empty on early
 *         rejection)
 */
std::pair<MutationStatus, VBNotifyCtx> VBucket::processSet(
        const HashTable::HashBucketLock& hbl,
        StoredValue*& v,
        Item& itm,
        uint64_t cas,
        bool allowExisting,
        bool hasMetaData,
        const VBQueueItemCtx* queueItmCtx,
        bool maybeKeyExists,
        bool isReplication) {
    if (!hbl.getHTLock()) {
        throw std::invalid_argument(
                "VBucket::processSet: htLock not held for "
                "VBucket " +
                std::to_string(getId()));
    }

    if (!StoredValue::hasAvailableSpace(stats, itm, isReplication)) {
        return {MutationStatus::NoMem, VBNotifyCtx()};
    }

    // A CAS op under full eviction needs the real on-disk item to compare
    // against if memory holds nothing (or only a placeholder).
    if (cas && eviction == FULL_EVICTION && maybeKeyExists) {
        if (!v || v->isTempInitialItem()) {
            return {MutationStatus::NeedBgFetch, VBNotifyCtx()};
        }
    }

    /*
     * prior to checking for the lock, we should check if this object
     * has expired. If so, then check if CAS value has been provided
     * for this set op. In this case the operation should be denied since
     * a cas operation for a key that doesn't exist is not a very cool
     * thing to do. See MB 3252
     */
    if (v && v->isExpired(ep_real_time()) && !hasMetaData) {
        if (v->isLocked(ep_current_time())) {
            v->unlock();
        }
        if (cas) {
            /* item has expired and cas value provided. Deny ! */
            return {MutationStatus::NotFound, VBNotifyCtx()};
        }
    }

    if (v) {
        if (!allowExisting && !v->isTempItem()) {
            return {MutationStatus::InvalidCas, VBNotifyCtx()};
        }
        if (v->isLocked(ep_current_time())) {
            /*
             * item is locked, deny if there is cas value mismatch
             * or no cas value is provided by the user
             */
            if (cas != v->getCas()) {
                return {MutationStatus::IsLocked, VBNotifyCtx()};
            }
            /* allow operation*/
            v->unlock();
        } else if (cas && cas != v->getCas()) {
            if (v->isTempNonExistentItem()) {
                // This is a temporary item which marks a key as non-existent;
                // therefore specifying a non-matching CAS should be exposed
                // as item not existing.
                return {MutationStatus::NotFound, VBNotifyCtx()};
            }
            if ((v->isTempDeletedItem() || v->isDeleted()) && !itm.isDeleted()) {
                // Existing item is deleted, and we are not replacing it with
                // a (different) deleted value - return not existing.
                return {MutationStatus::NotFound, VBNotifyCtx()};
            }
            // None of the above special cases; the existing item cannot be
            // modified with the specified CAS.
            return {MutationStatus::InvalidCas, VBNotifyCtx()};
        }
        if (!hasMetaData) {
            itm.setRevSeqno(v->getRevSeqno() + 1);
            /* MB-23530: We must ensure that a replace operation (i.e.
             * set with a CAS) /fails/ if the old document is deleted; it
             * logically "doesn't exist". However, if the new value is deleted
             * this op is a /delete/ with a CAS and we must permit a
             * deleted -> deleted transition for Deleted Bodies.
             */
            if (cas && (v->isDeleted() || v->isTempDeletedItem()) &&
                !itm.isDeleted()) {
                return {MutationStatus::NotFound, VBNotifyCtx()};
            }
        }

        MutationStatus status;
        VBNotifyCtx notifyCtx;
        std::tie(v, status, notifyCtx) =
                updateStoredValue(hbl, *v, itm, queueItmCtx);
        return {status, notifyCtx};
    } else if (cas != 0) {
        // CAS supplied for a key that is not present at all.
        return {MutationStatus::NotFound, VBNotifyCtx()};
    } else {
        VBNotifyCtx notifyCtx;
        std::tie(v, notifyCtx) = addNewStoredValue(hbl, itm, queueItmCtx);
        if (!hasMetaData) {
            updateRevSeqNoOfNewStoredValue(*v);
            itm.setRevSeqno(v->getRevSeqno());
        }
        return {MutationStatus::WasClean, notifyCtx};
    }
}
2008
/**
 * Apply an add (store only if the key does not logically exist).
 *
 * Must be called with the hash-bucket lock held. Otherwise it throws
 * std::invalid_argument.
 *
 * @param hbl            lock for the hash bucket holding the key
 * @param v              [in/out] existing StoredValue or nullptr; on the
 *                       update/add paths it is set to the resulting value
 * @param itm            item to add; its revSeqno is assigned here
 * @param maybeKeyExists bloom-filter hint used under full eviction
 * @param isReplication  forwarded to the memory-availability check
 * @param queueItmCtx    queueing options forwarded to update/add (may be
 *                       null)
 * @return Exists for a live non-temp value; NoMem; BgFetch when the on-disk
 *         state must be fetched (or a temp item was inserted);
 *         AddTmpAndBgFetch when the caller should add a temp item and
 *         fetch; UnDel when replacing a deleted/expired value; else Success.
 */
std::pair<AddStatus, VBNotifyCtx> VBucket::processAdd(
        const HashTable::HashBucketLock& hbl,
        StoredValue*& v,
        Item& itm,
        bool maybeKeyExists,
        bool isReplication,
        const VBQueueItemCtx* queueItmCtx) {
    if (!hbl.getHTLock()) {
        throw std::invalid_argument(
                "VBucket::processAdd: htLock not held for "
                "VBucket " +
                std::to_string(getId()));
    }

    if (v && !v->isDeleted() && !v->isExpired(ep_real_time()) &&
        !v->isTempItem()) {
        return {AddStatus::Exists, VBNotifyCtx()};
    }
    if (!StoredValue::hasAvailableSpace(stats, itm, isReplication)) {
        return {AddStatus::NoMem, VBNotifyCtx()};
    }

    std::pair<AddStatus, VBNotifyCtx> rv = {AddStatus::Success, VBNotifyCtx()};

    if (v) {
        if (v->isTempInitialItem() && eviction == FULL_EVICTION &&
            maybeKeyExists) {
            // Need to figure out if an item exists on disk
            return {AddStatus::BgFetch, VBNotifyCtx()};
        }

        rv.first = (v->isDeleted() || v->isExpired(ep_real_time()))
                           ? AddStatus::UnDel
                           : AddStatus::Success;

        // Choose the new revSeqno: continue from a temp-deleted item's own
        // revSeqno, otherwise from the table-wide max deleted revSeqno...
        if (v->isTempDeletedItem()) {
            itm.setRevSeqno(v->getRevSeqno() + 1);
        } else {
            itm.setRevSeqno(ht.getMaxDeletedRevSeqno() + 1);
        }

        // ...but a real (non-temp) value always continues from its own.
        if (!v->isTempItem()) {
            itm.setRevSeqno(v->getRevSeqno() + 1);
        }

        std::tie(v, std::ignore, rv.second) =
                updateStoredValue(hbl, *v, itm, queueItmCtx);
    } else {
        // Adding a real item (not a temp placeholder) under full eviction
        // when the key may be on disk: ask the caller to fetch first.
        if (itm.getBySeqno() != StoredValue::state_temp_init) {
            if (eviction == FULL_EVICTION && maybeKeyExists) {
                return {AddStatus::AddTmpAndBgFetch, VBNotifyCtx()};
            }
        }
        std::tie(v, rv.second) = addNewStoredValue(hbl, itm, queueItmCtx);
        updateRevSeqNoOfNewStoredValue(*v);
        itm.setRevSeqno(v->getRevSeqno());
        if (v->isTempItem()) {
            rv.first = AddStatus::BgFetch;
        }
    }

    if (v->isTempItem()) {
        v->setNRUValue(MAX_NRU_VALUE);
    }
    return rv;
}
2075
/**
 * Soft-delete `v`: validate lock/CAS, optionally apply supplied metadata,
 * then mark it deleted via softDeleteStoredValue().
 *
 * @param hbl         lock for the hash bucket holding `v`
 * @param v           the StoredValue to delete
 * @param cas         expected CAS, 0 for none
 * @param metadata    metadata for the deletion; revSeqno is always applied,
 *                    cas/flags/exptime only when `use_meta` is true
 * @param queueItmCtx queueing options forwarded to softDeleteStoredValue()
 * @param use_meta    whether to take cas/flags/exptime from `metadata`
 * @param bySeqno     seqno forwarded to softDeleteStoredValue()
 * @return (status, StoredValue, notify ctx). The StoredValue is `&v` on
 *         early rejection, else the pointer softDeleteStoredValue() returns.
 *         On success the status is WasDirty/WasClean, reflecting `v` before
 *         the delete.
 */
std::tuple<MutationStatus, StoredValue*, VBNotifyCtx>
VBucket::processSoftDelete(const HashTable::HashBucketLock& hbl,
                           StoredValue& v,
                           uint64_t cas,
                           const ItemMetaData& metadata,
                           const VBQueueItemCtx& queueItmCtx,
                           bool use_meta,
                           uint64_t bySeqno) {
    if (v.isTempInitialItem() && eviction == FULL_EVICTION) {
        return std::make_tuple(MutationStatus::NeedBgFetch, &v, VBNotifyCtx());
    }

    // A locked value may only be deleted by the lock holder's CAS.
    if (v.isLocked(ep_current_time())) {
        if (cas != v.getCas()) {
            return std::make_tuple(MutationStatus::IsLocked, &v, VBNotifyCtx());
        }
        v.unlock();
    }

    if (cas != 0 && cas != v.getCas()) {
        return std::make_tuple(MutationStatus::InvalidCas, &v, VBNotifyCtx());
    }

    /* allow operation */
    // NOTE(review): unconditional; redundant if the locked branch above
    // already unlocked, harmless otherwise.
    v.unlock();

    MutationStatus rv =
            v.isDirty() ? MutationStatus::WasDirty : MutationStatus::WasClean;

    if (use_meta) {
        v.setCas(metadata.cas);
        v.setFlags(metadata.flags);
        v.setExptime(metadata.exptime);
    }

    v.setRevSeqno(metadata.revSeqno);
    VBNotifyCtx notifyCtx;
    StoredValue* newSv;
    std::tie(newSv, notifyCtx) =
            softDeleteStoredValue(hbl,
                                  v,
                                  /*onlyMarkDeleted*/ false,
                                  queueItmCtx,
                                  bySeqno);
    ht.updateMaxDeletedRevSeqno(metadata.revSeqno);
    return std::make_tuple(rv, newSv, notifyCtx);
}
2123
/**
 * Turn the expired StoredValue `v` into a deletion.
 *
 * Must be called with the hash-bucket lock held. Otherwise it throws
 * std::invalid_argument.
 *
 * Under full eviction a temp-initial `v` is queued dirty and NeedBgFetch is
 * returned. Otherwise revSeqno is bumped and the value is soft-deleted
 * (keeping the body when it carries XATTRs). MutationStatus::NotFound is
 * returned along with the StoredValue from softDeleteStoredValue().
 *
 * @param hbl lock for the hash bucket holding `v`
 * @param v   the expired StoredValue
 */
std::tuple<MutationStatus, StoredValue*, VBNotifyCtx>
VBucket::processExpiredItem(const HashTable::HashBucketLock& hbl,
                            StoredValue& v) {
    if (!hbl.getHTLock()) {
        throw std::invalid_argument(
                "VBucket::processExpiredItem: htLock not held for VBucket " +
                std::to_string(getId()));
    }

    if (v.isTempInitialItem() && eviction == FULL_EVICTION) {
        return std::make_tuple(MutationStatus::NeedBgFetch,
                               &v,
                               queueDirty(v,
                                          GenerateBySeqno::Yes,
                                          GenerateCas::Yes,
                                          /*isBackfillItem*/ false));
    }

    /* If the datatype is XATTR, mark the item as deleted
     * but don't delete the value as system xattrs can
     * still be queried by mobile clients even after
     * deletion.
     * TODO: The current implementation is inefficient
     * but functionally correct and for performance reasons
     * only the system xattrs need to be stored.
     */
    value_t value = v.getValue();
    bool onlyMarkDeleted =
            value && mcbp::datatype::is_xattr(value->getDataType());
    v.setRevSeqno(v.getRevSeqno() + 1);
    VBNotifyCtx notifyCtx;
    StoredValue* newSv;
    std::tie(newSv, notifyCtx) =
            softDeleteStoredValue(hbl,
                                  v,
                                  onlyMarkDeleted,
                                  VBQueueItemCtx(GenerateBySeqno::Yes,
                                                 GenerateCas::Yes,
                                                 TrackCasDrift::No,
                                                 /*isBackfillItem*/ false,
                                                 nullptr /* no pre link */),
                                  v.getBySeqno());
    // NOTE(review): this records newSv's revSeqno + 1, whereas
    // processSoftDelete() records the deleted item's own revSeqno - confirm
    // the extra +1 is intended.
    ht.updateMaxDeletedRevSeqno(newSv->getRevSeqno() + 1);
    return std::make_tuple(MutationStatus::NotFound, newSv, notifyCtx);
}
2169
2170 bool VBucket::deleteStoredValue(const HashTable::HashBucketLock& hbl,
2171                                 StoredValue& v) {
2172     if (!v.isDeleted() && v.isLocked(ep_current_time())) {
2173         return false;
2174     }
2175
2176     /* StoredValue deleted here. If any other in-memory data structures are
2177        using the StoredValue intrusively then they must have handled the delete
2178        by this point */
2179     ht.unlocked_del(hbl, v.getKey());
2180     return true;
2181 }
2182
2183 AddStatus VBucket::addTempStoredValue(const HashTable::HashBucketLock& hbl,
2184                                       const DocKey& key,
2185                                       bool isReplication) {
2186     uint8_t ext_meta[EXT_META_LEN] = {PROTOCOL_BINARY_RAW_BYTES};
2187     static_assert(sizeof(ext_meta) == 1,
2188                   "VBucket::addTempStoredValue(): expected "
2189                   "EXT_META_LEN to be 1");
2190     Item itm(key,
2191              /*flags*/ 0,
2192              /*exp*/ 0,
2193              /*data*/ NULL,
2194              /*size*/ 0,
2195              ext_meta,
2196              sizeof(ext_meta),
2197              0,
2198              StoredValue::state_temp_init);
2199
2200     /* if a temp item for a possibly deleted, set it non-resident by resetting
2201        the value cuz normally a new item added is considered resident which
2202        does not apply for temp item. */
2203     StoredValue* v = nullptr;
2204     return processAdd(hbl, v, itm, true, isReplication, nullptr).first;
2205 }
2206
2207 void VBucket::notifyNewSeqno(const VBNotifyCtx& notifyCtx) {
2208     if (newSeqnoCb) {
2209         newSeqnoCb->callback(getId(), notifyCtx);
2210     }
2211 }
2212
2213 /*
2214  * Queue the item to the checkpoint and return the seqno the item was
2215  * allocated.
2216  */
2217 int64_t VBucket::queueItem(Item* item, OptionalSeqno seqno) {
2218     item->setVBucketId(id);
2219     queued_item qi(item);
2220     checkpointManager.queueDirty(
2221             *this,
2222             qi,
2223             seqno ? GenerateBySeqno::No : GenerateBySeqno::Yes,
2224             GenerateCas::Yes,
2225             nullptr /* No pre link step as this is for system events */);
2226     VBNotifyCtx notifyCtx;
2227     // If the seqno is initialized, skip replication notification
2228     notifyCtx.notifyReplication = !seqno.is_initialized();
2229     notifyCtx.notifyFlusher = true;
2230     notifyCtx.bySeqno = qi->getBySeqno();
2231     notifyNewSeqno(notifyCtx);
2232     return qi->getBySeqno();
2233 }
2234
2235 VBNotifyCtx VBucket::queueDirty(StoredValue& v,
2236                                 const VBQueueItemCtx& queueItmCtx) {
2237     if (queueItmCtx.trackCasDrift == TrackCasDrift::Yes) {
2238         setMaxCasAndTrackDrift(v.getCas());
2239     }
2240     return queueDirty(v,
2241                       queueItmCtx.genBySeqno,
2242                       queueItmCtx.genCas,
2243                       queueItmCtx.isBackfillItem,
2244                       queueItmCtx.preLinkDocumentContext);
2245 }
2246
2247 void VBucket::updateRevSeqNoOfNewStoredValue(StoredValue& v) {
2248     /**
2249      * Possibly, this item is being recreated. Conservatively assign it
2250      * a seqno that is greater than the greatest seqno of all deleted
2251      * items seen so far.
2252      */
2253     uint64_t seqno = ht.getMaxDeletedRevSeqno();
2254     if (!v.isTempItem()) {
2255         ++seqno;
2256     }
2257     v.setRevSeqno(seqno);
2258 }
2259
2260 void VBucket::addHighPriorityVBEntry(uint64_t seqnoOrChkId,
2261                                      const void* cookie,
2262                                      HighPriorityVBNotify reqType) {
2263     std::unique_lock<std::mutex> lh(hpVBReqsMutex);
2264     hpVBReqs.push_back(HighPriorityVBEntry(cookie, seqnoOrChkId, reqType));
2265     numHpVBReqs.store(hpVBReqs.size());
2266
2267     LOG(EXTENSION_LOG_NOTICE,
2268         "Added high priority async request %s "
2269         "for vb:%" PRIu16 ", Check for:%" PRIu64 ", "
2270         "Persisted upto:%" PRIu64 ", cookie:%p",
2271         to_string(reqType).c_str(),
2272         getId(),
2273         seqnoOrChkId,
2274         getPersistenceSeqno(),
2275         cookie);
2276 }
2277
2278 std::map<const void*, ENGINE_ERROR_CODE> VBucket::getHighPriorityNotifications(
2279         EventuallyPersistentEngine& engine,
2280         uint64_t idNum,
2281         HighPriorityVBNotify notifyType) {
2282     std::unique_lock<std::mutex> lh(hpVBReqsMutex);
2283     std::map<const void*, ENGINE_ERROR_CODE> toNotify;
2284
2285     auto entry = hpVBReqs.begin();
2286
2287     while (entry != hpVBReqs.end()) {
2288         if (notifyType != entry->reqType) {
2289             ++entry;
2290             continue;
2291         }
2292
2293         std::string logStr(to_string(notifyType));
2294
2295         hrtime_t wall_time(gethrtime() - entry->start);
2296         size_t spent = wall_time / 1000000000;
2297         if (entry->id <= idNum) {
2298             toNotify[entry->cookie] = ENGINE_SUCCESS;
2299             stats.chkPersistenceHisto.add(wall_time / 1000);
2300             adjustCheckpointFlushTimeout(wall_time / 1000000000);
2301             LOG(EXTENSION_LOG_NOTICE,
2302                 "Notified the completion of %s "
2303                 "for vbucket %" PRIu16 ", Check for: %" PRIu64
2304                 ", "
2305                 "Persisted upto: %" PRIu64 ", cookie %p",
2306                 logStr.c_str(),
2307                 getId(),
2308                 entry->id,
2309                 idNum,
2310                 entry->cookie);
2311             entry = hpVBReqs.erase(entry);
2312         } else if (spent > getCheckpointFlushTimeout()) {
2313             adjustCheckpointFlushTimeout(spent);
2314             engine.storeEngineSpecific(entry->cookie, NULL);
2315             toNotify[entry->cookie] = ENGINE_TMPFAIL;
2316             LOG(EXTENSION_LOG_WARNING,
2317                 "Notified the timeout on %s "
2318                 "for vbucket %" PRIu16 ", Check for: %" PRIu64
2319                 ", "
2320                 "Persisted upto: %" PRIu64 ", cookie %p",
2321                 logStr.c_str(),
2322                 getId(),
2323                 entry->id,
2324                 idNum,
2325                 entry->cookie);
2326             entry = hpVBReqs.erase(entry);
2327         } else {
2328             ++entry;
2329         }
2330     }
2331     numHpVBReqs.store(hpVBReqs.size());
2332     return toNotify;
2333 }
2334
2335 std::map<const void*, ENGINE_ERROR_CODE> VBucket::tmpFailAndGetAllHpNotifies(
2336         EventuallyPersistentEngine& engine) {
2337     std::map<const void*, ENGINE_ERROR_CODE> toNotify;
2338
2339     LockHolder lh(hpVBReqsMutex);
2340
2341     for (auto& entry : hpVBReqs) {
2342         toNotify[entry.cookie] = ENGINE_TMPFAIL;
2343         engine.storeEngineSpecific(entry.cookie, NULL);
2344     }
2345     hpVBReqs.clear();
2346
2347     return toNotify;
2348 }
2349
2350 void VBucket::adjustCheckpointFlushTimeout(size_t wall_time) {
2351     size_t middle = (MIN_CHK_FLUSH_TIMEOUT + MAX_CHK_FLUSH_TIMEOUT) / 2;
2352
2353     if (wall_time <= MIN_CHK_FLUSH_TIMEOUT) {
2354         chkFlushTimeout = MIN_CHK_FLUSH_TIMEOUT;
2355     } else if (wall_time <= middle) {
2356         chkFlushTimeout = middle;
2357     } else {
2358         chkFlushTimeout = MAX_CHK_FLUSH_TIMEOUT;
2359     }
2360 }
2361
2362 size_t VBucket::getCheckpointFlushTimeout() {
2363     return chkFlushTimeout;
2364 }
2365
2366 std::unique_ptr<Item> VBucket::pruneXattrDocument(
2367         StoredValue& v, const ItemMetaData& itemMeta) {
2368     // Need to take a copy of the value, prune it, and add it back
2369
2370     // Create work-space document
2371     std::vector<uint8_t> workspace(v.getValue()->vlength());
2372     std::copy_n(v.getValue()->getData(),
2373                 v.getValue()->vlength(),
2374                 workspace.begin());
2375
2376     // Now attach to the XATTRs in the document
2377     auto sz = cb::xattr::get_body_offset(
2378             {reinterpret_cast<char*>(workspace.data()), workspace.size()});
2379
2380     cb::xattr::Blob xattr({workspace.data(), sz});
2381     xattr.prune_user_keys();
2382
2383     auto prunedXattrs = xattr.finalize();
2384
2385     if (prunedXattrs.size()) {
2386         // Something remains - Create a Blob and copy-in just the XATTRs
2387         auto newValue =
2388                 Blob::New(reinterpret_cast<const char*>(prunedXattrs.data()),
2389                           prunedXattrs.size(),
2390                           const_cast<uint8_t*>(reinterpret_cast<const uint8_t*>(
2391                                   v.getValue()->getExtMeta())),
2392                           v.getValue()->getExtLen());
2393
2394         return std::make_unique<Item>(v.getKey(),
2395                                       itemMeta.flags,
2396                                       itemMeta.exptime,
2397                                       newValue,
2398                                       itemMeta.cas,
2399                                       v.getBySeqno(),
2400                                       getId(),
2401                                       itemMeta.revSeqno);
2402     } else {
2403         return {};
2404     }
2405 }