6b1e05886d866ccb0fc41e619414694e3a894873
[ep-engine.git] / src / ep_vb.cc
1 /* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2 /*
3  *     Copyright 2017 Couchbase, Inc
4  *
5  *   Licensed under the Apache License, Version 2.0 (the "License");
6  *   you may not use this file except in compliance with the License.
7  *   You may obtain a copy of the License at
8  *
9  *       http://www.apache.org/licenses/LICENSE-2.0
10  *
11  *   Unless required by applicable law or agreed to in writing, software
12  *   distributed under the License is distributed on an "AS IS" BASIS,
13  *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  *   See the License for the specific language governing permissions and
15  *   limitations under the License.
16  */
17
18 #include "ep_vb.h"
19
20 #include "bgfetcher.h"
21 #include "ep_engine.h"
22 #include "executorpool.h"
23 #include "failover-table.h"
24 #include "kvshard.h"
25 #include "stored_value_factories.h"
26 #include "tasks.h"
27 #include "vbucketdeletiontask.h"
28
29 EPVBucket::EPVBucket(id_type i,
30                      vbucket_state_t newState,
31                      EPStats& st,
32                      CheckpointConfig& chkConfig,
33                      KVShard* kvshard,
34                      int64_t lastSeqno,
35                      uint64_t lastSnapStart,
36                      uint64_t lastSnapEnd,
37                      std::unique_ptr<FailoverTable> table,
38                      std::shared_ptr<Callback<id_type> > flusherCb,
39                      NewSeqnoCallback newSeqnoCb,
40                      Configuration& config,
41                      item_eviction_policy_t evictionPolicy,
42                      vbucket_state_t initState,
43                      uint64_t purgeSeqno,
44                      uint64_t maxCas,
45                      const std::string& collectionsManifest)
46     : VBucket(i,
47               newState,
48               st,
49               chkConfig,
50               lastSeqno,
51               lastSnapStart,
52               lastSnapEnd,
53               std::move(table),
54               flusherCb,
55               std::make_unique<StoredValueFactory>(st),
56               std::move(newSeqnoCb),
57               config,
58               evictionPolicy,
59               initState,
60               purgeSeqno,
61               maxCas,
62               collectionsManifest),
63       multiBGFetchEnabled(kvshard
64                                   ? kvshard->getROUnderlying()
65                                             ->getStorageProperties()
66                                             .hasEfficientGet()
67                                   : false),
68       shard(kvshard) {
69 }
70
71 EPVBucket::~EPVBucket() {
72     if (!pendingBGFetches.empty()) {
73         LOG(EXTENSION_LOG_WARNING,
74             "Have %ld pending BG fetches while destroying vbucket\n",
75             pendingBGFetches.size());
76     }
77 }
78
/**
 * Applies the result of a completed background fetch for one key to the
 * hash table and returns the status to report to the waiting connection.
 *
 * @param key          The key that was fetched.
 * @param fetched_item The completed request. `value` holds the disk status
 *                     and (on success) the fetched Item. `metaDataOnly`
 *                     selects the metadata-only or full-value path.
 *                     `initTime` is used for the BG wait statistics.
 * @param startTime    When the fetch started; passed to updateBGStats().
 * @return The status to notify. This can differ from the disk status:
 *         ENGINE_KEY_ENOENT is reported as ENGINE_SUCCESS for metadata
 *         fetches (and under full eviction) so the worker re-visits the
 *         engine, and other errors on a restorable item become
 *         ENGINE_TMPFAIL.
 * @throws std::logic_error if the eviction policy is unknown, or if the
 *         StoredValue is still non-resident after restoreValue().
 */
ENGINE_ERROR_CODE EPVBucket::completeBGFetchForSingleItem(
        const DocKey& key,
        const VBucketBGFetchItem& fetched_item,
        const ProcessClock::time_point startTime) {
    ENGINE_ERROR_CODE status = fetched_item.value.getStatus();
    Item* fetchedValue = fetched_item.value.getValue();
    { // locking scope
        // Hold the vBucket state lock (as reader) and the key's hash-bucket
        // lock while the StoredValue is inspected and updated.
        ReaderLockHolder rlh(getStateLock());
        auto hbl = ht.getLockedBucket(key);
        StoredValue* v = fetchValidValue(hbl,
                                         key,
                                         WantsDeleted::Yes,
                                         TrackReference::Yes,
                                         QueueExpired::Yes);

        if (fetched_item.metaDataOnly) {
            // Metadata-only fetch: only a temp initial item is updated.
            if (status == ENGINE_SUCCESS) {
                if (v && v->isTempInitialItem()) {
                    ht.unlocked_restoreMeta(hbl.getHTLock(), *fetchedValue, *v);
                }
            } else if (status == ENGINE_KEY_ENOENT) {
                if (v && v->isTempInitialItem()) {
                    v->setNonExistent();
                }
                /* If ENGINE_KEY_ENOENT is the status from storage and the temp
                 key is removed from hash table by the time bgfetch returns
                 (in case multiple bgfetch is scheduled for a key), we still
                 need to return ENGINE_SUCCESS to the memcached worker thread,
                 so that the worker thread can visit the ep-engine and figure
                 out the correct flow */
                status = ENGINE_SUCCESS;
            } else {
                // Any other disk error is masked as success when the key is
                // present and is no longer a temp initial item.
                if (v && !v->isTempInitialItem()) {
                    status = ENGINE_SUCCESS;
                }
            }
        } else {
            // Full-value fetch: decide whether the fetched value should be
            // restored into the StoredValue.
            bool restore = false;
            if (v && v->isResident()) {
                // The value is already resident; nothing to restore.
                status = ENGINE_SUCCESS;
            } else {
                // Here v is either null or non-resident, so under both
                // policies `restore` ends up true exactly when v is non-null.
                switch (eviction) {
                case VALUE_ONLY:
                    if (v && !v->isResident()) {
                        restore = true;
                    }
                    break;
                case FULL_EVICTION:
                    if (v) {
                        if (v->isTempInitialItem() || !v->isResident()) {
                            restore = true;
                        }
                    }
                    break;
                default:
                    throw std::logic_error("Unknown eviction policy");
                }
            }

            if (restore) {
                if (status == ENGINE_SUCCESS) {
                    // Copy the fetched value into the StoredValue; it must be
                    // resident afterwards.
                    ht.unlocked_restoreValue(
                            hbl.getHTLock(), *fetchedValue, *v);
                    if (!v->isResident()) {
                        throw std::logic_error(
                                "VBucket::completeBGFetchForSingleItem: "
                                "storedvalue (which has seqno " +
                                std::to_string(v->getBySeqno()) +
                                ") should be resident after calling "
                                "restoreValue()");
                    }
                } else if (status == ENGINE_KEY_ENOENT) {
                    v->setNonExistent();
                    if (eviction == FULL_EVICTION) {
                        // For the full eviction, we should notify
                        // ENGINE_SUCCESS to the memcached worker thread,
                        // so that the worker thread can visit the
                        // ep-engine and figure out the correct error
                        // code.
                        status = ENGINE_SUCCESS;
                    }
                } else {
                    // underlying kvstore couldn't fetch requested data
                    // log returned error and notify TMPFAIL to client
                    LOG(EXTENSION_LOG_WARNING,
                        "Failed background fetch for vb:%" PRIu16
                        ", seqno:%" PRIu64,
                        getId(),
                        v->getBySeqno());
                    status = ENGINE_TMPFAIL;
                }
            }
        }
    } // locked scope ends

    // Statistics are updated after both locks have been released.
    if (fetched_item.metaDataOnly) {
        ++stats.bg_meta_fetched;
    } else {
        ++stats.bg_fetched;
    }

    updateBGStats(fetched_item.initTime, startTime, ProcessClock::now());
    return status;
}
183
184 vb_bgfetch_queue_t EPVBucket::getBGFetchItems() {
185     vb_bgfetch_queue_t fetches;
186     LockHolder lh(pendingBGFetchesLock);
187     fetches.swap(pendingBGFetches);
188     return fetches;
189 }
190
191 bool EPVBucket::hasPendingBGFetchItems() {
192     LockHolder lh(pendingBGFetchesLock);
193     return !pendingBGFetches.empty();
194 }
195
196 HighPriorityVBReqStatus EPVBucket::checkAddHighPriorityVBEntry(
197         uint64_t seqnoOrChkId,
198         const void* cookie,
199         HighPriorityVBNotify reqType) {
200     if (shard) {
201         ++shard->highPriorityCount;
202     }
203     addHighPriorityVBEntry(seqnoOrChkId, cookie, reqType);
204     return HighPriorityVBReqStatus::RequestScheduled;
205 }
206
207 void EPVBucket::notifyHighPriorityRequests(EventuallyPersistentEngine& engine,
208                                            uint64_t idNum,
209                                            HighPriorityVBNotify notifyType) {
210     auto toNotify = getHighPriorityNotifications(engine, idNum, notifyType);
211
212     if (shard) {
213         shard->highPriorityCount.fetch_sub(toNotify.size());
214     }
215
216     for (auto& notify : toNotify) {
217         engine.notifyIOComplete(notify.first, notify.second);
218     }
219 }
220
221 void EPVBucket::notifyAllPendingConnsFailed(EventuallyPersistentEngine& e) {
222     auto toNotify = tmpFailAndGetAllHpNotifies(e);
223
224     if (shard) {
225         shard->highPriorityCount.fetch_sub(toNotify.size());
226     }
227
228     // Add all the pendingBGFetches to the toNotify map
229     {
230         LockHolder lh(pendingBGFetchesLock);
231         size_t num_of_deleted_pending_fetches = 0;
232         for (auto& bgf : pendingBGFetches) {
233             vb_bgfetch_item_ctx_t& bg_itm_ctx = bgf.second;
234             for (auto& bgitem : bg_itm_ctx.bgfetched_list) {
235                 toNotify[bgitem->cookie] = ENGINE_NOT_MY_VBUCKET;
236                 e.storeEngineSpecific(bgitem->cookie, nullptr);
237                 ++num_of_deleted_pending_fetches;
238             }
239         }
240         stats.numRemainingBgItems.fetch_sub(num_of_deleted_pending_fetches);
241         pendingBGFetches.clear();
242     }
243
244     for (auto& notify : toNotify) {
245         e.notifyIOComplete(notify.first, notify.second);
246     }
247
248     fireAllOps(e);
249 }
250
251 size_t EPVBucket::getNumItems() const {
252     if (eviction == VALUE_ONLY) {
253         return ht.getNumInMemoryItems();
254     } else {
255         return ht.getNumItems();
256     }
257 }
258
259 ENGINE_ERROR_CODE EPVBucket::statsVKey(const DocKey& key,
260                                        const void* cookie,
261                                        EventuallyPersistentEngine& engine,
262                                        const int bgFetchDelay) {
263     auto hbl = ht.getLockedBucket(key);
264     StoredValue* v = fetchValidValue(hbl,
265                                      key,
266                                      WantsDeleted::Yes,
267                                      TrackReference::Yes,
268                                      QueueExpired::Yes);
269
270     if (v) {
271         if (v->isDeleted() || v->isTempDeletedItem() ||
272             v->isTempNonExistentItem()) {
273             return ENGINE_KEY_ENOENT;
274         }
275         ++stats.numRemainingBgJobs;
276         ExecutorPool* iom = ExecutorPool::get();
277         ExTask task = new VKeyStatBGFetchTask(&engine,
278                                               key,
279                                               getId(),
280                                               v->getBySeqno(),
281                                               cookie,
282                                               bgFetchDelay,
283                                               false);
284         iom->schedule(task);
285         return ENGINE_EWOULDBLOCK;
286     } else {
287         if (eviction == VALUE_ONLY) {
288             return ENGINE_KEY_ENOENT;
289         } else {
290             AddStatus rv = addTempStoredValue(hbl, key);
291             switch (rv) {
292             case AddStatus::NoMem:
293                 return ENGINE_ENOMEM;
294             case AddStatus::Exists:
295             case AddStatus::UnDel:
296             case AddStatus::Success:
297             case AddStatus::AddTmpAndBgFetch:
298                 // Since the hashtable bucket is locked, we shouldn't get here
299                 throw std::logic_error(
300                         "VBucket::statsVKey: "
301                         "Invalid result from unlocked_addTempItem (" +
302                         std::to_string(static_cast<uint16_t>(rv)) + ")");
303
304             case AddStatus::BgFetch: {
305                 ++stats.numRemainingBgJobs;
306                 ExecutorPool* iom = ExecutorPool::get();
307                 ExTask task = new VKeyStatBGFetchTask(
308                         &engine, key, getId(), -1, cookie, bgFetchDelay, false);
309                 iom->schedule(task);
310             }
311             }
312             return ENGINE_EWOULDBLOCK;
313         }
314     }
315 }
316
317 void EPVBucket::completeStatsVKey(const DocKey& key,
318                                   const RememberingCallback<GetValue>& gcb) {
319     auto hbl = ht.getLockedBucket(key);
320     StoredValue* v = fetchValidValue(hbl,
321                                      key,
322                                      WantsDeleted::Yes,
323                                      TrackReference::Yes,
324                                      QueueExpired::Yes);
325
326     if (v && v->isTempInitialItem()) {
327         if (gcb.val.getStatus() == ENGINE_SUCCESS) {
328             ht.unlocked_restoreValue(
329                     hbl.getHTLock(), *(gcb.val.getValue()), *v);
330             if (!v->isResident()) {
331                 throw std::logic_error(
332                         "VBucket::completeStatsVKey: "
333                         "storedvalue (which has seqno:" +
334                         std::to_string(v->getBySeqno()) +
335                         ") should be resident after calling restoreValue()");
336             }
337         } else if (gcb.val.getStatus() == ENGINE_KEY_ENOENT) {
338             v->setNonExistent();
339         } else {
340             // underlying kvstore couldn't fetch requested data
341             // log returned error and notify TMPFAIL to client
342             LOG(EXTENSION_LOG_WARNING,
343                 "VBucket::completeStatsVKey: "
344                 "Failed background fetch for vb:%" PRIu16 ", seqno:%" PRIu64,
345                 getId(),
346                 v->getBySeqno());
347         }
348     }
349 }
350
351 void EPVBucket::addStats(bool details, ADD_STAT add_stat, const void* c) {
352     _addStats(details, add_stat, c);
353
354     if (details) {
355         try {
356             DBFileInfo fileInfo =
357                     shard->getRWUnderlying()->getDbFileInfo(getId());
358             addStat("db_data_size", fileInfo.spaceUsed, add_stat, c);
359             addStat("db_file_size", fileInfo.fileSize, add_stat, c);
360         } catch (std::runtime_error& e) {
361             LOG(EXTENSION_LOG_WARNING,
362                 "VBucket::addStats: Exception caught during getDbFileInfo "
363                 "for vb:%" PRIu16 " - what(): %s",
364                 getId(),
365                 e.what());
366         }
367     }
368 }
369
370 protocol_binary_response_status EPVBucket::evictKey(const DocKey& key,
371                                                     const char** msg) {
372     auto hbl = ht.getLockedBucket(key);
373     StoredValue* v = fetchValidValue(
374             hbl, key, WantsDeleted::No, TrackReference::No, QueueExpired::Yes);
375
376     if (!v) {
377         if (eviction == VALUE_ONLY) {
378             *msg = "Not found.";
379             return PROTOCOL_BINARY_RESPONSE_KEY_ENOENT;
380         }
381         *msg = "Already ejected.";
382         return PROTOCOL_BINARY_RESPONSE_SUCCESS;
383     }
384
385     if (v->isResident()) {
386         if (ht.unlocked_ejectItem(v, eviction)) {
387             *msg = "Ejected.";
388
389             // Add key to bloom filter in case of full eviction mode
390             if (eviction == FULL_EVICTION) {
391                 addToFilter(key);
392             }
393             return PROTOCOL_BINARY_RESPONSE_SUCCESS;
394         }
395         *msg = "Can't eject: Dirty object.";
396         return PROTOCOL_BINARY_RESPONSE_KEY_EEXISTS;
397     }
398
399     *msg = "Already ejected.";
400     return PROTOCOL_BINARY_RESPONSE_SUCCESS;
401 }
402
403 bool EPVBucket::pageOut(const HashTable::HashBucketLock& lh, StoredValue*& v) {
404     return ht.unlocked_ejectItem(v, eviction);
405 }
406
407 void EPVBucket::queueBackfillItem(queued_item& qi,
408                                   const GenerateBySeqno generateBySeqno) {
409     LockHolder lh(backfill.mutex);
410     if (GenerateBySeqno::Yes == generateBySeqno) {
411         qi->setBySeqno(checkpointManager.nextBySeqno());
412     } else {
413         checkpointManager.setBySeqno(qi->getBySeqno());
414     }
415     backfill.items.push(qi);
416     ++stats.diskQueueSize;
417     ++stats.vbBackfillQueueSize;
418     ++stats.totalEnqueued;
419     doStatsForQueueing(*qi, qi->size());
420     stats.memOverhead->fetch_add(sizeof(queued_item));
421 }
422
423 size_t EPVBucket::queueBGFetchItem(const DocKey& key,
424                                    std::unique_ptr<VBucketBGFetchItem> fetch,
425                                    BgFetcher* bgFetcher) {
426     LockHolder lh(pendingBGFetchesLock);
427     vb_bgfetch_item_ctx_t& bgfetch_itm_ctx = pendingBGFetches[key];
428
429     if (bgfetch_itm_ctx.bgfetched_list.empty()) {
430         bgfetch_itm_ctx.isMetaOnly = true;
431     }
432
433     if (!fetch->metaDataOnly) {
434         bgfetch_itm_ctx.isMetaOnly = false;
435     }
436
437     bgfetch_itm_ctx.bgfetched_list.push_back(std::move(fetch));
438
439     bgFetcher->addPendingVB(getId());
440     return pendingBGFetches.size();
441 }
442
443 std::tuple<StoredValue*, MutationStatus, VBNotifyCtx>
444 EPVBucket::updateStoredValue(const HashTable::HashBucketLock& hbl,
445                              StoredValue& v,
446                              const Item& itm,
447                              const VBQueueItemCtx* queueItmCtx) {
448     MutationStatus status =
449             ht.unlocked_updateStoredValue(hbl.getHTLock(), v, itm);
450
451     if (queueItmCtx) {
452         return std::make_tuple(&v, status, queueDirty(v, *queueItmCtx));
453     }
454     return std::make_tuple(&v, status, VBNotifyCtx());
455 }
456
457 std::pair<StoredValue*, VBNotifyCtx> EPVBucket::addNewStoredValue(
458         const HashTable::HashBucketLock& hbl,
459         const Item& itm,
460         const VBQueueItemCtx* queueItmCtx) {
461     StoredValue* v = ht.unlocked_addNewStoredValue(hbl, itm);
462
463     if (queueItmCtx) {
464         return {v, queueDirty(*v, *queueItmCtx)};
465     }
466
467     return {v, VBNotifyCtx()};
468 }
469
470 std::tuple<StoredValue*, VBNotifyCtx> EPVBucket::softDeleteStoredValue(
471         const HashTable::HashBucketLock& hbl,
472         StoredValue& v,
473         bool onlyMarkDeleted,
474         const VBQueueItemCtx& queueItmCtx,
475         uint64_t bySeqno) {
476     ht.unlocked_softDelete(hbl.getHTLock(), v, onlyMarkDeleted);
477
478     if (queueItmCtx.genBySeqno == GenerateBySeqno::No) {
479         v.setBySeqno(bySeqno);
480     }
481
482     return std::make_tuple(&v, queueDirty(v, queueItmCtx));
483 }
484
485 void EPVBucket::bgFetch(const DocKey& key,
486                         const void* cookie,
487                         EventuallyPersistentEngine& engine,
488                         const int bgFetchDelay,
489                         const bool isMeta) {
490     if (multiBGFetchEnabled) {
491         // schedule to the current batch of background fetch of the given
492         // vbucket
493         size_t bgfetch_size = queueBGFetchItem(
494                 key,
495                 std::make_unique<VBucketBGFetchItem>(cookie, isMeta),
496                 getShard()->getBgFetcher());
497         if (getShard()) {
498             getShard()->getBgFetcher()->notifyBGEvent();
499         }
500         LOG(EXTENSION_LOG_DEBUG,
501             "Queued a background fetch, now at %" PRIu64,
502             uint64_t(bgfetch_size));
503     } else {
504         ++stats.numRemainingBgJobs;
505         stats.maxRemainingBgJobs.store(
506                 std::max(stats.maxRemainingBgJobs.load(),
507                          stats.numRemainingBgJobs.load()));
508         ExecutorPool* iom = ExecutorPool::get();
509         ExTask task = new SingleBGFetcherTask(
510                 &engine, key, getId(), cookie, isMeta, bgFetchDelay, false);
511         iom->schedule(task);
512         LOG(EXTENSION_LOG_DEBUG,
513             "Queued a background fetch, now at %" PRIu64,
514             uint64_t(stats.numRemainingBgJobs.load()));
515     }
516 }
517
518 /* [TBD]: Get rid of std::unique_lock<std::mutex> lock */
519 ENGINE_ERROR_CODE
520 EPVBucket::addTempItemAndBGFetch(HashTable::HashBucketLock& hbl,
521                                  const DocKey& key,
522                                  const void* cookie,
523                                  EventuallyPersistentEngine& engine,
524                                  int bgFetchDelay,
525                                  bool metadataOnly,
526                                  bool isReplication) {
527     AddStatus rv = addTempStoredValue(hbl, key, isReplication);
528     switch (rv) {
529     case AddStatus::NoMem:
530         return ENGINE_ENOMEM;
531
532     case AddStatus::Exists:
533     case AddStatus::UnDel:
534     case AddStatus::Success:
535     case AddStatus::AddTmpAndBgFetch:
536         // Since the hashtable bucket is locked, we shouldn't get here
537         throw std::logic_error(
538                 "VBucket::addTempItemAndBGFetch: "
539                 "Invalid result from addTempItem: " +
540                 std::to_string(static_cast<uint16_t>(rv)));
541
542     case AddStatus::BgFetch:
543         hbl.getHTLock().unlock();
544         bgFetch(key, cookie, engine, bgFetchDelay, metadataOnly);
545     }
546     return ENGINE_EWOULDBLOCK;
547 }
548
549 void EPVBucket::updateBGStats(const ProcessClock::time_point init,
550                               const ProcessClock::time_point start,
551                               const ProcessClock::time_point stop) {
552     ++stats.bgNumOperations;
553     hrtime_t waitNs =
554             std::chrono::duration_cast<std::chrono::nanoseconds>(start - init)
555                     .count();
556     hrtime_t w = waitNs / 1000;
557     BlockTimer::log(waitNs, "bgwait", stats.timingLog);
558     stats.bgWaitHisto.add(w);
559     stats.bgWait.fetch_add(w);
560     atomic_setIfLess(stats.bgMinWait, w);
561     atomic_setIfBigger(stats.bgMaxWait, w);
562
563     hrtime_t lNs =
564             std::chrono::duration_cast<std::chrono::nanoseconds>(stop - start)
565                     .count();
566     hrtime_t l = lNs / 1000;
567     BlockTimer::log(lNs, "bgload", stats.timingLog);
568     stats.bgLoadHisto.add(l);
569     stats.bgLoad.fetch_add(l);
570     atomic_setIfLess(stats.bgMinLoad, l);
571     atomic_setIfBigger(stats.bgMaxLoad, l);
572 }
573
574 GetValue EPVBucket::getInternalNonResident(const DocKey& key,
575                                            const void* cookie,
576                                            EventuallyPersistentEngine& engine,
577                                            int bgFetchDelay,
578                                            get_options_t options,
579                                            const StoredValue& v) {
580     if (options & QUEUE_BG_FETCH) {
581         bgFetch(key, cookie, engine, bgFetchDelay);
582     } else if (options & get_options_t::ALLOW_META_ONLY) {
583         // You can't both ask for a background fetch and just the meta...
584         return GetValue(v.toItem(false, 0).release(),
585                         ENGINE_SUCCESS,
586                         v.getBySeqno(),
587                         true,
588                         v.getNRUValue());
589     }
590
591     return GetValue(
592             NULL, ENGINE_EWOULDBLOCK, v.getBySeqno(), true, v.getNRUValue());
593 }
594
595 void EPVBucket::setupDeferredDeletion(const void* cookie) {
596     setDeferredDeletionCookie(cookie);
597     deferredDeletionFileRevision.store(
598             getShard()->getRWUnderlying()->prepareToDelete(getId()));
599     setDeferredDeletion(true);
600 }
601
602 void EPVBucket::scheduleDeferredDeletion(EventuallyPersistentEngine& engine) {
603     ExTask task = new VBucketMemoryAndDiskDeletionTask(engine, *shard, this);
604     ExecutorPool::get()->schedule(task);
605 }