1 /* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
3 * Copyright 2017 Couchbase, Inc
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
9 * http://www.apache.org/licenses/LICENSE-2.0
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
20 #include "bgfetcher.h"
21 #include "ep_engine.h"
22 #include "executorpool.h"
23 #include "failover-table.h"
25 #include "stored_value_factories.h"
27 #include "vbucketdeletiontask.h"
// Constructs an EPVBucket.
// NOTE(review): only part of the parameter list and of the initialiser list
// is visible in this excerpt, so the comments below describe just the
// visible pieces.
29 EPVBucket::EPVBucket(id_type i,
30 vbucket_state_t newState,
32 CheckpointConfig& chkConfig,
35 uint64_t lastSnapStart,
37 std::unique_ptr<FailoverTable> table,
38 std::shared_ptr<Callback<id_type> > flusherCb,
39 NewSeqnoCallback newSeqnoCb,
40 Configuration& config,
41 item_eviction_policy_t evictionPolicy,
42 vbucket_state_t initState,
45 const std::string& collectionsManifest)
// A newly created StoredValueFactory (built from `st`) and the moved-in
// newSeqnoCb are passed onwards - presumably to the base-class constructor;
// TODO confirm, the head of that call is not visible here.
55 std::make_unique<StoredValueFactory>(st),
56 std::move(newSeqnoCb),
// multiBGFetchEnabled is initialised from the storage properties obtained via
// kvshard->getROUnderlying() when `kvshard` is non-null (the property queried
// and the fallback value are not visible in this excerpt).
63 multiBGFetchEnabled(kvshard
64 ? kvshard->getROUnderlying()
65 ->getStorageProperties()
71 EPVBucket::~EPVBucket() {
72 if (!pendingBGFetches.empty()) {
73 LOG(EXTENSION_LOG_WARNING,
74 "Have %ld pending BG fetches while destroying vbucket\n",
75 pendingBGFetches.size());
// Applies the outcome of a completed background fetch for a single key.
// Reads the storage status and the fetched Item from fetched_item.value,
// updates the matching StoredValue (if any) while holding the hash-bucket
// lock, then updates the background-fetch statistics.
//
// @param fetched_item  fetch result plus its metaDataOnly flag and initTime
// @param startTime     forwarded to updateBGStats() as the second time point
// @throws std::logic_error  for an unknown eviction policy, or if the value
//                           is still non-resident after being restored
// NOTE(review): the parameter line declaring `key` and the return statement
// are not visible in this excerpt; `status` is presumably what is returned.
79 ENGINE_ERROR_CODE EPVBucket::completeBGFetchForSingleItem(
81 const VBucketBGFetchItem& fetched_item,
82 const ProcessClock::time_point startTime) {
83 ENGINE_ERROR_CODE status = fetched_item.value.getStatus();
84 Item* fetchedValue = fetched_item.value.getValue();
// Take the vbucket state lock (as a reader) and then the hash-bucket lock for
// `key`; the hash table is only modified while both are held.
86 ReaderLockHolder rlh(getStateLock());
87 auto hbl = ht.getLockedBucket(key);
88 StoredValue* v = fetchValidValue(hbl,
// Metadata-only fetch: on success the fetched metadata is restored into the
// stored value, but only if it is still a temp-initial item.
94 if (fetched_item.metaDataOnly) {
95 if (status == ENGINE_SUCCESS) {
96 if (v && v->isTempInitialItem()) {
97 ht.unlocked_restoreMeta(hbl.getHTLock(), *fetchedValue, *v);
99 } else if (status == ENGINE_KEY_ENOENT) {
100 if (v && v->isTempInitialItem()) {
103 /* If ENGINE_KEY_ENOENT is the status from storage and the temp
104 key is removed from hash table by the time bgfetch returns
105 (in case multiple bgfetch is scheduled for a key), we still
106 need to return ENGINE_SUCCESS to the memcached worker thread,
107 so that the worker thread can visit the ep-engine and figure
108 out the correct flow */
109 status = ENGINE_SUCCESS;
// A stored value that is no longer temp-initial is reported as success.
111 if (v && !v->isTempInitialItem()) {
112 status = ENGINE_SUCCESS;
// Full-value fetch: `restore` records whether the fetched value should be
// applied. NOTE(review): the statements that set it (and the per-policy
// branching around them) are not visible in this excerpt.
116 bool restore = false;
117 if (v && v->isResident()) {
118 status = ENGINE_SUCCESS;
122 if (v && !v->isResident()) {
128 if (v->isTempInitialItem() || !v->isResident()) {
134 throw std::logic_error("Unknown eviction policy");
// On success put the fetched value back into the stored value and insist that
// it is resident afterwards.
139 if (status == ENGINE_SUCCESS) {
140 ht.unlocked_restoreValue(
141 hbl.getHTLock(), *fetchedValue, *v);
142 if (!v->isResident()) {
143 throw std::logic_error(
144 "VBucket::completeBGFetchForSingleItem: "
145 "storedvalue (which has seqno " +
146 std::to_string(v->getBySeqno()) +
147 ") should be resident after calling "
150 } else if (status == ENGINE_KEY_ENOENT) {
152 if (eviction == FULL_EVICTION) {
153 // For the full eviction, we should notify
154 // ENGINE_SUCCESS to the memcached worker thread,
155 // so that the worker thread can visit the
156 // ep-engine and figure out the correct error
158 status = ENGINE_SUCCESS;
161 // underlying kvstore couldn't fetch requested data
162 // log returned error and notify TMPFAIL to client
163 LOG(EXTENSION_LOG_WARNING,
164 "Failed background fetch for vb:%" PRIu16
168 status = ENGINE_TMPFAIL;
172 } // locked scope ends
// Statistics are updated after the locked scope has ended.
174 if (fetched_item.metaDataOnly) {
175 ++stats.bg_meta_fetched;
// Time points handed to updateBGStats(): the item's initTime, the supplied
// startTime and the current time.
180 updateBGStats(fetched_item.initTime, startTime, ProcessClock::now());
// Takes over every background fetch currently queued for this vbucket.
// While holding pendingBGFetchesLock the pending queue is swapped with a
// local, initially empty queue, which leaves pendingBGFetches empty.
// NOTE(review): the return statement is not visible in this excerpt;
// presumably `fetches` is returned to the caller.
184 vb_bgfetch_queue_t EPVBucket::getBGFetchItems() {
185 vb_bgfetch_queue_t fetches;
186 LockHolder lh(pendingBGFetchesLock);
187 fetches.swap(pendingBGFetches);
191 bool EPVBucket::hasPendingBGFetchItems() {
192 LockHolder lh(pendingBGFetchesLock);
193 return !pendingBGFetches.empty();
// Registers a high-priority request against this vbucket.
// Increments the shard's highPriorityCount, records the request through
// addHighPriorityVBEntry() and reports RequestScheduled.
//
// @param seqnoOrChkId  value forwarded to addHighPriorityVBEntry(); by its
//                      name a seqno or checkpoint id - TODO confirm how
//                      reqType selects the interpretation
// @param reqType       kind of notification being requested
// @return HighPriorityVBReqStatus::RequestScheduled
// NOTE(review): `cookie` is used below but its parameter line, and any guard
// around the `shard` dereference, are not visible in this excerpt.
196 HighPriorityVBReqStatus EPVBucket::checkAddHighPriorityVBEntry(
197 uint64_t seqnoOrChkId,
199 HighPriorityVBNotify reqType) {
201 ++shard->highPriorityCount;
203 addHighPriorityVBEntry(seqnoOrChkId, cookie, reqType);
204 return HighPriorityVBReqStatus::RequestScheduled;
// Notifies the connections returned by getHighPriorityNotifications() for
// the given id and notification type.
// The number of collected entries is subtracted from the shard's
// highPriorityCount, then engine.notifyIOComplete() is called with the
// (first, second) members of every entry.
// NOTE(review): the parameter line declaring `idNum`, and any guard around
// the `shard` dereference, are not visible in this excerpt.
207 void EPVBucket::notifyHighPriorityRequests(EventuallyPersistentEngine& engine,
209 HighPriorityVBNotify notifyType) {
210 auto toNotify = getHighPriorityNotifications(engine, idNum, notifyType);
213 shard->highPriorityCount.fetch_sub(toNotify.size());
216 for (auto& notify : toNotify) {
217 engine.notifyIOComplete(notify.first, notify.second);
// Fails every outstanding request on this vbucket and notifies its cookie.
//  1. Collects the high-priority notifications from
//     tmpFailAndGetAllHpNotifies() and subtracts their count from the
//     shard's highPriorityCount.
//  2. While holding pendingBGFetchesLock, maps the cookie of every queued
//     background fetch to ENGINE_NOT_MY_VBUCKET, resets that cookie's
//     engine-specific data to nullptr, subtracts the number of dropped
//     fetches from stats.numRemainingBgItems and empties pendingBGFetches.
//  3. Calls e.notifyIOComplete() for every collected entry.
// NOTE(review): the braces delimiting the locked scope, and any guard around
// the `shard` dereference, are not visible in this excerpt.
221 void EPVBucket::notifyAllPendingConnsFailed(EventuallyPersistentEngine& e) {
222 auto toNotify = tmpFailAndGetAllHpNotifies(e);
225 shard->highPriorityCount.fetch_sub(toNotify.size());
228 // Add all the pendingBGFetches to the toNotify map
230 LockHolder lh(pendingBGFetchesLock);
231 size_t num_of_deleted_pending_fetches = 0;
232 for (auto& bgf : pendingBGFetches) {
233 vb_bgfetch_item_ctx_t& bg_itm_ctx = bgf.second;
234 for (auto& bgitem : bg_itm_ctx.bgfetched_list) {
// Assignment through operator[]: a cookie already present in toNotify has
// its status overwritten.
235 toNotify[bgitem->cookie] = ENGINE_NOT_MY_VBUCKET;
236 e.storeEngineSpecific(bgitem->cookie, nullptr);
237 ++num_of_deleted_pending_fetches;
240 stats.numRemainingBgItems.fetch_sub(num_of_deleted_pending_fetches);
241 pendingBGFetches.clear();
244 for (auto& notify : toNotify) {
245 e.notifyIOComplete(notify.first, notify.second);
251 size_t EPVBucket::getNumItems() const {
252 if (eviction == VALUE_ONLY) {
253 return ht.getNumInMemoryItems();
255 return ht.getNumItems();
// Starts a "vkey" stats lookup for `key`.
// Visible outcomes:
//  - ENGINE_KEY_ENOENT   the stored value is deleted, temp-deleted or
//                        temp-non-existent, or (in a later branch) the
//                        eviction policy is VALUE_ONLY
//  - ENGINE_ENOMEM       addTempStoredValue() reported NoMem
//  - ENGINE_EWOULDBLOCK  a VKeyStatBGFetchTask was created for the key
//  - std::logic_error    addTempStoredValue() returned a status that should
//                        be impossible while the hash bucket is locked
// NOTE(review): the `cookie` parameter line, the if/else structure that
// separates the "value found" path from the "not found" path, the switch
// head and the calls that schedule the tasks are not visible in this excerpt.
259 ENGINE_ERROR_CODE EPVBucket::statsVKey(const DocKey& key,
261 EventuallyPersistentEngine& engine,
262 const int bgFetchDelay) {
263 auto hbl = ht.getLockedBucket(key);
264 StoredValue* v = fetchValidValue(hbl,
271 if (v->isDeleted() || v->isTempDeletedItem() ||
272 v->isTempNonExistentItem()) {
273 return ENGINE_KEY_ENOENT;
// Count the outstanding background job before creating its task.
275 ++stats.numRemainingBgJobs;
276 ExecutorPool* iom = ExecutorPool::get();
277 ExTask task = new VKeyStatBGFetchTask(&engine,
285 return ENGINE_EWOULDBLOCK;
// Presumably the "no stored value" path (TODO confirm): under VALUE_ONLY
// eviction the key is reported as missing straight away.
287 if (eviction == VALUE_ONLY) {
288 return ENGINE_KEY_ENOENT;
// Otherwise add a temp stored value for the key and act on the result.
290 AddStatus rv = addTempStoredValue(hbl, key);
292 case AddStatus::NoMem:
293 return ENGINE_ENOMEM;
294 case AddStatus::Exists:
295 case AddStatus::UnDel:
296 case AddStatus::Success:
297 case AddStatus::AddTmpAndBgFetch:
298 // Since the hashtable bucket is locked, we shouldn't get here
299 throw std::logic_error(
300 "VBucket::statsVKey: "
301 "Invalid result from unlocked_addTempItem (" +
302 std::to_string(static_cast<uint16_t>(rv)) + ")");
304 case AddStatus::BgFetch: {
305 ++stats.numRemainingBgJobs;
306 ExecutorPool* iom = ExecutorPool::get();
307 ExTask task = new VKeyStatBGFetchTask(
308 &engine, key, getId(), -1, cookie, bgFetchDelay, false);
312 return ENGINE_EWOULDBLOCK;
// Applies the result of a vkey-stats background fetch (held in gcb.val) to
// the hash table.
// Only acts when the key still maps to a temp-initial item:
//  - ENGINE_SUCCESS: the fetched value is restored into the item; a
//    std::logic_error is thrown if the item is still non-resident afterwards
//  - ENGINE_KEY_ENOENT: handled in a branch whose body is not visible here
//  - otherwise: a warning is logged (see the existing comment below)
// NOTE(review): the remaining arguments of fetchValidValue() and of the LOG
// call are not visible in this excerpt.
317 void EPVBucket::completeStatsVKey(const DocKey& key,
318 const RememberingCallback<GetValue>& gcb) {
319 auto hbl = ht.getLockedBucket(key);
320 StoredValue* v = fetchValidValue(hbl,
326 if (v && v->isTempInitialItem()) {
327 if (gcb.val.getStatus() == ENGINE_SUCCESS) {
328 ht.unlocked_restoreValue(
329 hbl.getHTLock(), *(gcb.val.getValue()), *v);
330 if (!v->isResident()) {
331 throw std::logic_error(
332 "VBucket::completeStatsVKey: "
333 "storedvalue (which has seqno:" +
334 std::to_string(v->getBySeqno()) +
335 ") should be resident after calling restoreValue()");
337 } else if (gcb.val.getStatus() == ENGINE_KEY_ENOENT) {
340 // underlying kvstore couldn't fetch requested data
341 // log returned error and notify TMPFAIL to client
342 LOG(EXTENSION_LOG_WARNING,
343 "VBucket::completeStatsVKey: "
344 "Failed background fetch for vb:%" PRIu16 ", seqno:%" PRIu64,
// Emits this vbucket's statistics through `add_stat` for cookie `c`.
// Delegates to _addStats() and then adds "db_data_size" and "db_file_size"
// taken from the DBFileInfo returned by the shard's RW store for this
// vbucket id. A std::runtime_error raised while doing so is caught and
// logged as a warning.
// NOTE(review): the `try` line and any condition (e.g. on `details`) that
// guards the file-size stats are not visible in this excerpt.
351 void EPVBucket::addStats(bool details, ADD_STAT add_stat, const void* c) {
352 _addStats(details, add_stat, c);
356 DBFileInfo fileInfo =
357 shard->getRWUnderlying()->getDbFileInfo(getId());
358 addStat("db_data_size", fileInfo.spaceUsed, add_stat, c);
359 addStat("db_file_size", fileInfo.fileSize, add_stat, c);
360 } catch (std::runtime_error& e) {
361 LOG(EXTENSION_LOG_WARNING,
362 "VBucket::addStats: Exception caught during getDbFileInfo "
363 "for vb:%" PRIu16 " - what(): %s",
// Attempts to eject the value of `key` from memory and reports the outcome
// as a binary-protocol status; a human-readable explanation is written
// through `msg` on some paths.
// Visible outcomes:
//  - KEY_ENOENT   returned inside the VALUE_ONLY branch
//  - SUCCESS      the item was ejected, or it was already ejected
//                 ("Already ejected.")
//  - KEY_EEXISTS  unlocked_ejectItem() refused ("Can't eject: Dirty object.")
// NOTE(review): the `msg` parameter line and the conditions testing whether a
// stored value was found are not visible in this excerpt.
370 protocol_binary_response_status EPVBucket::evictKey(const DocKey& key,
372 auto hbl = ht.getLockedBucket(key);
// Look the key up without returning deleted items and without tracking the
// reference, but with QueueExpired::Yes.
373 StoredValue* v = fetchValidValue(
374 hbl, key, WantsDeleted::No, TrackReference::No, QueueExpired::Yes);
377 if (eviction == VALUE_ONLY) {
379 return PROTOCOL_BINARY_RESPONSE_KEY_ENOENT;
381 *msg = "Already ejected.";
382 return PROTOCOL_BINARY_RESPONSE_SUCCESS;
// Only a resident value can be ejected.
385 if (v->isResident()) {
386 if (ht.unlocked_ejectItem(v, eviction)) {
389 // Add key to bloom filter in case of full eviction mode
390 if (eviction == FULL_EVICTION) {
393 return PROTOCOL_BINARY_RESPONSE_SUCCESS;
395 *msg = "Can't eject: Dirty object.";
396 return PROTOCOL_BINARY_RESPONSE_KEY_EEXISTS;
399 *msg = "Already ejected.";
400 return PROTOCOL_BINARY_RESPONSE_SUCCESS;
403 bool EPVBucket::pageOut(const HashTable::HashBucketLock& lh, StoredValue*& v) {
404 return ht.unlocked_ejectItem(v, eviction);
407 void EPVBucket::queueBackfillItem(queued_item& qi,
408 const GenerateBySeqno generateBySeqno) {
409 LockHolder lh(backfill.mutex);
410 if (GenerateBySeqno::Yes == generateBySeqno) {
411 qi->setBySeqno(checkpointManager.nextBySeqno());
413 checkpointManager.setBySeqno(qi->getBySeqno());
415 backfill.items.push(qi);
416 ++stats.diskQueueSize;
417 ++stats.vbBackfillQueueSize;
418 ++stats.totalEnqueued;
419 doStatsForQueueing(*qi, qi->size());
420 stats.memOverhead->fetch_add(sizeof(queued_item));
423 size_t EPVBucket::queueBGFetchItem(const DocKey& key,
424 std::unique_ptr<VBucketBGFetchItem> fetch,
425 BgFetcher* bgFetcher) {
426 LockHolder lh(pendingBGFetchesLock);
427 vb_bgfetch_item_ctx_t& bgfetch_itm_ctx = pendingBGFetches[key];
429 if (bgfetch_itm_ctx.bgfetched_list.empty()) {
430 bgfetch_itm_ctx.isMetaOnly = true;
433 if (!fetch->metaDataOnly) {
434 bgfetch_itm_ctx.isMetaOnly = false;
437 bgfetch_itm_ctx.bgfetched_list.push_back(std::move(fetch));
439 bgFetcher->addPendingVB(getId());
440 return pendingBGFetches.size();
// Updates an existing stored value and optionally queues it as dirty.
// `status` is either set to MutationStatus::WasDirty or taken from
// HashTable::unlocked_updateStoredValue(). The result tuple carries the
// address of `v`, that status, and either the VBNotifyCtx produced by
// queueDirty() or a default-constructed VBNotifyCtx.
// NOTE(review): the parameter lines declaring `v` and `itm`, and the
// conditions selecting between the two status assignments and between the
// two return statements, are not visible in this excerpt.
443 std::tuple<StoredValue*, MutationStatus, VBNotifyCtx>
444 EPVBucket::updateStoredValue(const HashTable::HashBucketLock& hbl,
447 const VBQueueItemCtx* queueItmCtx,
449 MutationStatus status;
451 status = MutationStatus::WasDirty;
453 status = ht.unlocked_updateStoredValue(hbl.getHTLock(), v, itm);
// queueItmCtx is dereferenced here, so this return must only be reached with
// a non-null context (the guarding condition is not visible).
457 return std::make_tuple(&v, status, queueDirty(v, *queueItmCtx));
459 return std::make_tuple(&v, status, VBNotifyCtx());
// Adds a new stored value for `itm` to the hash table and optionally queues
// it as dirty.
// Returns the new StoredValue together with either the VBNotifyCtx produced
// by queueDirty() or a default-constructed VBNotifyCtx.
// NOTE(review): the parameter line declaring `itm` and the condition
// selecting between the two return statements are not visible in this
// excerpt.
462 std::pair<StoredValue*, VBNotifyCtx> EPVBucket::addNewStoredValue(
463 const HashTable::HashBucketLock& hbl,
465 const VBQueueItemCtx* queueItmCtx) {
466 StoredValue* v = ht.unlocked_addNewStoredValue(hbl, itm);
// queueItmCtx is dereferenced here, so this return must only be reached with
// a non-null context (the guarding condition is not visible).
469 return {v, queueDirty(*v, *queueItmCtx)};
472 return {v, VBNotifyCtx()};
// Soft-deletes a stored value in the hash table and queues it as dirty.
// When the queue context says the seqno must not be generated
// (GenerateBySeqno::No), the caller-supplied `bySeqno` is stamped on the
// value first. Returns the address of `v` and the VBNotifyCtx produced by
// queueDirty().
// NOTE(review): the parameter lines declaring `v` and `bySeqno` are not
// visible in this excerpt.
475 std::tuple<StoredValue*, VBNotifyCtx> EPVBucket::softDeleteStoredValue(
476 const HashTable::HashBucketLock& hbl,
478 bool onlyMarkDeleted,
479 const VBQueueItemCtx& queueItmCtx,
481 ht.unlocked_softDelete(hbl.getHTLock(), v, onlyMarkDeleted);
483 if (queueItmCtx.genBySeqno == GenerateBySeqno::No) {
484 v.setBySeqno(bySeqno);
487 return std::make_tuple(&v, queueDirty(v, queueItmCtx));
// Requests a background fetch of `key` from storage.
// When multiBGFetchEnabled is set the request is queued on this vbucket via
// queueBGFetchItem() and the shard's BgFetcher is notified; the remaining
// statements count the job in stats.numRemainingBgJobs and create a
// dedicated SingleBGFetcherTask.
// NOTE(review): the parameter lines declaring `cookie` and `isMeta`, the
// `else` separating the two paths, any null check on getShard() and the call
// that schedules the single-fetch task are not visible in this excerpt.
490 void EPVBucket::bgFetch(const DocKey& key,
492 EventuallyPersistentEngine& engine,
493 const int bgFetchDelay,
495 if (multiBGFetchEnabled) {
496 // schedule to the current batch of background fetch of the given
498 size_t bgfetch_size = queueBGFetchItem(
500 std::make_unique<VBucketBGFetchItem>(cookie, isMeta),
501 getShard()->getBgFetcher());
503 getShard()->getBgFetcher()->notifyBGEvent();
505 LOG(EXTENSION_LOG_DEBUG,
506 "Queued a background fetch, now at %" PRIu64,
507 uint64_t(bgfetch_size));
// Single-fetch path: count the job and record the high-water mark.
// NOTE(review): the load / max / store sequence on maxRemainingBgJobs is not
// one atomic operation.
509 ++stats.numRemainingBgJobs;
510 stats.maxRemainingBgJobs.store(
511 std::max(stats.maxRemainingBgJobs.load(),
512 stats.numRemainingBgJobs.load()));
513 ExecutorPool* iom = ExecutorPool::get();
514 ExTask task = new SingleBGFetcherTask(
515 &engine, key, getId(), cookie, isMeta, bgFetchDelay, false);
517 LOG(EXTENSION_LOG_DEBUG,
518 "Queued a background fetch, now at %" PRIu64,
519 uint64_t(stats.numRemainingBgJobs.load()));
523 /* [TBD]: Get rid of std::unique_lock<std::mutex> lock */
// Adds a temporary stored value for `key` and, if that succeeds with
// AddStatus::BgFetch, starts a background fetch for it.
// Visible outcomes:
//  - ENGINE_ENOMEM       addTempStoredValue() reported NoMem
//  - std::logic_error    addTempStoredValue() returned a status that should
//                        be impossible while the hash bucket is locked
//  - ENGINE_EWOULDBLOCK  returned at the end of the function, after the
//                        BgFetch case
// NOTE(review): the return-type line, the parameter lines declaring `key`,
// `cookie`, `bgFetchDelay` and `metadataOnly`, and the switch head are not
// visible in this excerpt.
525 EPVBucket::addTempItemAndBGFetch(HashTable::HashBucketLock& hbl,
528 EventuallyPersistentEngine& engine,
531 bool isReplication) {
532 AddStatus rv = addTempStoredValue(hbl, key, isReplication);
534 case AddStatus::NoMem:
535 return ENGINE_ENOMEM;
537 case AddStatus::Exists:
538 case AddStatus::UnDel:
539 case AddStatus::Success:
540 case AddStatus::AddTmpAndBgFetch:
541 // Since the hashtable bucket is locked, we shouldn't get here
542 throw std::logic_error(
543 "VBucket::addTempItemAndBGFetch: "
544 "Invalid result from addTempItem: " +
545 std::to_string(static_cast<uint16_t>(rv)));
547 case AddStatus::BgFetch:
// The hash-bucket lock is released before the fetch is requested.
548 hbl.getHTLock().unlock();
549 bgFetch(key, cookie, engine, bgFetchDelay, metadataOnly);
551 return ENGINE_EWOULDBLOCK;
// Records timing statistics for one completed background fetch.
//
// @param init   time point used as the start of the "wait" interval
// @param start  end of the "wait" interval and start of the "load" interval
// @param stop   end of the "load" interval
//
// Both intervals are converted to nanoseconds with duration_cast. The
// nanosecond figures go to BlockTimer::log(); the same figures divided by
// 1000 feed the histograms, running totals and min/max trackers.
// NOTE(review): the lines declaring `waitNs` and `lNs` are not visible in
// this excerpt; presumably they hold the count() of the casts below.
554 void EPVBucket::updateBGStats(const ProcessClock::time_point init,
555 const ProcessClock::time_point start,
556 const ProcessClock::time_point stop) {
557 ++stats.bgNumOperations;
// Wait interval: init -> start.
559 std::chrono::duration_cast<std::chrono::nanoseconds>(start - init)
561 hrtime_t w = waitNs / 1000;
562 BlockTimer::log(waitNs, "bgwait", stats.timingLog);
563 stats.bgWaitHisto.add(w);
564 stats.bgWait.fetch_add(w);
565 atomic_setIfLess(stats.bgMinWait, w);
566 atomic_setIfBigger(stats.bgMaxWait, w);
// Load interval: start -> stop.
569 std::chrono::duration_cast<std::chrono::nanoseconds>(stop - start)
571 hrtime_t l = lNs / 1000;
572 BlockTimer::log(lNs, "bgload", stats.timingLog);
573 stats.bgLoadHisto.add(l);
574 stats.bgLoad.fetch_add(l);
575 atomic_setIfLess(stats.bgMinLoad, l);
576 atomic_setIfBigger(stats.bgMaxLoad, l);
// Builds the GetValue for a key whose stored value `v` is not resident.
//  - QUEUE_BG_FETCH set: a background fetch is requested through bgFetch().
//  - otherwise, ALLOW_META_ONLY set: a GetValue is returned around an Item
//    created with v.toItem(false, 0).
// The last visible statement constructs a GetValue with a NULL item and
// ENGINE_EWOULDBLOCK, carrying v's bySeqno and NRU value.
// NOTE(review): the parameter lines declaring `cookie` and `bgFetchDelay`,
// the remaining arguments of the meta-only GetValue and the head of the
// final return are not visible in this excerpt.
579 GetValue EPVBucket::getInternalNonResident(const DocKey& key,
581 EventuallyPersistentEngine& engine,
583 get_options_t options,
584 const StoredValue& v) {
585 if (options & QUEUE_BG_FETCH) {
586 bgFetch(key, cookie, engine, bgFetchDelay);
587 } else if (options & get_options_t::ALLOW_META_ONLY) {
588 // You can't both ask for a background fetch and just the meta...
589 return GetValue(v.toItem(false, 0).release(),
597 NULL, ENGINE_EWOULDBLOCK, v.getBySeqno(), true, v.getNRUValue());
600 void EPVBucket::setupDeferredDeletion(const void* cookie) {
601 setDeferredDeletionCookie(cookie);
602 deferredDeletionFileRevision.store(
603 getShard()->getRWUnderlying()->prepareToDelete(getId()));
604 setDeferredDeletion(true);
607 void EPVBucket::scheduleDeferredDeletion(EventuallyPersistentEngine& engine) {
608 ExTask task = new VBucketMemoryAndDiskDeletionTask(engine, *shard, this);
609 ExecutorPool::get()->schedule(task);