1 /* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
3 * Copyright 2015 Couchbase, Inc
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
9 * http://www.apache.org/licenses/LICENSE-2.0
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
27 #include "bgfetcher.h"
28 #include "conflict_resolution.h"
29 #include "ep_engine.h"
31 #include "failover-table.h"
33 #include "pre_link_document_context.h"
34 #include "vbucketdeletiontask.h"
36 #define STATWRITER_NAMESPACE vbucket
37 #include "statwriter.h"
38 #undef STATWRITER_NAMESPACE
39 #include "stored_value_factories.h"
43 #include <xattr/blob.h>
44 #include <xattr/utils.h>
// Minimum and maximum checkpoint flush timeouts, expressed in seconds.
47 const size_t MIN_CHK_FLUSH_TIMEOUT = 10; // 10 sec.
48 const size_t MAX_CHK_FLUSH_TIMEOUT = 30; // 30 sec.
50 /* Statics definitions */
// The class-wide flush timeout is initialised to the minimum value.
51 std::atomic<size_t> VBucket::chkFlushTimeout(MIN_CHK_FLUSH_TIMEOUT);
// Builds a new filter from the symmetric difference between this filter's
// acceptable set and `other`'s acceptable set.
53 VBucketFilter VBucketFilter::filter_diff(const VBucketFilter &other) const {
// Scratch output buffer sized to the sum of both inputs, which is the largest
// a symmetric difference can be.
54 std::vector<uint16_t> tmp(acceptable.size() + other.size());
55 std::vector<uint16_t>::iterator end;
// `end` receives the one-past-the-last iterator written by the algorithm.
56 end = std::set_symmetric_difference(acceptable.begin(),
58 other.acceptable.begin(),
59 other.acceptable.end(),
// Only the populated prefix [tmp.begin(), end) is handed to the new filter.
61 return VBucketFilter(std::vector<uint16_t>(tmp.begin(), end));
// Builds a new filter from the intersection of this filter's acceptable set
// and `other`'s acceptable set.
64 VBucketFilter VBucketFilter::filter_intersection(const VBucketFilter &other)
// Scratch output buffer; sized to the sum of both inputs, which is more than
// an intersection can ever need.
66 std::vector<uint16_t> tmp(acceptable.size() + other.size());
67 std::vector<uint16_t>::iterator end;
// `end` receives the one-past-the-last iterator written by the algorithm.
69 end = std::set_intersection(acceptable.begin(), acceptable.end(),
70 other.acceptable.begin(),
71 other.acceptable.end(),
// Only the populated prefix [tmp.begin(), end) is handed to the new filter.
73 return VBucketFilter(std::vector<uint16_t>(tmp.begin(), end));
// File-local helper used when printing a filter: starting at `it`, scans
// forward through the set while each element equals the first value plus the
// running `length`, i.e. while the elements are consecutive integers.
// NOTE(review): the return condition is not visible here - presumably it
// reports whether the run is long enough to print as a range; confirm.
76 static bool isRange(std::set<uint16_t>::const_iterator it,
77 const std::set<uint16_t>::const_iterator &end,
// `val` is fixed at the first element; `it` is taken by value, so the
// caller's iterator is not advanced by this scan.
81 for (uint16_t val = *it;
82 it != end && (val + length) == *it;
// Streams a textual representation of `filter` to `out`. Runs of consecutive
// vbucket ids detected by isRange() are written as "[first,last]".
92 std::ostream& operator <<(std::ostream &out, const VBucketFilter &filter)
94 std::set<uint16_t>::const_iterator it;
// An empty acceptable set is handled as a special case.
96 if (filter.acceptable.empty()) {
// Tracks whether a separator is needed before the next entry.
99 bool needcomma = false;
101 for (it = filter.acceptable.begin();
102 it != filter.acceptable.end();
109 if (isRange(it, filter.acceptable.end(), length)) {
// `last` is walked `length` steps forward from `it` to find the range's end.
110 std::set<uint16_t>::iterator last = it;
111 for (size_t i = 0; i < length; ++i) {
114 out << "[" << *it << "," << *last << "]";
// The four vbucket states pre-converted with htonl(), i.e. stored in network
// byte order.
127 const vbucket_state_t VBucket::ACTIVE =
128 static_cast<vbucket_state_t>(htonl(vbucket_state_active));
129 const vbucket_state_t VBucket::REPLICA =
130 static_cast<vbucket_state_t>(htonl(vbucket_state_replica));
131 const vbucket_state_t VBucket::PENDING =
132 static_cast<vbucket_state_t>(htonl(vbucket_state_pending));
133 const vbucket_state_t VBucket::DEAD =
134 static_cast<vbucket_state_t>(htonl(vbucket_state_dead));
// Constructs a vbucket: initialises the hash table, checkpoint manager,
// failover table, counters and flags from the supplied arguments, selects the
// conflict resolver from the configuration, accounts for this object's memory
// overhead and logs the creation.
136 VBucket::VBucket(id_type i,
137 vbucket_state_t newState,
139 CheckpointConfig& chkConfig,
141 uint64_t lastSnapStart,
142 uint64_t lastSnapEnd,
143 std::unique_ptr<FailoverTable> table,
144 std::shared_ptr<Callback<id_type> > flusherCb,
145 std::unique_ptr<AbstractStoredValueFactory> valFact,
146 NewSeqnoCallback newSeqnoCb,
147 Configuration& config,
148 item_eviction_policy_t evictionPolicy,
149 vbucket_state_t initState,
152 const std::string& collectionsManifest)
// The hash table takes ownership of the stored-value factory and is sized
// from the configuration.
153 : ht(st, std::move(valFact), config.getHtSize(), config.getHtLocks()),
154 checkpointManager(st,
// Ownership of the failover table is transferred into this vbucket.
161 failovers(std::move(table)),
171 dirtyQueuePendingWrites(0),
174 eviction(evictionPolicy),
180 initialState(initState),
181 purge_seqno(purgeSeqno),
182 takeover_backed_up(false),
// The persisted snapshot range starts out equal to the last snapshot range
// passed in.
183 persisted_snapshot_start(lastSnapStart),
184 persisted_snapshot_end(lastSnapEnd),
185 rollbackItemCount(0),
// HLC drift thresholds are configured in microseconds.
187 std::chrono::microseconds(config.getHlcDriftAheadThresholdUs()),
188 std::chrono::microseconds(config.getHlcDriftBehindThresholdUs())),
// Per-vbucket stat names are prefixed with "vb_<id>".
189 statPrefix("vb_" + std::to_string(i)),
190 persistenceCheckpointId(0),
191 bucketCreation(false),
192 deferredDeletion(false),
193 deferredDeletionCookie(nullptr),
194 newSeqnoCb(std::move(newSeqnoCb)),
195 manifest(collectionsManifest) {
// "lww" selects last-write-wins conflict resolution; the alternative resolver
// is revision-seqno based.
196 if (config.getConflictResolutionType().compare("lww") == 0) {
197 conflictResolver.reset(new LastWriteWinsResolution());
199 conflictResolver.reset(new RevisionSeqnoResolution());
202 backfill.isBackfillPhase = false;
// Account for this object, its hash table and its checkpoint manager in the
// global memory-overhead stat (the destructor subtracts the same expression).
204 stats.memOverhead->fetch_add(sizeof(VBucket)
205 + ht.memorySize() + sizeof(CheckpointManager));
206 LOG(EXTENSION_LOG_NOTICE,
207 "VBucket: created vbucket:%" PRIu16 " with state:%s "
209 "lastSeqno:%" PRIu64 " "
210 "lastSnapshot:{%" PRIu64 ",%" PRIu64 "} "
211 "persisted_snapshot:{%" PRIu64 ",%" PRIu64 "} "
213 id, VBucket::toString(state), VBucket::toString(initialState),
214 lastSeqno, lastSnapStart, lastSnapEnd,
215 persisted_snapshot_start, persisted_snapshot_end,
// Destructor: warns if operations are still pending, removes this vbucket's
// contribution from the global queue-size and memory-overhead stats, and logs
// the destruction.
219 VBucket::~VBucket() {
220 if (!pendingOps.empty()) {
221 LOG(EXTENSION_LOG_WARNING,
222 "~Vbucket(): vbucket:%" PRIu16 " has %ld pending ops",
// Subtract whatever this vbucket still contributes to the global disk-queue
// and backfill-queue sizes.
227 stats.diskQueueSize.fetch_sub(dirtyQueueSize.load());
228 stats.vbBackfillQueueSize.fetch_sub(getBackfillSize());
230 // Clear out the bloomfilter(s)
// Mirrors the fetch_add performed in the constructor.
233 stats.memOverhead->fetch_sub(sizeof(VBucket) + ht.memorySize() +
234 sizeof(CheckpointManager));
236 LOG(EXTENSION_LOG_INFO, "Destroying vbucket %d\n", id);
// Completes every pending operation on this vbucket by notifying the engine
// with `code`, updating the pending-ops statistics along the way.
239 void VBucket::fireAllOps(EventuallyPersistentEngine &engine,
240 ENGINE_ERROR_CODE code) {
241 std::unique_lock<std::mutex> lh(pendingOpLock);
// If a start time was recorded, add the elapsed time (divided by 1000) to the
// histogram and update the maximum duration.
// NOTE(review): presumably gethrtime() is in nanoseconds, making `d`
// microseconds - confirm.
243 if (pendingOpsStart > 0) {
244 hrtime_t now = gethrtime();
245 if (now > pendingOpsStart) {
246 hrtime_t d = (now - pendingOpsStart) / 1000;
247 stats.pendingOpsHisto.add(d);
248 atomic_setIfBigger(stats.pendingOpsMaxDuration, d);
// All pending ops are about to be drained: remove them from the running
// count and record the high-water mark.
255 stats.pendingOps.fetch_sub(pendingOps.size());
256 atomic_setIfBigger(stats.pendingOpsMax, pendingOps.size());
// Drain from the back of the container, one operation at a time.
258 while (!pendingOps.empty()) {
259 const void *pendingOperation = pendingOps.back();
260 pendingOps.pop_back();
261 // We don't want to hold the pendingOpLock when
262 // calling notifyIOComplete.
264 engine.notifyIOComplete(pendingOperation, code);
268 LOG(EXTENSION_LOG_INFO,
269 "Fired pendings ops for vbucket %" PRIu16 " in state %s\n",
270 id, VBucket::toString(state));
// Convenience overload that picks the completion code from the current
// state: an active vbucket completes its pending ops with ENGINE_SUCCESS.
// NOTE(review): the pending state is tested separately; ENGINE_NOT_MY_VBUCKET
// is presumably used for the remaining states - confirm.
273 void VBucket::fireAllOps(EventuallyPersistentEngine &engine) {
275 if (state == vbucket_state_active) {
276 fireAllOps(engine, ENGINE_SUCCESS);
277 } else if (state == vbucket_state_pending) {
280 fireAllOps(engine, ENGINE_NOT_MY_VBUCKET);
// Transitions this vbucket to state `to` and logs the old and new states.
284 void VBucket::setState(vbucket_state_t to) {
285 vbucket_state_t oldstate;
// The state change is made under the exclusive (writer) state lock.
287 WriterLockHolder wlh(stateLock);
// When becoming active, make sure the open checkpoint id is at least 2.
290 if (to == vbucket_state_active &&
291 checkpointManager.getOpenCheckpointId() < 2) {
292 checkpointManager.setOpenCheckpointId(2);
295 LOG(EXTENSION_LOG_NOTICE,
296 "VBucket::setState: transitioning vbucket:%" PRIu16 " from:%s to:%s",
297 id, VBucket::toString(oldstate), VBucket::toString(to));
// Returns a vbucket_state value assembled from this vbucket's current state,
// persistence checkpoint id, high seqno, persisted snapshot range, max CAS
// and the JSON form of the failover table.
303 vbucket_state VBucket::getVBucketState() const {
// Read the persisted snapshot once so start and end come from the same call.
304 auto persisted_range = getPersistedSnapshot();
306 return vbucket_state{getState(),
307 getPersistenceCheckpointId(), 0, getHighSeqno(),
309 persisted_range.start, persisted_range.end,
310 getMaxCas(), failovers->toJSON()};
// Updates the dirty-queue statistics when item `qi` is queued: memory grows
// by sizeof(Item), age by the item's queued time and pending writes by
// `itemBytes`.
315 void VBucket::doStatsForQueueing(const Item& qi, size_t itemBytes)
318 dirtyQueueMem.fetch_add(sizeof(Item));
320 dirtyQueueAge.fetch_add(qi.getQueuedTime());
321 dirtyQueuePendingWrites.fetch_add(itemBytes);
// Counterpart of doStatsForQueueing(): when item `qi` is flushed, decrements
// the dirty-queue memory, age and pending-write counters by the same amounts
// that were added on queueing.
324 void VBucket::doStatsForFlushing(const Item& qi, size_t itemBytes) {
326 decrDirtyQueueMem(sizeof(Item));
328 decrDirtyQueueAge(qi.getQueuedTime());
329 decrDirtyQueuePendingWrites(itemBytes);
// Increases the on-disk metadata estimate by the item's key size plus
// sizeof(ItemMetaData).
332 void VBucket::incrMetaDataDisk(const Item& qi) {
333 metaDataDisk.fetch_add(qi.getKey().size() + sizeof(ItemMetaData));
// Decreases the on-disk metadata estimate by the item's key size plus
// sizeof(ItemMetaData); the inverse of incrMetaDataDisk().
336 void VBucket::decrMetaDataDisk(const Item& qi) {
337 // Assume couchstore removes approximately this much data from disk.
338 metaDataDisk.fetch_sub((qi.getKey().size() + sizeof(ItemMetaData)));
// Resets this vbucket's dirty-queue statistics to zero.
341 void VBucket::resetStats() {
// Atomically zero the local queue size and remove the previous value from the
// global disk-queue size in one step.
347 stats.diskQueueSize.fetch_sub(dirtyQueueSize.exchange(0));
348 dirtyQueueMem.store(0);
349 dirtyQueueFill.store(0);
350 dirtyQueueAge.store(0);
351 dirtyQueuePendingWrites.store(0);
352 dirtyQueueDrain.store(0);
// Emits a single statistic for this vbucket through `add_stat`. Two paths
// are visible: one adds `nm` under the vbucket's stat prefix, the other
// emits the value under the prefix alone.
// NOTE(review): the condition selecting between the two paths is not visible
// here - presumably whether `nm` is null; confirm.
357 template <typename T>
358 void VBucket::addStat(const char *nm, const T &val, ADD_STAT add_stat,
360 std::string stat = statPrefix;
362 add_prefixed_stat(statPrefix, nm, val, add_stat, c);
364 add_casted_stat(statPrefix.data(), val, add_stat, c);
// Runs the server's pre-expiry document callback on a copy of `v`'s value
// and, if the callback succeeds, replaces `v`'s value with a deleted item
// built from the callback's output.
368 void VBucket::handlePreExpiry(StoredValue& v) {
369 value_t value = v.getValue();
371 std::unique_ptr<Item> itm(v.toItem(false, id));
373 EventuallyPersistentEngine* engine = ObjectRegistry::getCurrentEngine();
374 itm_info = itm->toItemInfo(failovers->getLatestUUID());
// Give the temporary item its own copy of the blob before it is passed to
// the callback.
375 value_t new_val(Blob::Copy(*value));
376 itm->setValue(new_val);
378 SERVER_HANDLE_V1* sapi = engine->getServerApi();
379 /* TODO: In order to minimize allocations, the callback needs to
380 * allocate an item whose value size will be exactly the size of the
381 * value after pre-expiry is performed.
383 if (sapi->document->pre_expiry(itm_info)) {
384 char* extMeta = const_cast<char *>(v.getValue()->getExtMeta());
385 Item new_item(v.getKey(), v.getFlags(), v.getExptime(),
386 itm_info.value[0].iov_base, itm_info.value[0].iov_len,
387 reinterpret_cast<uint8_t*>(extMeta),
388 v.getValue()->getExtLen(), v.getCas(),
389 v.getBySeqno(), id, v.getRevSeqno(),
392 new_item.setDeleted();
393 v.setValue(new_item, ht);
// Returns the number of non-resident items. Under VALUE_ONLY eviction the
// hash table's in-memory non-resident count is returned directly; otherwise
// it is the total item count minus the resident count, clamped at zero.
398 size_t VBucket::getNumNonResidentItems() const {
399 if (eviction == VALUE_ONLY) {
400 return ht.getNumInMemoryNonResItems();
402 size_t num_items = ht.getNumItems();
// Resident items = in-memory items minus in-memory non-resident items.
403 size_t num_res_items = ht.getNumInMemoryItems() -
404 ht.getNumInMemoryNonResItems();
// Guard against unsigned underflow when the resident count is the larger.
405 return num_items > num_res_items ? (num_items - num_res_items) : 0;
// Returns the current persistence checkpoint id (atomic load).
410 uint64_t VBucket::getPersistenceCheckpointId() const {
411 return persistenceCheckpointId.load();
// Stores `checkpointId` as the persistence checkpoint id (atomic store).
414 void VBucket::setPersistenceCheckpointId(uint64_t checkpointId) {
415 persistenceCheckpointId.store(checkpointId);
// Looks up `key` in the hash table (deleted items included) under the hash
// bucket lock in order to mark it dirty; logs a warning when the key is
// missing.
418 void VBucket::markDirty(const DocKey& key) {
419 auto hbl = ht.getLockedBucket(key);
420 StoredValue* v = ht.unlocked_find(
421 key, hbl.getBucketNum(), WantsDeleted::Yes, TrackReference::Yes);
425 LOG(EXTENSION_LOG_WARNING, "markDirty: Error marking dirty, a key "
426 "is missing from vb:%" PRIu16, id);
// Compares `threshold` against a ratio whose numerator is the resident item
// count (total items minus non-resident items).
// Throws std::invalid_argument unless the eviction policy is FULL_EVICTION.
430 bool VBucket::isResidentRatioUnderThreshold(float threshold) {
431 if (eviction != FULL_EVICTION) {
432 throw std::invalid_argument("VBucket::isResidentRatioUnderThreshold: "
433 "policy (which is " + std::to_string(eviction) +
434 ") must be FULL_EVICTION");
436 size_t num_items = getNumItems();
437 size_t num_non_resident_items = getNumNonResidentItems();
438 if (threshold >= ((float)(num_items - num_non_resident_items) /
// Creates the main bloom filter for `key_count` keys at the given
// false-positive `probability`, but only when neither the main nor the temp
// filter exists yet; otherwise a warning is logged.
// NOTE(review): the warning text reads "already exist!" - "already exists!"
// would be grammatical; left unchanged as it is a runtime string.
446 void VBucket::createFilter(size_t key_count, double probability) {
447 // Create the actual bloom filter upon vbucket creation during
451 LockHolder lh(bfMutex);
452 if (bFilter == nullptr && tempFilter == nullptr) {
453 bFilter = std::make_unique<BloomFilter>(key_count, probability,
456 LOG(EXTENSION_LOG_WARNING, "(vb %" PRIu16 ") Bloom filter / Temp filter"
457 " already exist!", id);
// (Re)creates the temporary bloom filter under bfMutex and moves the main
// filter's status to BFILTER_COMPACTING.
461 void VBucket::initTempFilter(size_t key_count, double probability) {
462 // Create a temp bloom filter with status as COMPACTING,
463 // if the main filter is found to exist, set its state to
464 // COMPACTING as well.
465 LockHolder lh(bfMutex);
// Assigning to tempFilter releases any previously held temp filter.
466 tempFilter = std::make_unique<BloomFilter>(key_count, probability,
469 bFilter->setStatus(BFILTER_COMPACTING);
// Adds `key` to the main bloom filter and also to the temp filter, holding
// bfMutex.
473 void VBucket::addToFilter(const DocKey& key) {
474 LockHolder lh(bfMutex);
476 bFilter->addKey(key);
479 // If the temp bloom filter is not found to be NULL,
480 // it means that compaction is running on the particular
481 // vbucket. Therefore add the key to the temp filter as
482 // well, as once compaction completes the temp filter
483 // will replace the main bloom filter.
485 tempFilter->addKey(key);
// Asks the main bloom filter whether `key` may exist. When no filter is
// available the background fetch is allowed to proceed (see the comment
// below).
489 bool VBucket::maybeKeyExistsInFilter(const DocKey& key) {
490 LockHolder lh(bfMutex);
492 return bFilter->maybeKeyExists(key);
494 // If filter doesn't exist, allow the BgFetch to go through.
// Checks, under bfMutex, whether the temp filter's status is
// BFILTER_COMPACTING or BFILTER_ENABLED.
499 bool VBucket::isTempFilterAvailable() {
500 LockHolder lh(bfMutex);
502 (tempFilter->getStatus() == BFILTER_COMPACTING ||
503 tempFilter->getStatus() == BFILTER_ENABLED)) {
// Adds `key` to the temp bloom filter only, holding bfMutex.
510 void VBucket::addToTempFilter(const DocKey& key) {
511 // Keys will be added to only the temp filter during
513 LockHolder lh(bfMutex);
515 tempFilter->addKey(key);
// Promotes the temp bloom filter to be the main filter when the temp
// filter's status is COMPACTING or ENABLED, and marks the promoted filter as
// ENABLED. Runs under bfMutex.
519 void VBucket::swapFilter() {
520 // Delete the main bloom filter and replace it with
521 // the temp filter that was populated during compaction,
522 // only if the temp filter's state is found to be either at
523 // COMPACTING or ENABLED (if in the case the user enables
524 // bloomfilters for some reason while compaction was running).
525 // Otherwise, it indicates that the filter's state was
526 // possibly disabled during compaction, therefore clear out
527 // the temp filter. If it gets enabled at some point, a new
528 // bloom filter will be made available after the next
531 LockHolder lh(bfMutex);
535 if (tempFilter->getStatus() == BFILTER_COMPACTING ||
536 tempFilter->getStatus() == BFILTER_ENABLED) {
// The move leaves tempFilter empty and destroys the previous main filter.
537 bFilter = std::move(tempFilter);
538 bFilter->setStatus(BFILTER_ENABLED);
// Clears the bloom filter state while holding bfMutex.
// NOTE(review): presumably resets both bFilter and tempFilter - confirm.
544 void VBucket::clearFilter() {
545 LockHolder lh(bfMutex);
// Applies status `to` to the main bloom filter and to the temp filter,
// holding bfMutex.
550 void VBucket::setFilterStatus(bfilter_status_t to) {
551 LockHolder lh(bfMutex);
553 bFilter->setStatus(to);
556 tempFilter->setStatus(to);
// Returns the status string of the main bloom filter, falling back to the
// temp filter's status string, and finally to "DOESN'T EXIST" when neither
// is available.
560 std::string VBucket::getFilterStatusString() {
561 LockHolder lh(bfMutex);
563 return bFilter->getStatusString();
564 } else if (tempFilter) {
565 return tempFilter->getStatusString();
567 return "DOESN'T EXIST";
// Returns the size reported by the main bloom filter, read under bfMutex.
571 size_t VBucket::getFilterSize() {
572 LockHolder lh(bfMutex);
574 return bFilter->getFilterSize();
// Returns the number of keys reported by the main bloom filter, read under
// bfMutex.
580 size_t VBucket::getNumOfKeysInFilter() {
581 LockHolder lh(bfMutex);
583 return bFilter->getNumOfKeysInFilter();
// Queues the stored value `v` for persistence and/or replication and
// returns a VBNotifyCtx saying who must be notified. Backfill items go to
// the backfill queue; everything else goes through the checkpoint manager.
// The by-seqno assigned to the queued item is copied back to `v` and into
// the returned context.
589 VBNotifyCtx VBucket::queueDirty(
591 const GenerateBySeqno generateBySeqno,
592 const GenerateCas generateCas,
593 const bool isBackfillItem,
594 PreLinkDocumentContext* preLinkDocumentContext) {
595 VBNotifyCtx notifyCtx;
// Convert the stored value into a queueable item for this vbucket.
597 queued_item qi(v.toItem(false, getId()));
599 if (isBackfillItem) {
600 queueBackfillItem(qi, generateBySeqno);
601 notifyCtx.notifyFlusher = true;
602 /* During backfill on a TAP receiver we need to update the snapshot
603 range in the checkpoint. Has to be done here because in case of TAP
604 backfill, above, we use vb.queueBackfillItem() instead of
605 vb.checkpointManager.queueDirty() */
606 if (generateBySeqno == GenerateBySeqno::Yes) {
607 checkpointManager.resetSnapshotRange();
// Non-backfill path: the checkpoint manager's return value decides whether
// the flusher needs waking; replication is always notified.
610 notifyCtx.notifyFlusher =
611 checkpointManager.queueDirty(*this,
615 preLinkDocumentContext);
616 notifyCtx.notifyReplication = true;
// When a CAS was generated, copy it from the queued item back to `v`.
617 if (GenerateCas::Yes == generateCas) {
618 v.setCas(qi->getCas());
622 v.setBySeqno(qi->getBySeqno());
623 notifyCtx.bySeqno = qi->getBySeqno();
// Looks up a key in the hash table while the caller holds the hash bucket
// lock `hbl`, handling items that have expired. For an expired, non-deleted,
// non-temp item the value is returned only when the caller asked for deleted
// items; on an active vbucket with queueExpired == Yes the expiry is also
// processed and queued.
// Throws std::logic_error if the hash bucket lock is not held.
// NOTE(review): the exception message concatenates the bucket number directly
// with "for key: " - a leading space appears to be missing.
628 StoredValue* VBucket::fetchValidValue(HashTable::HashBucketLock& hbl,
630 WantsDeleted wantsDeleted,
631 TrackReference trackReference,
632 QueueExpired queueExpired) {
633 if (!hbl.getHTLock()) {
634 throw std::logic_error(
635 "Hash bucket lock not held in "
636 "VBucket::fetchValidValue() for hash bucket: " +
637 std::to_string(hbl.getBucketNum()) + "for key: " +
638 std::string(reinterpret_cast<const char*>(key.data()),
641 StoredValue* v = ht.unlocked_find(
642 key, hbl.getBucketNum(), wantsDeleted, trackReference);
643 if (v && !v->isDeleted() && !v->isTempItem()) {
644 // In the deleted case, we ignore expiration time.
645 if (v->isExpired(ep_real_time())) {
// Non-active vbuckets never process the expiry themselves.
646 if (getState() != vbucket_state_active) {
647 return wantsDeleted == WantsDeleted::Yes ? v : NULL;
650 // queueDirty only allowed on active VB
651 if (queueExpired == QueueExpired::Yes &&
652 getState() == vbucket_state_active) {
653 incExpirationStat(ExpireBy::Access);
655 VBNotifyCtx notifyCtx;
// processExpiredItem may hand back a different StoredValue pointer, so `v`
// is re-bound from its result.
656 std::tie(std::ignore, v, notifyCtx) =
657 processExpiredItem(hbl, *v);
658 notifyNewSeqno(notifyCtx);
660 return wantsDeleted == WantsDeleted::Yes ? v : NULL;
// Increments the expiry counter that corresponds to `source`: pager,
// compactor or access.
666 void VBucket::incExpirationStat(const ExpireBy source) {
668 case ExpireBy::Pager:
669 ++stats.expired_pager;
671 case ExpireBy::Compactor:
672 ++stats.expired_compactor;
674 case ExpireBy::Access:
675 ++stats.expired_access;
// Internal set of `itm` that returns the raw MutationStatus.
// NOTE(review): presumably forwards straight to the hash table - confirm.
681 MutationStatus VBucket::setFromInternal(Item& itm) {
// Front-end SET of `itm`. Finds the existing value under the hash bucket
// lock, consults the bloom filter for CAS operations under full eviction,
// runs processSet() and translates the resulting MutationStatus into an
// ENGINE_ERROR_CODE. On success the item's seqno and CAS are updated from
// the stored value; a background fetch is scheduled when metadata is needed.
685 ENGINE_ERROR_CODE VBucket::set(Item& itm,
687 EventuallyPersistentEngine& engine,
688 const int bgFetchDelay) {
// A non-zero CAS on the incoming item marks this as a CAS operation.
689 bool cas_op = (itm.getCas() != 0);
690 auto hbl = ht.getLockedBucket(itm.getKey());
691 StoredValue* v = ht.unlocked_find(itm.getKey(),
// A locked item on a replica or pending vbucket gets special handling.
695 if (v && v->isLocked(ep_current_time()) &&
696 (getState() == vbucket_state_replica ||
697 getState() == vbucket_state_pending)) {
701 bool maybeKeyExists = true;
702 // If we didn't find a valid item, check Bloomfilter's prediction if in
703 // full eviction policy and for a CAS operation.
704 if ((v == nullptr || v->isTempInitialItem()) &&
705 (eviction == FULL_EVICTION) && (itm.getCas() != 0)) {
706 // Check Bloomfilter's prediction
707 if (!maybeKeyExistsInFilter(itm.getKey())) {
708 maybeKeyExists = false;
712 PreLinkDocumentContext preLinkDocumentContext(engine, cookie, &itm);
713 VBQueueItemCtx queueItmCtx(GenerateBySeqno::Yes,
716 /*isBackfillItem*/ false,
717 &preLinkDocumentContext);
719 MutationStatus status;
720 VBNotifyCtx notifyCtx;
721 std::tie(status, notifyCtx) = processSet(hbl,
725 /*allowExisting*/ true,
726 /*hasMetaData*/ false,
// Translate the mutation status into the engine error code.
730 ENGINE_ERROR_CODE ret = ENGINE_SUCCESS;
732 case MutationStatus::NoMem:
735 case MutationStatus::InvalidCas:
736 ret = ENGINE_KEY_EEXISTS;
738 case MutationStatus::IsLocked:
741 case MutationStatus::NotFound:
743 ret = ENGINE_KEY_ENOENT;
747 case MutationStatus::WasDirty:
748 // Even if the item was dirty, push it into the vbucket's open
750 case MutationStatus::WasClean:
751 notifyNewSeqno(notifyCtx);
// Reflect the stored seqno and CAS back onto the caller's item.
753 itm.setBySeqno(v->getBySeqno());
754 itm.setCas(v->getCas());
756 case MutationStatus::NeedBgFetch: { // CAS operation with non-resident item
760 // temp item is already created. Simply schedule a bg fetch job
// The hash bucket lock is released before scheduling the fetch.
761 hbl.getHTLock().unlock();
762 bgFetch(itm.getKey(), cookie, engine, bgFetchDelay, true);
763 return ENGINE_EWOULDBLOCK;
765 ret = addTempItemAndBGFetch(
766 hbl, itm.getKey(), cookie, engine, bgFetchDelay, true);
// Front-end REPLACE of `itm`. Deleted and temp-deleted/non-existent values
// yield ENGINE_KEY_ENOENT. A temp-initial item under full eviction needs a
// background fetch; otherwise processSet() runs and its status is translated.
// When no value is found, value-only eviction returns ENGINE_KEY_ENOENT,
// while full eviction consults the bloom filter before fetching from disk.
774 ENGINE_ERROR_CODE VBucket::replace(Item& itm,
776 EventuallyPersistentEngine& engine,
777 const int bgFetchDelay) {
778 auto hbl = ht.getLockedBucket(itm.getKey());
779 StoredValue* v = ht.unlocked_find(itm.getKey(),
784 if (v->isDeleted() || v->isTempDeletedItem() ||
785 v->isTempNonExistentItem()) {
786 return ENGINE_KEY_ENOENT;
789 MutationStatus mtype;
790 VBNotifyCtx notifyCtx;
// The item's metadata has not been fetched yet, so a background fetch is
// needed before the replace can be evaluated.
791 if (eviction == FULL_EVICTION && v->isTempInitialItem()) {
792 mtype = MutationStatus::NeedBgFetch;
794 PreLinkDocumentContext preLinkDocumentContext(engine, cookie, &itm);
795 VBQueueItemCtx queueItmCtx(GenerateBySeqno::Yes,
798 /*isBackfillItem*/ false,
799 &preLinkDocumentContext);
800 std::tie(mtype, notifyCtx) = processSet(hbl,
804 /*allowExisting*/ true,
805 /*hasMetaData*/ false,
// Translate the mutation status into the engine error code.
809 ENGINE_ERROR_CODE ret = ENGINE_SUCCESS;
811 case MutationStatus::NoMem:
814 case MutationStatus::IsLocked:
817 case MutationStatus::InvalidCas:
818 case MutationStatus::NotFound:
819 ret = ENGINE_NOT_STORED;
822 case MutationStatus::WasDirty:
823 // Even if the item was dirty, push it into the vbucket's open
825 case MutationStatus::WasClean:
826 notifyNewSeqno(notifyCtx);
// Reflect the stored seqno and CAS back onto the caller's item.
828 itm.setBySeqno(v->getBySeqno());
829 itm.setCas(v->getCas());
831 case MutationStatus::NeedBgFetch: {
832 // temp item is already created. Simply schedule a bg fetch job
833 hbl.getHTLock().unlock();
834 bgFetch(itm.getKey(), cookie, engine, bgFetchDelay, true);
835 ret = ENGINE_EWOULDBLOCK;
// Under value-only eviction this path reports that the key does not exist.
842 if (eviction == VALUE_ONLY) {
843 return ENGINE_KEY_ENOENT;
846 if (maybeKeyExistsInFilter(itm.getKey())) {
847 return addTempItemAndBGFetch(
848 hbl, itm.getKey(), cookie, engine, bgFetchDelay, false);
850 // As bloomfilter predicted that item surely doesn't exist
851 // on disk, return ENOENT for replace().
852 return ENGINE_KEY_ENOENT;
// Stores a backfill item via processSet() with metadata supplied and
// isBackfillItem set, then translates the resulting status. On success the
// vbucket's max CAS is raised and the new seqno is notified after releasing
// the hash bucket lock.
// Throws std::logic_error if processSet() asks for a background fetch, which
// is not expected on a non-active vbucket.
857 ENGINE_ERROR_CODE VBucket::addBackfillItem(Item& itm,
858 const GenerateBySeqno genBySeqno) {
859 auto hbl = ht.getLockedBucket(itm.getKey());
860 StoredValue* v = ht.unlocked_find(itm.getKey(),
865 // Note that this function is only called on replica or pending vbuckets.
866 if (v && v->isLocked(ep_current_time())) {
870 VBQueueItemCtx queueItmCtx(genBySeqno,
873 /*isBackfillItem*/ true,
874 nullptr /* No pre link should happen */);
875 MutationStatus status;
876 VBNotifyCtx notifyCtx;
877 std::tie(status, notifyCtx) = processSet(hbl,
881 /*allowExisting*/ true,
882 /*hasMetaData*/ true,
// Translate the mutation status into the engine error code.
885 ENGINE_ERROR_CODE ret = ENGINE_SUCCESS;
887 case MutationStatus::NoMem:
890 case MutationStatus::InvalidCas:
891 case MutationStatus::IsLocked:
892 ret = ENGINE_KEY_EEXISTS;
894 case MutationStatus::WasDirty:
895 // FALLTHROUGH, to ensure the bySeqno for the hashTable item is
896 // set correctly, and also the sequence numbers are ordered correctly.
898 case MutationStatus::NotFound:
900 case MutationStatus::WasClean: {
901 setMaxCas(v->getCas());
902 // we unlock ht lock here because we want to avoid potential lock
903 // inversions arising from notifyNewSeqno() call
904 hbl.getHTLock().unlock();
905 notifyNewSeqno(notifyCtx);
907 case MutationStatus::NeedBgFetch:
908 throw std::logic_error(
909 "VBucket::addBackfillItem: "
910 "SET on a non-active vbucket should not require a "
911 "bg_metadata_fetch.");
// SET that carries caller-supplied metadata. Conflict resolution is run
// against an existing value and may reject the mutation with
// ENGINE_KEY_EEXISTS; the bloom filter decides whether a missing key is
// fetched from disk. The mutation is then applied through processSet() and
// its status translated. On success the resulting seqno is written through
// `seqno` and the new seqno is notified after releasing the hash bucket lock.
917 ENGINE_ERROR_CODE VBucket::setWithMeta(Item& itm,
921 EventuallyPersistentEngine& engine,
922 const int bgFetchDelay,
924 const bool allowExisting,
925 const GenerateBySeqno genBySeqno,
926 const GenerateCas genCas,
927 const bool isReplication) {
928 auto hbl = ht.getLockedBucket(itm.getKey());
929 StoredValue* v = ht.unlocked_find(itm.getKey(),
934 bool maybeKeyExists = true;
// Metadata not yet loaded for this key: fetch it before resolving conflicts.
937 if (v->isTempInitialItem()) {
938 bgFetch(itm.getKey(), cookie, engine, bgFetchDelay, true);
939 return ENGINE_EWOULDBLOCK;
// The incoming mutation lost conflict resolution against the stored value.
942 if (!(conflictResolver->resolve(*v,
946 ++stats.numOpsSetMetaResolutionFailed;
947 return ENGINE_KEY_EEXISTS;
950 if (maybeKeyExistsInFilter(itm.getKey())) {
951 return addTempItemAndBGFetch(hbl,
959 maybeKeyExists = false;
963 if (eviction == FULL_EVICTION) {
964 // Check Bloomfilter's prediction
965 if (!maybeKeyExistsInFilter(itm.getKey())) {
966 maybeKeyExists = false;
// A locked item on a replica or pending vbucket gets special handling.
971 if (v && v->isLocked(ep_current_time()) &&
972 (getState() == vbucket_state_replica ||
973 getState() == vbucket_state_pending)) {
977 VBQueueItemCtx queueItmCtx(genBySeqno,
980 /*isBackfillItem*/ false,
981 nullptr /* No pre link step needed */);
982 MutationStatus status;
983 VBNotifyCtx notifyCtx;
984 std::tie(status, notifyCtx) = processSet(hbl,
// Translate the mutation status into the engine error code.
994 ENGINE_ERROR_CODE ret = ENGINE_SUCCESS;
996 case MutationStatus::NoMem:
999 case MutationStatus::InvalidCas:
1000 ret = ENGINE_KEY_EEXISTS;
1002 case MutationStatus::IsLocked:
1003 ret = ENGINE_LOCKED;
1005 case MutationStatus::WasDirty:
1006 case MutationStatus::WasClean: {
1008 *seqno = static_cast<uint64_t>(v->getBySeqno());
1010 // we unlock ht lock here because we want to avoid potential lock
1011 // inversions arising from notifyNewSeqno() call
1012 hbl.getHTLock().unlock();
1013 notifyNewSeqno(notifyCtx);
1015 case MutationStatus::NotFound:
1016 ret = ENGINE_KEY_ENOENT;
1018 case MutationStatus::NeedBgFetch: { // CAS operation with non-resident item
1020 if (v) { // temp item is already created. Simply schedule a
1021 hbl.getHTLock().unlock(); // bg fetch job.
1022 bgFetch(itm.getKey(), cookie, engine, bgFetchDelay, true);
1023 return ENGINE_EWOULDBLOCK;
1025 ret = addTempItemAndBGFetch(hbl,
// Front-end DELETE of `key`. Missing, deleted or temp values are resolved
// first: value-only eviction returns ENGINE_KEY_ENOENT, while full eviction
// may schedule a background fetch depending on the bloom filter and the temp
// item's kind. An expired value is processed as an expiry; otherwise a soft
// delete is performed with revSeqno bumped by one. The resulting status is
// translated and, where requested, `itemMeta` / `mutInfo` are filled in.
// Throws std::logic_error if the delete unexpectedly requests a bg fetch.
1038 ENGINE_ERROR_CODE VBucket::deleteItem(const DocKey& key,
1041 EventuallyPersistentEngine& engine,
1042 const int bgFetchDelay,
1043 ItemMetaData* itemMeta,
1044 mutation_descr_t* mutInfo) {
1045 auto hbl = ht.getLockedBucket(key);
1046 StoredValue* v = ht.unlocked_find(
1047 key, hbl.getBucketNum(), WantsDeleted::Yes, TrackReference::No);
1049 if (!v || v->isDeleted() || v->isTempItem()) {
1050 if (eviction == VALUE_ONLY) {
1051 return ENGINE_KEY_ENOENT;
1052 } else { // Full eviction.
1053 if (!v) { // Item might be evicted from cache.
1054 if (maybeKeyExistsInFilter(key)) {
1055 return addTempItemAndBGFetch(
1056 hbl, key, cookie, engine, bgFetchDelay, true);
1058 // As bloomfilter predicted that item surely doesn't
1059 // exist on disk, return ENOENT for deleteItem().
1060 return ENGINE_KEY_ENOENT;
1062 } else if (v->isTempInitialItem()) {
1063 hbl.getHTLock().unlock();
1064 bgFetch(key, cookie, engine, bgFetchDelay, true);
1065 return ENGINE_EWOULDBLOCK;
1066 } else { // Non-existent or deleted key.
1067 if (v->isTempNonExistentItem() || v->isTempDeletedItem()) {
1068 // Delete a temp non-existent item to ensure that
1069 // if a delete were issued over an item that doesn't
1070 // exist, then we don't preserve a temp item.
1071 deleteStoredValue(hbl, *v);
1073 return ENGINE_KEY_ENOENT;
// A locked item on a replica or pending vbucket gets special handling.
1078 if (v->isLocked(ep_current_time()) &&
1079 (getState() == vbucket_state_replica ||
1080 getState() == vbucket_state_pending)) {
// Report the pre-delete CAS to the caller when metadata was requested.
1084 if (itemMeta != nullptr) {
1085 itemMeta->cas = v->getCas();
1088 MutationStatus delrv;
1089 VBNotifyCtx notifyCtx;
1090 if (v->isExpired(ep_real_time())) {
1091 std::tie(delrv, v, notifyCtx) = processExpiredItem(hbl, *v);
1093 ItemMetaData metadata;
// The deletion gets the next revision sequence number.
1094 metadata.revSeqno = v->getRevSeqno() + 1;
1095 std::tie(delrv, v, notifyCtx) =
1096 processSoftDelete(hbl,
1100 VBQueueItemCtx(GenerateBySeqno::Yes,
1103 /*isBackfillItem*/ false,
1104 nullptr /* no pre link */),
1106 /*bySeqno*/ v->getBySeqno());
// Translate the mutation status into the engine error code.
1110 ENGINE_ERROR_CODE ret = ENGINE_SUCCESS;
1112 case MutationStatus::NoMem:
1113 ret = ENGINE_ENOMEM;
1115 case MutationStatus::InvalidCas:
1116 ret = ENGINE_KEY_EEXISTS;
1118 case MutationStatus::IsLocked:
1119 ret = ENGINE_LOCKED_TMPFAIL;
1121 case MutationStatus::NotFound:
1122 ret = ENGINE_KEY_ENOENT;
1124 * A NotFound return value at this point indicates that the
1125 * item has expired. But, a deletion still needs to be queued
1126 * for the item in order to persist it.
1128 case MutationStatus::WasClean:
1129 case MutationStatus::WasDirty:
1130 if (itemMeta != nullptr) {
1131 itemMeta->revSeqno = v->getRevSeqno();
1132 itemMeta->flags = v->getFlags();
1133 itemMeta->exptime = v->getExptime();
1136 notifyNewSeqno(notifyCtx);
1137 seqno = static_cast<uint64_t>(v->getBySeqno());
// Mutation details are reported only for a real delete, not for the
// expired (NotFound) case.
1140 if (delrv != MutationStatus::NotFound) {
1142 mutInfo->seqno = seqno;
1143 mutInfo->vbucket_uuid = failovers->getLatestUUID();
1145 if (itemMeta != nullptr) {
1146 itemMeta->cas = v->getCas();
1150 case MutationStatus::NeedBgFetch:
1151 // We already figured out if a bg fetch is required for a full-evicted
1153 throw std::logic_error(
1154 "VBucket::deleteItem: "
1155 "Unexpected NEEDS_BG_FETCH from processSoftDelete");
// DELETE that carries caller-supplied metadata. Unless `force` is set,
// conflict resolution is run against the existing value and may reject the
// delete with ENGINE_KEY_EEXISTS. When no usable value is present a temp
// stored value is created so the delete can still be persisted. XATTR
// documents are pruned rather than removed so that system xattrs survive.
// The resulting status is translated; on success the seqno is written
// through `seqno` and the new seqno is notified after unlocking.
1160 ENGINE_ERROR_CODE VBucket::deleteWithMeta(const DocKey& key,
1164 EventuallyPersistentEngine& engine,
1165 const int bgFetchDelay,
1167 const ItemMetaData& itemMeta,
1168 const bool backfill,
1169 const GenerateBySeqno genBySeqno,
1170 const GenerateCas generateCas,
1171 const uint64_t bySeqno,
1172 const bool isReplication) {
1173 auto hbl = ht.getLockedBucket(key);
1174 StoredValue* v = ht.unlocked_find(
1175 key, hbl.getBucketNum(), WantsDeleted::Yes, TrackReference::No);
1176 if (!force) { // Need conflict resolution.
// Metadata not yet loaded for this key: fetch it before resolving conflicts.
1178 if (v->isTempInitialItem()) {
1179 bgFetch(key, cookie, engine, bgFetchDelay, true);
1180 return ENGINE_EWOULDBLOCK;
// The incoming delete lost conflict resolution against the stored value.
1183 if (!(conflictResolver->resolve(*v,
1185 PROTOCOL_BINARY_RAW_BYTES,
1187 ++stats.numOpsDelMetaResolutionFailed;
1188 return ENGINE_KEY_EEXISTS;
1191 // Item is 1) deleted or not existent in the value eviction case OR
1192 // 2) deleted or evicted in the full eviction.
1193 if (maybeKeyExistsInFilter(key)) {
1194 return addTempItemAndBGFetch(hbl,
1202 // Even though bloomfilter predicted that item doesn't exist
1203 // on disk, we must put this delete on disk if the cas is valid.
1204 AddStatus rv = addTempStoredValue(hbl, key, isReplication);
1205 if (rv == AddStatus::NoMem) {
1206 return ENGINE_ENOMEM;
// Re-find the value to pick up the temp item that was just added.
1208 v = ht.unlocked_find(key,
1211 TrackReference::No);
1217 // We should always try to persist a delete here.
1218 AddStatus rv = addTempStoredValue(hbl, key, isReplication);
1219 if (rv == AddStatus::NoMem) {
1220 return ENGINE_ENOMEM;
// Re-find the value to pick up the temp item that was just added.
1222 v = ht.unlocked_find(key,
1225 TrackReference::No);
1228 } else if (v->isTempInitialItem()) {
// A locked item on a replica or pending vbucket gets special handling.
1234 if (v && v->isLocked(ep_current_time()) &&
1235 (getState() == vbucket_state_replica ||
1236 getState() == vbucket_state_pending)) {
1240 MutationStatus delrv;
1241 VBNotifyCtx notifyCtx;
// Full eviction asks for a background fetch here; the alternative reports
// the key as not found.
1243 if (eviction == FULL_EVICTION) {
1244 delrv = MutationStatus::NeedBgFetch;
1246 delrv = MutationStatus::NotFound;
1249 VBQueueItemCtx queueItmCtx(genBySeqno,
1253 nullptr /* No pre link step needed */);
1255 // system xattrs must remain
1256 std::unique_ptr<Item> itm;
// If pruning yields an item, update the stored value with it instead of
// performing a plain soft delete.
1257 if (mcbp::datatype::is_xattr(v->getDatatype()) &&
1258 (itm = pruneXattrDocument(*v, itemMeta))) {
1259 std::tie(v, delrv, notifyCtx) =
1260 updateStoredValue(hbl, *v, *itm, &queueItmCtx);
1262 std::tie(delrv, v, notifyCtx) = processSoftDelete(hbl,
// `v` may be null at this point, in which case 0 is used as the CAS.
1271 cas = v ? v->getCas() : 0;
// Translate the mutation status into the engine error code.
1274 case MutationStatus::NoMem:
1275 return ENGINE_ENOMEM;
1276 case MutationStatus::InvalidCas:
1277 return ENGINE_KEY_EEXISTS;
1278 case MutationStatus::IsLocked:
1279 return ENGINE_LOCKED_TMPFAIL;
1280 case MutationStatus::NotFound:
1281 return ENGINE_KEY_ENOENT;
1282 case MutationStatus::WasDirty:
1283 case MutationStatus::WasClean: {
1285 *seqno = static_cast<uint64_t>(v->getBySeqno());
1287 // we unlock ht lock here because we want to avoid potential lock
1288 // inversions arising from notifyNewSeqno() call
1289 hbl.getHTLock().unlock();
1290 notifyNewSeqno(notifyCtx);
1293 case MutationStatus::NeedBgFetch:
1294 hbl.getHTLock().unlock();
1295 bgFetch(key, cookie, engine, bgFetchDelay, true);
1296 return ENGINE_EWOULDBLOCK;
1298 return ENGINE_SUCCESS;
// Processes the expiry of item `it`. If a matching value is in the hash
// table it is either removed (temp deleted / non-existent items) or expired
// through processExpiredItem(). If no value is present under full eviction,
// and the bloom filter says the key may be on disk, a temp value is created
// from `it` and expired so that the deletion is queued. Finally the expiry
// counter for `source` is incremented.
// Throws std::logic_error if removing a temp item from the hash table fails.
1301 void VBucket::deleteExpiredItem(const Item& it,
1305 // The item is correctly trimmed (by the caller). Fetch the one in the
1306 // hashtable and replace it if the CAS match (same item; no race).
1307 // If not found in the hashtable we should add it as a deleted item
1308 const DocKey& key = it.getKey();
1309 auto hbl = ht.getLockedBucket(key);
1310 StoredValue* v = ht.unlocked_find(
1311 key, hbl.getBucketNum(), WantsDeleted::Yes, TrackReference::No);
// A CAS mismatch means the stored value is no longer the item that expired.
1313 if (v->getCas() != it.getCas()) {
1317 if (v->isTempNonExistentItem() || v->isTempDeletedItem()) {
1318 bool deleted = deleteStoredValue(hbl, *v);
1320 throw std::logic_error(
1321 "VBucket::deleteExpiredItem: "
1322 "Failed to delete seqno:" +
1323 std::to_string(v->getBySeqno()) + " from bucket " +
1324 std::to_string(hbl.getBucketNum()));
// Only live values that have expired relative to `startTime` are processed.
1326 } else if (v->isExpired(startTime) && !v->isDeleted()) {
1327 VBNotifyCtx notifyCtx;
1328 std::tie(std::ignore, std::ignore, notifyCtx) =
1329 processExpiredItem(hbl, *v);
1330 // we unlock ht lock here because we want to avoid potential lock
1331 // inversions arising from notifyNewSeqno() call
1332 hbl.getHTLock().unlock();
1333 notifyNewSeqno(notifyCtx);
1336 if (eviction == FULL_EVICTION) {
1337 // Create a temp item and delete and push it
1338 // into the checkpoint queue, only if the bloomfilter
1339 // predicts that the item may exist on disk.
1340 if (maybeKeyExistsInFilter(key)) {
1341 AddStatus rv = addTempStoredValue(hbl, key);
1342 if (rv == AddStatus::NoMem) {
// Re-find the value to pick up the temp item that was just added.
1345 v = ht.unlocked_find(key,
1348 TrackReference::No);
// Populate the temp value from the expired item before expiring it.
1350 v->setRevSeqno(it.getRevSeqno());
1351 v->setValue(it, ht);
1352 VBNotifyCtx notifyCtx;
1353 std::tie(std::ignore, std::ignore, notifyCtx) =
1354 processExpiredItem(hbl, *v);
1355 // we unlock ht lock here because we want to avoid potential
1356 // lock inversions arising from notifyNewSeqno() call
1357 hbl.getHTLock().unlock();
1358 notifyNewSeqno(notifyCtx);
1362 incExpirationStat(source);
// Front-end "add" operation: stores `itm` only if the key does not already
// hold a live value.
// The key's hash bucket is locked and looked up; under FULL_EVICTION, when no
// value (or only a temp-initial one) is present, the bloom filter is consulted
// to decide whether the key may still exist on disk. The decision is passed to
// processAdd() and the resulting AddStatus is mapped onto an engine error
// code by the switch below.
// NOTE(review): the cookie parameter, the declaration of `status` and the
// switch header are not visible in this excerpt.
1365 ENGINE_ERROR_CODE VBucket::add(Item& itm,
1367 EventuallyPersistentEngine& engine,
1368 const int bgFetchDelay) {
1369 auto hbl = ht.getLockedBucket(itm.getKey());
1370 StoredValue* v = ht.unlocked_find(itm.getKey(),
1373 TrackReference::No);
1375 bool maybeKeyExists = true;
1376 if ((v == nullptr || v->isTempInitialItem()) &&
1377 (eviction == FULL_EVICTION)) {
1378 // Check bloomfilter's prediction
1379 if (!maybeKeyExistsInFilter(itm.getKey())) {
1380 maybeKeyExists = false;
1384 PreLinkDocumentContext preLinkDocumentContext(engine, cookie, &itm);
1385 VBQueueItemCtx queueItmCtx(GenerateBySeqno::Yes,
1388 /*isBackfillItem*/ false,
1389 &preLinkDocumentContext);
1391 VBNotifyCtx notifyCtx;
// The fifth argument (false) is processAdd()'s isReplication flag.
1392 std::tie(status, notifyCtx) =
1393 processAdd(hbl, v, itm, maybeKeyExists, false, &queueItmCtx);
1396 case AddStatus::NoMem:
1397 return ENGINE_ENOMEM;
1398 case AddStatus::Exists:
1399 return ENGINE_NOT_STORED;
1400 case AddStatus::AddTmpAndBgFetch:
1401 return addTempItemAndBGFetch(
1402 hbl, itm.getKey(), cookie, engine, bgFetchDelay, true);
1403 case AddStatus::BgFetch:
// The hash-table lock is released before the background fetch is scheduled.
1404 hbl.getHTLock().unlock();
1405 bgFetch(itm.getKey(), cookie, engine, bgFetchDelay, true);
1406 return ENGINE_EWOULDBLOCK;
1407 case AddStatus::Success:
1408 case AddStatus::UnDel:
// Report the seqno and CAS assigned to the stored value back through itm.
1409 notifyNewSeqno(notifyCtx);
1410 itm.setBySeqno(v->getBySeqno());
1411 itm.setCas(v->getCas());
1414 return ENGINE_SUCCESS;
// Core of get-and-touch: returns the value for a key and, when the requested
// expiry time differs from the stored one, updates it and queues the change.
// Visible outcomes for a value that is present:
//   - deleted or temp deleted / non-existent  -> NotFound
//   - not resident                            -> NeedBgFetch
//   - locked                                  -> IsLocked (ENGINE_KEY_EEXISTS)
//   - otherwise                               -> WasClean plus the item
// The trailing section handles the remaining case (presumably v == nullptr):
// VALUE_ONLY eviction reports NotFound, otherwise the bloom filter decides
// between NeedBgFetch and NotFound.
// NOTE(review): the key, v and exptime parameters and the branch headers are
// not visible in this excerpt.
1417 std::pair<MutationStatus, GetValue> VBucket::processGetAndUpdateTtl(
1418 HashTable::HashBucketLock& hbl,
1423 if (v->isDeleted() || v->isTempDeletedItem() ||
1424 v->isTempNonExistentItem()) {
1425 return {MutationStatus::NotFound, {}};
1428 if (!v->isResident()) {
1429 return {MutationStatus::NeedBgFetch, {}};
1432 if (v->isLocked(ep_current_time())) {
1433 return {MutationStatus::IsLocked,
1434 GetValue(nullptr, ENGINE_KEY_EEXISTS, 0)};
1437 const bool exptime_mutated = exptime != v->getExptime();
// Seqno captured before any update; it is what the GetValue reports.
1438 auto bySeqNo = v->getBySeqno();
1439 if (exptime_mutated) {
1441 v->setExptime(exptime);
1442 v->setRevSeqno(v->getRevSeqno() + 1);
// toItem()'s first argument is true when the value is currently locked.
1446 v->toItem(v->isLocked(ep_current_time()), getId()).release();
1448 GetValue rv(item, ENGINE_SUCCESS, bySeqNo);
1450 if (exptime_mutated) {
1451 VBQueueItemCtx qItemCtx(GenerateBySeqno::Yes,
1456 VBNotifyCtx notifyCtx;
// updateStoredValue() may hand back a different StoredValue; v is rebound.
1457 std::tie(v, std::ignore, notifyCtx) =
1458 updateStoredValue(hbl, *v, *item, &qItemCtx, true);
// Propagate the CAS of the updated stored value to the returned item.
1459 rv.getValue()->setCas(v->getCas());
1460 // we unlock ht lock here because we want to avoid potential lock
1461 // inversions arising from notifyNewSeqno() call
1462 hbl.getHTLock().unlock();
1463 notifyNewSeqno(notifyCtx);
1466 return {MutationStatus::WasClean, rv};
1468 if (eviction == VALUE_ONLY) {
1469 return {MutationStatus::NotFound, {}};
1471 if (maybeKeyExistsInFilter(key)) {
1472 return {MutationStatus::NeedBgFetch, {}};
1474 // As bloomfilter predicted that item surely doesn't exist
1475 // on disk, return ENOENT for getAndUpdateTtl().
1476 return {MutationStatus::NotFound, {}};
// Public get-and-touch entry point. Locks the key's hash bucket, fetches the
// valid value (tracking the reference) and delegates to
// processGetAndUpdateTtl(). When that reports NeedBgFetch, the visible code
// either schedules a background fetch and returns ENGINE_EWOULDBLOCK with the
// value's seqno, or adds a temp item plus background fetch and returns that
// call's error code.
// NOTE(review): the cookie / bgFetchDelay / exptime parameters, the
// declaration of `gv`, the branch choosing between the two fetch paths and the
// final return are not visible in this excerpt.
1482 GetValue VBucket::getAndUpdateTtl(const DocKey& key,
1484 EventuallyPersistentEngine& engine,
1487 auto hbl = ht.getLockedBucket(key);
1488 StoredValue* v = fetchValidValue(hbl,
1491 TrackReference::Yes,
1494 MutationStatus status;
1495 std::tie(status, gv) = processGetAndUpdateTtl(hbl, key, v, exptime);
1497 if (status == MutationStatus::NeedBgFetch) {
1499 bgFetch(key, cookie, engine, bgFetchDelay);
1500 return GetValue(nullptr, ENGINE_EWOULDBLOCK, v->getBySeqno());
1502 ENGINE_ERROR_CODE ec = addTempItemAndBGFetch(
1503 hbl, key, cookie, engine, bgFetchDelay, false);
1504 return GetValue(NULL, ec, -1, true);
// Inserts an item loaded during warmup into the hash table without queueing
// it (a null queue context is passed to the add / update helpers).
// Returns NoMem when StoredValue::hasAvailableSpace() fails, InvalidCas for the
// conflict cases shown below, and NotFound as the last visible return.
// When keyMetaDataOnly is set the new value is marked non-resident; when
// `eject` is set and a full value was loaded, the value is ejected again.
// NOTE(review): the `eject` parameter and the headers separating the
// "new value" path from the "already present" path are not visible here.
1511 MutationStatus VBucket::insertFromWarmup(Item& itm,
1513 bool keyMetaDataOnly) {
1514 if (!StoredValue::hasAvailableSpace(stats, itm)) {
1515 return MutationStatus::NoMem;
1518 auto hbl = ht.getLockedBucket(itm.getKey());
1519 StoredValue* v = ht.unlocked_find(itm.getKey(),
1522 TrackReference::No);
1525 v = addNewStoredValue(hbl, itm, /*queueItmCtx*/ nullptr).first;
1526 if (keyMetaDataOnly) {
1527 v->markNotResident();
1528 /* For now ht stats are updated from outside ht. This seems to be
1529 a better option for now than passing a flag to
1530 addNewStoredValue() just for this func */
1531 ++(ht.numNonResidentItems);
1533 /* For now ht stats are updated from outside ht. This seems to be
1534 a better option for now than passing a flag to
1535 addNewStoredValue() just for this func.
1536 We need to decrNumTotalItems because ht.numTotalItems is already
1537 set by warmup when it estimated the item count from disk */
1538 ht.decrNumTotalItems();
1539 v->setNewCacheItem(false);
1541 if (keyMetaDataOnly) {
1542 // We don't have a better error code ;)
1543 return MutationStatus::InvalidCas;
1546 // Verify that the CAS isn't changed
1547 if (v->getCas() != itm.getCas()) {
// A stored CAS of 0 is filled in from the warmed-up item's metadata.
1548 if (v->getCas() == 0) {
1549 v->setCas(itm.getCas());
1550 v->setFlags(itm.getFlags());
1551 v->setExptime(itm.getExptime());
1552 v->setRevSeqno(itm.getRevSeqno());
1554 return MutationStatus::InvalidCas;
1557 updateStoredValue(hbl, *v, itm, /*queueItmCtx*/ nullptr);
1562 if (eject && !keyMetaDataOnly) {
1563 ht.unlocked_ejectItem(v, eviction);
1566 return MutationStatus::NotFound;
// Common implementation of "get" for a key, driven by the bits in `options`:
//   TRACK_REFERENCE   - whether the lookup counts as a reference
//   GET_DELETED_VALUE - whether a deleted value may be returned
//   DELETE_TEMP       - remove a temp deleted / non-existent entry once seen
//   HIDE_LOCKED_CAS   - hide the CAS of a value that is currently locked
//   QUEUE_BG_FETCH    - add a temp item and background-fetch on a miss
// A non-resident value is handed to getInternalNonResident(). On a miss the
// bloom filter decides whether a background fetch is worth scheduling.
// NOTE(review): the cookie / bgFetchDelay parameters, the returns of several
// branches and the remaining GetValue constructor arguments are not visible
// in this excerpt.
1569 GetValue VBucket::getInternal(const DocKey& key,
1571 EventuallyPersistentEngine& engine,
1573 get_options_t options,
1574 bool diskFlushAll) {
1575 const TrackReference trackReference = (options & TRACK_REFERENCE)
1576 ? TrackReference::Yes
1577 : TrackReference::No;
1578 const bool getDeletedValue = (options & GET_DELETED_VALUE);
1579 auto hbl = ht.getLockedBucket(key);
// Deleted values are requested here and filtered below via getDeletedValue.
1580 StoredValue* v = fetchValidValue(
1581 hbl, key, WantsDeleted::Yes, trackReference, QueueExpired::Yes);
1583 if (v->isDeleted() && !getDeletedValue) {
1586 if (v->isTempDeletedItem() || v->isTempNonExistentItem()) {
1587 // Delete a temp non-existent item to ensure that
1588 // if the get were issued over an item that doesn't
1589 // exist, then we dont preserve a temp item.
1590 if (options & DELETE_TEMP) {
1591 deleteStoredValue(hbl, *v);
1596 // If the value is not resident, wait for it...
1597 if (!v->isResident()) {
1598 return getInternalNonResident(
1599 key, cookie, engine, bgFetchDelay, options, *v);
1602 // Should we hide (return -1) for the items' CAS?
1603 const bool hide_cas =
1604 (options & HIDE_LOCKED_CAS) && v->isLocked(ep_current_time());
1605 return GetValue(v->toItem(hide_cas, getId()).release(),
1611 if (!getDeletedValue && (eviction == VALUE_ONLY || diskFlushAll)) {
1615 if (maybeKeyExistsInFilter(key)) {
1616 ENGINE_ERROR_CODE ec = ENGINE_EWOULDBLOCK;
1618 QUEUE_BG_FETCH) { // Full eviction and need a bg fetch.
1619 ec = addTempItemAndBGFetch(
1620 hbl, key, cookie, engine, bgFetchDelay, false);
1622 return GetValue(NULL, ec, -1, true);
1624 // As bloomfilter predicted that item surely doesn't exist
1625 // on disk, return ENOENT, for getInternal().
// Reports the metadata (CAS, flags, expiry, rev seqno) and datatype of a key.
// With an entry in the hash table:
//   - temp-initial entry      -> schedule a meta background fetch, EWOULDBLOCK
//   - temp non-existent entry -> ENGINE_KEY_ENOENT (metadata.cas still filled)
//   - otherwise               -> fill the outputs and return ENGINE_SUCCESS,
//     setting GET_META_ITEM_DELETED_FLAG in `deleted` for deleted, temp-deleted
//     or expired values, and reporting a CAS of -1 while the value is locked.
// Without an entry the bloom filter decides between adding a temp item with a
// background fetch and returning ENGINE_KEY_ENOENT.
// NOTE(review): the cookie, bgFetchDelay and `deleted` parameters and the
// branch headers are not visible in this excerpt.
1631 ENGINE_ERROR_CODE VBucket::getMetaData(const DocKey& key,
1633 EventuallyPersistentEngine& engine,
1635 ItemMetaData& metadata,
1637 uint8_t& datatype) {
1639 auto hbl = ht.getLockedBucket(key);
1640 StoredValue* v = ht.unlocked_find(
1641 key, hbl.getBucketNum(), WantsDeleted::Yes, TrackReference::No);
1644 stats.numOpsGetMeta++;
1645 if (v->isTempInitialItem()) {
1646 // Need bg meta fetch.
1647 bgFetch(key, cookie, engine, bgFetchDelay, true);
1648 return ENGINE_EWOULDBLOCK;
1649 } else if (v->isTempNonExistentItem()) {
1650 metadata.cas = v->getCas();
1651 return ENGINE_KEY_ENOENT;
1653 if (v->isTempDeletedItem() || v->isDeleted() ||
1654 v->isExpired(ep_real_time())) {
1655 deleted |= GET_META_ITEM_DELETED_FLAG;
1658 if (v->isLocked(ep_current_time())) {
1659 metadata.cas = static_cast<uint64_t>(-1);
1661 metadata.cas = v->getCas();
1663 metadata.flags = v->getFlags();
1664 metadata.exptime = v->getExptime();
1665 metadata.revSeqno = v->getRevSeqno();
1666 datatype = v->getDatatype();
1668 return ENGINE_SUCCESS;
1671 // The key wasn't found. However, this may be because it was previously
1672 // deleted or evicted with the full eviction strategy.
1673 // So, add a temporary item corresponding to the key to the hash table
1674 // and schedule a background fetch for its metadata from the persistent
1675 // store. The item's state will be updated after the fetch completes.
1677 // Schedule this bgFetch only if the key is predicted to be may-be
1678 // existent on disk by the bloomfilter.
1680 if (maybeKeyExistsInFilter(key)) {
1681 return addTempItemAndBGFetch(
1682 hbl, key, cookie, engine, bgFetchDelay, true);
1684 stats.numOpsGetMeta++;
1685 return ENGINE_KEY_ENOENT;
// Fills `kstats` with per-key statistics (deleted flag, dirty flag, expiry,
// flags, CAS and the vbucket state).
// Returns ENGINE_KEY_ENOENT for temp deleted / non-existent entries and for
// deleted values when wantsDeleted is No. Under FULL_EVICTION a temp-initial
// entry triggers a background fetch (ENGINE_EWOULDBLOCK). When nothing usable
// is in the hash table, VALUE_ONLY eviction reports ENOENT and otherwise the
// bloom filter decides whether to add a temp item and background-fetch.
// NOTE(review): the cookie / bgFetchDelay parameters, the remaining
// fetchValidValue() arguments and the branch headers are not visible here.
1690 ENGINE_ERROR_CODE VBucket::getKeyStats(const DocKey& key,
1692 EventuallyPersistentEngine& engine,
1694 struct key_stats& kstats,
1695 WantsDeleted wantsDeleted) {
1696 auto hbl = ht.getLockedBucket(key);
1697 StoredValue* v = fetchValidValue(hbl,
1700 TrackReference::Yes,
1704 if ((v->isDeleted() && wantsDeleted == WantsDeleted::No) ||
1705 v->isTempNonExistentItem() || v->isTempDeletedItem()) {
1706 return ENGINE_KEY_ENOENT;
1708 if (eviction == FULL_EVICTION && v->isTempInitialItem()) {
// The hash-table lock is released before the background fetch is scheduled.
1709 hbl.getHTLock().unlock();
1710 bgFetch(key, cookie, engine, bgFetchDelay, true);
1711 return ENGINE_EWOULDBLOCK;
1713 kstats.logically_deleted = v->isDeleted();
1714 kstats.dirty = v->isDirty();
1715 kstats.exptime = v->getExptime();
1716 kstats.flags = v->getFlags();
1717 kstats.cas = v->getCas();
1718 kstats.vb_state = getState();
1719 return ENGINE_SUCCESS;
1721 if (eviction == VALUE_ONLY) {
1722 return ENGINE_KEY_ENOENT;
1724 if (maybeKeyExistsInFilter(key)) {
1725 return addTempItemAndBGFetch(
1726 hbl, key, cookie, engine, bgFetchDelay, true);
1728 // If bgFetch were false, or bloomfilter predicted that
1729 // item surely doesn't exist on disk, return ENOENT for
1731 return ENGINE_KEY_ENOENT;
// Get-and-lock: returns the value for a key and locks it until
// currentTime + lockTimeout.
// Visible outcomes for a value that is present:
//   - deleted or temp deleted / non-existent -> ENGINE_KEY_ENOENT
//   - already locked                         -> ENGINE_TMPFAIL
//   - not resident                           -> background fetch, EWOULDBLOCK
//   - otherwise lock it, assign a fresh CAS from nextHLCCas() to both the
//     returned item and the stored value, and return the item.
// When no value is found the result is ENOENT or, if the bloom filter says
// the key may exist, a temp item is added and a background fetch scheduled.
// NOTE(review): the cookie / bgFetchDelay parameters and the branch headers
// (including the eviction-policy check on the miss path) are not visible here.
1737 GetValue VBucket::getLocked(const DocKey& key,
1738 rel_time_t currentTime,
1739 uint32_t lockTimeout,
1741 EventuallyPersistentEngine& engine,
1743 auto hbl = ht.getLockedBucket(key);
1744 StoredValue* v = fetchValidValue(hbl,
1747 TrackReference::Yes,
1751 if (v->isDeleted() || v->isTempNonExistentItem() ||
1752 v->isTempDeletedItem()) {
1753 return GetValue(NULL, ENGINE_KEY_ENOENT);
1756 // if v is locked return error
1757 if (v->isLocked(currentTime)) {
1758 return GetValue(NULL, ENGINE_TMPFAIL);
1761 // If the value is not resident, wait for it...
1762 if (!v->isResident()) {
1764 bgFetch(key, cookie, engine, bgFetchDelay);
1766 return GetValue(NULL, ENGINE_EWOULDBLOCK, -1, true);
1769 // acquire lock and increment cas value
1770 v->lock(currentTime + lockTimeout);
1772 auto it = v->toItem(false, getId());
// The same new CAS is written to the returned item and the stored value.
1773 it->setCas(nextHLCCas());
1774 v->setCas(it->getCas());
1776 return GetValue(it.release());
1779 // No value found in the hashtable.
1782 return GetValue(NULL, ENGINE_KEY_ENOENT);
1785 if (maybeKeyExistsInFilter(key)) {
1786 ENGINE_ERROR_CODE ec = addTempItemAndBGFetch(
1787 hbl, key, cookie, engine, bgFetchDelay, false);
1788 return GetValue(NULL, ec, -1, true);
1790 // As bloomfilter predicted that item surely doesn't exist
1791 // on disk, return ENOENT for getLocked().
1792 return GetValue(NULL, ENGINE_KEY_ENOENT);
1795 return GetValue(); // just to prevent compiler warning
// Callback for a queued item whose deletion has been handled on disk.
// If the hash table still holds a deleted value for the key with the same
// rev seqno as the queued item, that value is removed from the hash table;
// a failed removal raises std::logic_error. The key is then added to the
// bloom filter and the flush statistics are updated.
// NOTE(review): the remaining fetchValidValue() arguments and the conditions
// guarding the throw, addToFilter() and totalPersisted (presumably the
// `deleted` parameter for the latter) are not visible in this excerpt.
1799 void VBucket::deletedOnDiskCbk(const Item& queuedItem, bool deleted) {
1800 auto hbl = ht.getLockedBucket(queuedItem.getKey());
1801 StoredValue* v = fetchValidValue(hbl,
1802 queuedItem.getKey(),
1806 // Delete the item in the hash table iff:
1807 // 1. Item is existent in hashtable, and deleted flag is true
1808 // 2. rev seqno of queued item matches rev seqno of hash table item
1809 if (v && v->isDeleted() && (queuedItem.getRevSeqno() == v->getRevSeqno())) {
1810 bool isDeleted = deleteStoredValue(hbl, *v);
1812 throw std::logic_error(
1813 "deletedOnDiskCbk:callback: "
1814 "Failed to delete key with seqno:" +
1815 std::to_string(v->getBySeqno()) + "' from bucket " +
1816 std::to_string(hbl.getBucketNum()));
1820 * Deleted items are to be added to the bloomfilter,
1821 * in either eviction policy.
1823 addToFilter(queuedItem.getKey());
1827 ++stats.totalPersisted;
1830 doStatsForFlushing(queuedItem, queuedItem.size());
1831 --stats.diskQueueSize;
1832 decrMetaDataDisk(queuedItem);
// Removes the stored value for `key` from the hash table.
// The lookup includes deleted values and does not track the reference; the
// result of deleteStoredValue() is returned.
// NOTE(review): the handling of a missing key (v == nullptr) is not visible
// in this excerpt.
1835 bool VBucket::deleteKey(const DocKey& key) {
1836 auto hbl = ht.getLockedBucket(key);
1837 StoredValue* v = ht.unlocked_find(
1838 key, hbl.getBucketNum(), WantsDeleted::Yes, TrackReference::No);
1842 return deleteStoredValue(hbl, *v);
// Brings in-memory vbucket state in line with a completed rollback:
// prunes the failover table and clears the checkpoint manager at the
// rolled-back high seqno, records the persisted snapshot range, adds the
// number of rolled-back seqnos to the rollback counter and resets the open
// checkpoint id to 1.
1845 void VBucket::postProcessRollback(const RollbackResult& rollbackResult,
1846 uint64_t prevHighSeqno) {
1847 failovers->pruneEntries(rollbackResult.highSeqno);
1848 checkpointManager.clear(*this, rollbackResult.highSeqno);
1849 setPersistedSnapshot(rollbackResult.snapStartSeqno,
1850 rollbackResult.snapEndSeqno);
// Counted as the seqno distance between the old and the new high seqno.
1851 incrRollbackItemCount(prevHighSeqno - rollbackResult.highSeqno);
1852 checkpointManager.setOpenCheckpointId(1);
// Debug helper: writes this vbucket's address, state, item counts and the
// hash table contents to std::cerr.
1855 void VBucket::dump() const {
1856 std::cerr << "VBucket[" << this << "] with state: " << toString(getState())
1857 << " numItems:" << getNumItems()
1858 << " numNonResident:" << getNumNonResidentItems()
1859 << " ht: " << std::endl << " " << ht << std::endl
1860 << "]" << std::endl;
// Emits this vbucket's statistics through the add_stat callback, with `c`
// passed along as the opaque cookie.
// The first addStat() call reports the vbucket state under a NULL stat name;
// the rest report hash-table, operation, dirty-queue, seqno, bloom-filter and
// high-priority-request figures, followed by the HLC's own stats.
// NOTE(review): the use of `details` (presumably gating the detailed stats)
// and the definition of statPrefix are not visible in this excerpt.
1863 void VBucket::_addStats(bool details, ADD_STAT add_stat, const void* c) {
1864 addStat(NULL, toString(state), add_stat, c);
1866 size_t numItems = getNumItems();
1867 size_t tempItems = getNumTempItems();
1868 addStat("num_items", numItems, add_stat, c);
1869 addStat("num_temp_items", tempItems, add_stat, c);
1870 addStat("num_non_resident", getNumNonResidentItems(), add_stat, c);
1871 addStat("ht_memory", ht.memorySize(), add_stat, c);
1872 addStat("ht_item_memory", ht.getItemMemory(), add_stat, c);
1873 addStat("ht_cache_size", ht.cacheSize.load(), add_stat, c);
1874 addStat("ht_size", ht.getSize(), add_stat, c);
1875 addStat("num_ejects", ht.getNumEjects(), add_stat, c);
1876 addStat("ops_create", opsCreate.load(), add_stat, c);
1877 addStat("ops_update", opsUpdate.load(), add_stat, c);
1878 addStat("ops_delete", opsDelete.load(), add_stat, c);
1879 addStat("ops_reject", opsReject.load(), add_stat, c);
1880 addStat("queue_size", dirtyQueueSize.load(), add_stat, c);
1881 addStat("backfill_queue_size", getBackfillSize(), add_stat, c);
1882 addStat("queue_memory", dirtyQueueMem.load(), add_stat, c);
1883 addStat("queue_fill", dirtyQueueFill.load(), add_stat, c);
1884 addStat("queue_drain", dirtyQueueDrain.load(), add_stat, c);
1885 addStat("queue_age", getQueueAge(), add_stat, c);
1886 addStat("pending_writes", dirtyQueuePendingWrites.load(), add_stat, c);
1888 addStat("high_seqno", getHighSeqno(), add_stat, c);
1889 addStat("uuid", failovers->getLatestUUID(), add_stat, c);
1890 addStat("purge_seqno", getPurgeSeqno(), add_stat, c);
1891 addStat("bloom_filter", getFilterStatusString().data(),
1893 addStat("bloom_filter_size", getFilterSize(), add_stat, c);
1894 addStat("bloom_filter_key_count", getNumOfKeysInFilter(), add_stat, c);
1895 addStat("rollback_item_count", getRollbackItemCount(), add_stat, c);
1896 addStat("hp_vb_req_size", getHighPriorityChkSize(), add_stat, c);
1897 hlc.addStats(statPrefix, add_stat, c);
// Atomically subtracts decrementBy from dirtyQueueMem using a
// compare-exchange retry loop: the current value is read, the new value is
// computed, and the store is retried until no other thread changed the
// counter in between.
// NOTE(review): the statement taken when oldVal < decrementBy is not visible
// here; presumably it clamps newVal so the unsigned counter cannot wrap -
// confirm against the full file.
1901 void VBucket::decrDirtyQueueMem(size_t decrementBy)
1903 size_t oldVal, newVal;
1905 oldVal = dirtyQueueMem.load(std::memory_order_relaxed);
1906 if (oldVal < decrementBy) {
1909 newVal = oldVal - decrementBy;
1911 } while (!dirtyQueueMem.compare_exchange_strong(oldVal, newVal));
// Atomically subtracts decrementBy from dirtyQueueAge using a
// compare-exchange retry loop: the current value is read, the new value is
// computed, and the store is retried until no other thread changed the
// counter in between.
// NOTE(review): the statement taken when oldVal < decrementBy is not visible
// here; presumably it clamps newVal so the unsigned counter cannot wrap -
// confirm against the full file.
1914 void VBucket::decrDirtyQueueAge(uint32_t decrementBy)
1916 uint64_t oldVal, newVal;
1918 oldVal = dirtyQueueAge.load(std::memory_order_relaxed);
1919 if (oldVal < decrementBy) {
1922 newVal = oldVal - decrementBy;
1924 } while (!dirtyQueueAge.compare_exchange_strong(oldVal, newVal));
// Atomically subtracts decrementBy from dirtyQueuePendingWrites using a
// compare-exchange retry loop: the current value is read, the new value is
// computed, and the store is retried until no other thread changed the
// counter in between.
// NOTE(review): the statement taken when oldVal < decrementBy is not visible
// here; presumably it clamps newVal so the unsigned counter cannot wrap -
// confirm against the full file.
1927 void VBucket::decrDirtyQueuePendingWrites(size_t decrementBy)
1929 size_t oldVal, newVal;
1931 oldVal = dirtyQueuePendingWrites.load(std::memory_order_relaxed);
1932 if (oldVal < decrementBy) {
1935 newVal = oldVal - decrementBy;
1937 } while (!dirtyQueuePendingWrites.compare_exchange_strong(oldVal, newVal));
// Applies a set / replace of `itm` to the hash table while the caller holds
// the hash-bucket lock (std::invalid_argument is thrown if it is not held).
// Visible decision order:
//   1. NoMem if StoredValue::hasAvailableSpace() fails.
//   2. NeedBgFetch for a CAS operation under FULL_EVICTION when the key may
//      exist on disk but the hash table has no value or only a temp-initial
//      one.
//   3. Expired existing value (no metadata supplied, new item not deleted):
//      a supplied CAS is rejected with NotFound.
//   4. Existing value: InvalidCas when !allowExisting and the value is not a
//      temp item; IsLocked / NotFound / InvalidCas for the lock and CAS
//      mismatch cases; otherwise the value is updated with a bumped rev seqno.
//   5. No existing value: NotFound if a CAS was given, else a new stored
//      value is added and WasClean returned.
// NOTE(review): the v, itm, cas, allowExisting and hasMetaData parameters and
// many branch headers are not visible in this excerpt; the nesting described
// above should be confirmed against the full file.
1940 std::pair<MutationStatus, VBNotifyCtx> VBucket::processSet(
1941 const HashTable::HashBucketLock& hbl,
1947 const VBQueueItemCtx* queueItmCtx,
1948 bool maybeKeyExists,
1949 bool isReplication) {
1950 if (!hbl.getHTLock()) {
1951 throw std::invalid_argument(
1952 "VBucket::processSet: htLock not held for "
1954 std::to_string(getId()));
1957 if (!StoredValue::hasAvailableSpace(stats, itm, isReplication)) {
1958 return {MutationStatus::NoMem, VBNotifyCtx()};
1961 if (cas && eviction == FULL_EVICTION && maybeKeyExists) {
1962 if (!v || v->isTempInitialItem()) {
1963 return {MutationStatus::NeedBgFetch, VBNotifyCtx()};
1968 * prior to checking for the lock, we should check if this object
1969 * has expired. If so, then check if CAS value has been provided
1970 * for this set op. In this case the operation should be denied since
1971 * a cas operation for a key that doesn't exist is not a very cool
1972 * thing to do. See MB 3252
1974 if (v && v->isExpired(ep_real_time()) && !hasMetaData && !itm.isDeleted()) {
1975 if (v->isLocked(ep_current_time())) {
1979 /* item has expired and cas value provided. Deny ! */
1980 return {MutationStatus::NotFound, VBNotifyCtx()};
1985 if (!allowExisting && !v->isTempItem()) {
1986 return {MutationStatus::InvalidCas, VBNotifyCtx()};
1988 if (v->isLocked(ep_current_time())) {
1990 * item is locked, deny if there is cas value mismatch
1991 * or no cas value is provided by the user
1993 if (cas != v->getCas()) {
1994 return {MutationStatus::IsLocked, VBNotifyCtx()};
1996 /* allow operation*/
1998 } else if (cas && cas != v->getCas()) {
1999 if (v->isTempNonExistentItem()) {
2000 // This is a temporary item which marks a key as non-existent;
2001 // therefore specifying a non-matching CAS should be exposed
2002 // as item not existing.
2003 return {MutationStatus::NotFound, VBNotifyCtx()};
2005 if ((v->isTempDeletedItem() || v->isDeleted()) && !itm.isDeleted()) {
2006 // Existing item is deleted, and we are not replacing it with
2007 // a (different) deleted value - return not existing.
2008 return {MutationStatus::NotFound, VBNotifyCtx()};
2010 // None of the above special cases; the existing item cannot be
2011 // modified with the specified CAS.
2012 return {MutationStatus::InvalidCas, VBNotifyCtx()};
2015 itm.setRevSeqno(v->getRevSeqno() + 1);
2016 /* MB-23530: We must ensure that a replace operation (i.e.
2017 * set with a CAS) /fails/ if the old document is deleted; it
2018 * logically "doesn't exist". However, if the new value is deleted
2019 * this op is a /delete/ with a CAS and we must permit a
2020 * deleted -> deleted transition for Deleted Bodies.
2022 if (cas && (v->isDeleted() || v->isTempDeletedItem()) &&
2024 return {MutationStatus::NotFound, VBNotifyCtx()};
2028 MutationStatus status;
2029 VBNotifyCtx notifyCtx;
2030 std::tie(v, status, notifyCtx) =
2031 updateStoredValue(hbl, *v, itm, queueItmCtx);
2032 return {status, notifyCtx};
2033 } else if (cas != 0) {
2034 return {MutationStatus::NotFound, VBNotifyCtx()};
2036 VBNotifyCtx notifyCtx;
2037 std::tie(v, notifyCtx) = addNewStoredValue(hbl, itm, queueItmCtx);
2039 updateRevSeqNoOfNewStoredValue(*v);
2040 itm.setRevSeqno(v->getRevSeqno());
2042 return {MutationStatus::WasClean, notifyCtx};
// Applies an "add" of `itm` to the hash table while the caller holds the
// hash-bucket lock (std::invalid_argument is thrown if it is not held).
// Visible outcomes:
//   - Exists  when a live (not deleted, not expired) value is already there
//     (a further condition on that check is not visible here).
//   - NoMem   when StoredValue::hasAvailableSpace() fails.
//   - BgFetch when a temp-initial value under FULL_EVICTION needs a disk
//     lookup, or when the newly added stored value is itself a temp item.
//   - AddTmpAndBgFetch when there is no value, the item is not a
//     temp-initial one, eviction is FULL_EVICTION and the key may exist.
//   - otherwise the existing value is updated or a new one is added; the
//     ternary at the top of the update path yields Success on its false arm
//     (its true arm is not visible here).
// Temp items get MAX_NRU_VALUE as their NRU value.
// NOTE(review): the v / itm / isReplication parameters, several branch
// headers and the final return are not visible in this excerpt.
2046 std::pair<AddStatus, VBNotifyCtx> VBucket::processAdd(
2047 const HashTable::HashBucketLock& hbl,
2050 bool maybeKeyExists,
2052 const VBQueueItemCtx* queueItmCtx) {
2053 if (!hbl.getHTLock()) {
2054 throw std::invalid_argument(
2055 "VBucket::processAdd: htLock not held for "
2057 std::to_string(getId()));
2060 if (v && !v->isDeleted() && !v->isExpired(ep_real_time()) &&
2062 return {AddStatus::Exists, VBNotifyCtx()};
2064 if (!StoredValue::hasAvailableSpace(stats, itm, isReplication)) {
2065 return {AddStatus::NoMem, VBNotifyCtx()};
2068 std::pair<AddStatus, VBNotifyCtx> rv = {AddStatus::Success, VBNotifyCtx()};
2071 if (v->isTempInitialItem() && eviction == FULL_EVICTION &&
2073 // Need to figure out if an item exists on disk
2074 return {AddStatus::BgFetch, VBNotifyCtx()};
2077 rv.first = (v->isDeleted() || v->isExpired(ep_real_time()))
2079 : AddStatus::Success;
// Choose the rev seqno for the item replacing the existing value.
2081 if (v->isTempDeletedItem()) {
2082 itm.setRevSeqno(v->getRevSeqno() + 1);
2084 itm.setRevSeqno(ht.getMaxDeletedRevSeqno() + 1);
2087 if (!v->isTempItem()) {
2088 itm.setRevSeqno(v->getRevSeqno() + 1);
2091 std::tie(v, std::ignore, rv.second) =
2092 updateStoredValue(hbl, *v, itm, queueItmCtx);
2094 if (itm.getBySeqno() != StoredValue::state_temp_init) {
2095 if (eviction == FULL_EVICTION && maybeKeyExists) {
2096 return {AddStatus::AddTmpAndBgFetch, VBNotifyCtx()};
2099 std::tie(v, rv.second) = addNewStoredValue(hbl, itm, queueItmCtx);
2100 updateRevSeqNoOfNewStoredValue(*v);
2101 itm.setRevSeqno(v->getRevSeqno());
2102 if (v->isTempItem()) {
2103 rv.first = AddStatus::BgFetch;
2107 if (v->isTempItem()) {
2108 v->setNRUValue(MAX_NRU_VALUE);
// Soft-deletes the stored value `v` and returns the mutation status, the
// resulting stored value and the notification context.
// Visible early exits: NeedBgFetch for a temp-initial value under
// FULL_EVICTION, IsLocked when the value is locked and the CAS does not
// match, InvalidCas when a non-zero CAS does not match. Otherwise the status
// is WasDirty or WasClean depending on v.isDirty(), the supplied metadata is
// applied to the value, softDeleteStoredValue() is called and the hash
// table's max deleted rev seqno is updated from metadata.revSeqno.
// NOTE(review): the v / cas parameters and the conditions under which the
// metadata fields are applied (presumably a use-metadata flag) are not
// visible in this excerpt.
2113 std::tuple<MutationStatus, StoredValue*, VBNotifyCtx>
2114 VBucket::processSoftDelete(const HashTable::HashBucketLock& hbl,
2117 const ItemMetaData& metadata,
2118 const VBQueueItemCtx& queueItmCtx,
2121 if (v.isTempInitialItem() && eviction == FULL_EVICTION) {
2122 return std::make_tuple(MutationStatus::NeedBgFetch, &v, VBNotifyCtx());
2125 if (v.isLocked(ep_current_time())) {
2126 if (cas != v.getCas()) {
2127 return std::make_tuple(MutationStatus::IsLocked, &v, VBNotifyCtx());
2132 if (cas != 0 && cas != v.getCas()) {
2133 return std::make_tuple(MutationStatus::InvalidCas, &v, VBNotifyCtx());
2136 /* allow operation */
2140 v.isDirty() ? MutationStatus::WasDirty : MutationStatus::WasClean;
2143 v.setCas(metadata.cas);
2144 v.setFlags(metadata.flags);
2145 v.setExptime(metadata.exptime);
2148 v.setRevSeqno(metadata.revSeqno);
2149 VBNotifyCtx notifyCtx;
2151 std::tie(newSv, notifyCtx) =
2152 softDeleteStoredValue(hbl,
2154 /*onlyMarkDeleted*/ false,
2157 ht.updateMaxDeletedRevSeqno(metadata.revSeqno);
2158 return std::make_tuple(rv, newSv, notifyCtx);
// Turns an expired stored value into a soft-deleted one while the caller
// holds the hash-bucket lock (std::invalid_argument is thrown otherwise).
// A temp-initial value under FULL_EVICTION yields NeedBgFetch. Otherwise the
// rev seqno is incremented, softDeleteStoredValue() is called with a freshly
// built queue context (new seqno generated, not a backfill item, no pre-link
// step), the hash table's max deleted rev seqno is updated and NotFound is
// returned together with the resulting stored value and notify context.
// NOTE(review): the `v` parameter and several call arguments are not visible
// in this excerpt.
2161 std::tuple<MutationStatus, StoredValue*, VBNotifyCtx>
2162 VBucket::processExpiredItem(const HashTable::HashBucketLock& hbl,
2164 if (!hbl.getHTLock()) {
2165 throw std::invalid_argument(
2166 "VBucket::processExpiredItem: htLock not held for VBucket " +
2167 std::to_string(getId()));
2170 if (v.isTempInitialItem() && eviction == FULL_EVICTION) {
2171 return std::make_tuple(MutationStatus::NeedBgFetch,
2174 GenerateBySeqno::Yes,
2176 /*isBackfillItem*/ false));
2179 /* If the datatype is XATTR, mark the item as deleted
2180 * but don't delete the value as system xattrs can
2181 * still be queried by mobile clients even after
2183 * TODO: The current implementation is inefficient
2184 * but functionally correct and for performance reasons
2185 * only the system xattrs need to be stored.
2187 value_t value = v.getValue();
2188 bool onlyMarkDeleted =
2189 value && mcbp::datatype::is_xattr(value->getDataType());
2190 v.setRevSeqno(v.getRevSeqno() + 1);
2191 VBNotifyCtx notifyCtx;
2193 std::tie(newSv, notifyCtx) =
2194 softDeleteStoredValue(hbl,
2197 VBQueueItemCtx(GenerateBySeqno::Yes,
2200 /*isBackfillItem*/ false,
2201 nullptr /* no pre link */),
2203 ht.updateMaxDeletedRevSeqno(newSv->getRevSeqno() + 1);
2204 return std::make_tuple(MutationStatus::NotFound, newSv, notifyCtx);
// Removes the stored value `v` from the hash table via ht.unlocked_del().
// A value that is locked and not already deleted is treated specially by the
// first check.
// NOTE(review): the `v` parameter line, the body of that check (presumably
// `return false`) and the final return are not visible in this excerpt.
2207 bool VBucket::deleteStoredValue(const HashTable::HashBucketLock& hbl,
2209 if (!v.isDeleted() && v.isLocked(ep_current_time())) {
2213 /* StoredValue deleted here. If any other in-memory data structures are
2214 using the StoredValue intrusively then they must have handled the delete
2216 ht.unlocked_del(hbl, v.getKey());
// Adds a temporary stored value for a key by building an Item whose bySeqno
// is StoredValue::state_temp_init and passing it to processAdd() with no
// existing value, maybeKeyExists = true and no queue context. Returns the
// AddStatus from processAdd().
// The static_assert pins EXT_META_LEN to 1 so the one-element ext_meta
// initialiser stays valid.
// NOTE(review): the key parameter and most of the Item construction are not
// visible in this excerpt.
2220 AddStatus VBucket::addTempStoredValue(const HashTable::HashBucketLock& hbl,
2222 bool isReplication) {
2223 uint8_t ext_meta[EXT_META_LEN] = {PROTOCOL_BINARY_RAW_BYTES};
2224 static_assert(sizeof(ext_meta) == 1,
2225 "VBucket::addTempStoredValue(): expected "
2226 "EXT_META_LEN to be 1");
2235 StoredValue::state_temp_init);
2237 /* if a temp item for a possibly deleted, set it non-resident by resetting
2238 the value cuz normally a new item added is considered resident which
2239 does not apply for temp item. */
2240 StoredValue* v = nullptr;
2241 return processAdd(hbl, v, itm, true, isReplication, nullptr).first;
// Forwards a new-seqno notification for this vbucket to the registered
// newSeqnoCb callback.
// NOTE(review): a guard line (presumably a null check on newSeqnoCb) sits
// between these two lines in the full file - confirm.
2244 void VBucket::notifyNewSeqno(const VBNotifyCtx& notifyCtx) {
2246 newSeqnoCb->callback(getId(), notifyCtx);
2251 * Queue the item to the checkpoint and return the seqno the item was
// Queues `item` into the checkpoint manager for this vbucket and returns the
// seqno the queued item ends up with.
// The item is stamped with this vbucket's id and wrapped in a queued_item.
// A seqno is generated only when the caller did not supply one. The flusher
// is always notified; replication is notified only when no seqno was given.
// NOTE(review): wrapping the raw pointer in queued_item presumably transfers
// ownership of `item` - confirm against the queued_item definition.
2254 int64_t VBucket::queueItem(Item* item, OptionalSeqno seqno) {
2255 item->setVBucketId(id);
2256 queued_item qi(item);
2257 checkpointManager.queueDirty(
2260 seqno ? GenerateBySeqno::No : GenerateBySeqno::Yes,
2262 nullptr /* No pre link step as this is for system events */);
2263 VBNotifyCtx notifyCtx;
2264 // If the seqno is initialized, skip replication notification
2265 notifyCtx.notifyReplication = !seqno.is_initialized();
2266 notifyCtx.notifyFlusher = true;
2267 notifyCtx.bySeqno = qi->getBySeqno();
2268 notifyNewSeqno(notifyCtx);
2269 return qi->getBySeqno();
// Convenience overload: unpacks a VBQueueItemCtx and forwards its fields to
// the multi-argument queueDirty(). When the context asks for CAS drift
// tracking, the value's CAS is first fed to setMaxCasAndTrackDrift().
2272 VBNotifyCtx VBucket::queueDirty(StoredValue& v,
2273 const VBQueueItemCtx& queueItmCtx) {
2274 if (queueItmCtx.trackCasDrift == TrackCasDrift::Yes) {
2275 setMaxCasAndTrackDrift(v.getCas());
2277 return queueDirty(v,
2278 queueItmCtx.genBySeqno,
2280 queueItmCtx.isBackfillItem,
2281 queueItmCtx.preLinkDocumentContext);
// Assigns a rev seqno to a newly created stored value, derived from the
// hash table's max deleted rev seqno.
// NOTE(review): the statement inside the !isTempItem() branch is not visible
// here; per the comment below it presumably increments seqno so that a
// non-temp item ends up above every deleted rev seqno seen so far.
2284 void VBucket::updateRevSeqNoOfNewStoredValue(StoredValue& v) {
2286 * Possibly, this item is being recreated. Conservatively assign it
2287 * a seqno that is greater than the greatest seqno of all deleted
2288 * items seen so far.
2290 uint64_t seqno = ht.getMaxDeletedRevSeqno();
2291 if (!v.isTempItem()) {
2294 v.setRevSeqno(seqno);
// Registers a high-priority request (a seqno or checkpoint id to wait for)
// on this vbucket and logs it.
// The request list is modified under hpVBReqsMutex and its size is mirrored
// into the atomic numHpVBReqs.
// NOTE(review): the cookie parameter and some LOG arguments are not visible
// in this excerpt.
2297 void VBucket::addHighPriorityVBEntry(uint64_t seqnoOrChkId,
2299 HighPriorityVBNotify reqType) {
2300 std::unique_lock<std::mutex> lh(hpVBReqsMutex);
2301 hpVBReqs.push_back(HighPriorityVBEntry(cookie, seqnoOrChkId, reqType));
2302 numHpVBReqs.store(hpVBReqs.size());
2304 LOG(EXTENSION_LOG_NOTICE,
2305 "Added high priority async request %s "
2306 "for vb:%" PRIu16 ", Check for:%" PRIu64 ", "
2307 "Persisted upto:%" PRIu64 ", cookie:%p",
2308 to_string(reqType).c_str(),
2311 getPersistenceSeqno(),
// Collects the cookies of pending high-priority requests of type notifyType
// that can now be answered, removing them from hpVBReqs (under hpVBReqsMutex).
//   - entry->id <= idNum : completed, mapped to ENGINE_SUCCESS; the wait time
//     is recorded in the persistence histogram and fed to
//     adjustCheckpointFlushTimeout().
//   - waited longer than the current flush timeout : mapped to ENGINE_TMPFAIL
//     after clearing the cookie's engine-specific data.
// Entries of a different request type are skipped by the first check.
// `spent` is compared against the flush timeout, which the constants at the
// top of the file document as seconds.
// NOTE(review): the idNum parameter, the iterator advance for skipped entries
// and the final return are not visible in this excerpt.
2315 std::map<const void*, ENGINE_ERROR_CODE> VBucket::getHighPriorityNotifications(
2316 EventuallyPersistentEngine& engine,
2318 HighPriorityVBNotify notifyType) {
2319 std::unique_lock<std::mutex> lh(hpVBReqsMutex);
2320 std::map<const void*, ENGINE_ERROR_CODE> toNotify;
2322 auto entry = hpVBReqs.begin();
2324 while (entry != hpVBReqs.end()) {
2325 if (notifyType != entry->reqType) {
2330 std::string logStr(to_string(notifyType));
2332 hrtime_t wall_time(gethrtime() - entry->start);
2333 size_t spent = wall_time / 1000000000;
2334 if (entry->id <= idNum) {
2335 toNotify[entry->cookie] = ENGINE_SUCCESS;
2336 stats.chkPersistenceHisto.add(wall_time / 1000);
2337 adjustCheckpointFlushTimeout(wall_time / 1000000000);
2338 LOG(EXTENSION_LOG_NOTICE,
2339 "Notified the completion of %s "
2340 "for vbucket %" PRIu16 ", Check for: %" PRIu64
2342 "Persisted upto: %" PRIu64 ", cookie %p",
// erase() returns the iterator to the next entry, so no extra advance here.
2348 entry = hpVBReqs.erase(entry);
2349 } else if (spent > getCheckpointFlushTimeout()) {
2350 adjustCheckpointFlushTimeout(spent);
2351 engine.storeEngineSpecific(entry->cookie, NULL);
2352 toNotify[entry->cookie] = ENGINE_TMPFAIL;
2353 LOG(EXTENSION_LOG_WARNING,
2354 "Notified the timeout on %s "
2355 "for vbucket %" PRIu16 ", Check for: %" PRIu64
2357 "Persisted upto: %" PRIu64 ", cookie %p",
2363 entry = hpVBReqs.erase(entry);
2368 numHpVBReqs.store(hpVBReqs.size());
// Marks every pending high-priority request as ENGINE_TMPFAIL and clears the
// engine-specific data stored on each cookie, all under hpVBReqsMutex.
// NOTE(review): the tail of the function (presumably clearing hpVBReqs and
// returning toNotify) is not visible in this excerpt.
2372 std::map<const void*, ENGINE_ERROR_CODE> VBucket::tmpFailAndGetAllHpNotifies(
2373 EventuallyPersistentEngine& engine) {
2374 std::map<const void*, ENGINE_ERROR_CODE> toNotify;
2376 LockHolder lh(hpVBReqsMutex);
2378 for (auto& entry : hpVBReqs) {
2379 toNotify[entry.cookie] = ENGINE_TMPFAIL;
2380 engine.storeEngineSpecific(entry.cookie, NULL);
// Snaps the shared checkpoint flush timeout to one of three steps based on
// the observed wall time: MIN_CHK_FLUSH_TIMEOUT, the midpoint of MIN and MAX,
// or MAX_CHK_FLUSH_TIMEOUT. With the constants at the top of this file
// (10 and 30 seconds) the midpoint is 20.
2387 void VBucket::adjustCheckpointFlushTimeout(size_t wall_time) {
2388 size_t middle = (MIN_CHK_FLUSH_TIMEOUT + MAX_CHK_FLUSH_TIMEOUT) / 2;
2390 if (wall_time <= MIN_CHK_FLUSH_TIMEOUT) {
2391 chkFlushTimeout = MIN_CHK_FLUSH_TIMEOUT;
2392 } else if (wall_time <= middle) {
2393 chkFlushTimeout = middle;
2395 chkFlushTimeout = MAX_CHK_FLUSH_TIMEOUT;
// Returns the current value of the static atomic chkFlushTimeout, which is
// initialised to MIN_CHK_FLUSH_TIMEOUT at the top of this file.
2399 size_t VBucket::getCheckpointFlushTimeout() {
2400 return chkFlushTimeout;
// Builds a replacement Item for `v` whose value keeps only what remains of
// the XATTR section after user keys are pruned.
// The value bytes are copied into a scratch buffer, the XATTR section is
// located with get_body_offset(), user keys are pruned and the blob is
// finalised. If anything remains, a new Blob is created from the pruned
// XATTRs, reusing the original value's ext meta.
// NOTE(review): the declaration receiving Blob::New(), the branch for an
// empty result and most of the Item constructor arguments (presumably drawn
// from itemMeta) are not visible in this excerpt.
2403 std::unique_ptr<Item> VBucket::pruneXattrDocument(
2404 StoredValue& v, const ItemMetaData& itemMeta) {
2405 // Need to take a copy of the value, prune it, and add it back
2407 // Create work-space document
2408 std::vector<uint8_t> workspace(v.getValue()->vlength());
2409 std::copy_n(v.getValue()->getData(),
2410 v.getValue()->vlength(),
2413 // Now attach to the XATTRs in the document
2414 auto sz = cb::xattr::get_body_offset(
2415 {reinterpret_cast<char*>(workspace.data()), workspace.size()});
// The Blob works on the first sz bytes of the scratch buffer.
2417 cb::xattr::Blob xattr({workspace.data(), sz});
2418 xattr.prune_user_keys();
2420 auto prunedXattrs = xattr.finalize();
2422 if (prunedXattrs.size()) {
2423 // Something remains - Create a Blob and copy-in just the XATTRs
2425 Blob::New(reinterpret_cast<const char*>(prunedXattrs.data()),
2426 prunedXattrs.size(),
2427 const_cast<uint8_t*>(reinterpret_cast<const uint8_t*>(
2428 v.getValue()->getExtMeta())),
2429 v.getValue()->getExtLen());
2431 return std::make_unique<Item>(v.getKey(),
2444 void VBucket::DeferredDeleter::operator()(VBucket* vb) const {
2445 // If the vbucket is marked as deleting then we must schedule task to
2446 // perform the resource destruction (memory/disk).
2447 if (vb->isDeletionDeferred()) {
2448 vb->scheduleDeferredDeletion(engine);