5e29ccc619edebd2c34e34e87b28cd7819e79384
[ep-engine.git] / src / kv_bucket.cc
1 /* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2 /*
3  *     Copyright 2015 Couchbase, Inc
4  *
5  *   Licensed under the Apache License, Version 2.0 (the "License");
6  *   you may not use this file except in compliance with the License.
7  *   You may obtain a copy of the License at
8  *
9  *       http://www.apache.org/licenses/LICENSE-2.0
10  *
11  *   Unless required by applicable law or agreed to in writing, software
12  *   distributed under the License is distributed on an "AS IS" BASIS,
13  *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  *   See the License for the specific language governing permissions and
15  *   limitations under the License.
16  */
17
18 #include "config.h"
19
20 #include <string.h>
21 #include <time.h>
22
23 #include <fstream>
24 #include <functional>
25 #include <iostream>
26 #include <map>
27 #include <sstream>
28 #include <string>
29 #include <utility>
30 #include <vector>
31
32 #include <phosphor/phosphor.h>
33 #include <platform/make_unique.h>
34
35 #include "access_scanner.h"
36 #include "checkpoint_remover.h"
37 #include "conflict_resolution.h"
38 #include "dcp/dcpconnmap.h"
39 #include "defragmenter.h"
40 #include "kv_bucket.h"
41 #include "ep_engine.h"
42 #include "ext_meta_parser.h"
43 #include "failover-table.h"
44 #include "flusher.h"
45 #include "htresizer.h"
46 #include "kvshard.h"
47 #include "kvstore.h"
48 #include "locks.h"
49 #include "mutation_log.h"
50 #include "warmup.h"
51 #include "connmap.h"
52 #include "replicationthrottle.h"
53 #include "statwriter.h"
54 #include "tapconnmap.h"
55 #include "vbucketmemorydeletiontask.h"
56 #include "vb_count_visitor.h"
57
58 class StatsValueChangeListener : public ValueChangedListener {
59 public:
60     StatsValueChangeListener(EPStats& st, KVBucket& str)
61         : stats(st), store(str) {
62         // EMPTY
63     }
64
65     virtual void sizeValueChanged(const std::string &key, size_t value) {
66         if (key.compare("max_size") == 0) {
67             stats.setMaxDataSize(value);
68             store.getEPEngine().getDcpConnMap(). \
69                                      updateMaxActiveSnoozingBackfills(value);
70             size_t low_wat = static_cast<size_t>
71                     (static_cast<double>(value) * stats.mem_low_wat_percent);
72             size_t high_wat = static_cast<size_t>
73                     (static_cast<double>(value) * stats.mem_high_wat_percent);
74             stats.mem_low_wat.store(low_wat);
75             stats.mem_high_wat.store(high_wat);
76             store.setCursorDroppingLowerUpperThresholds(value);
77         } else if (key.compare("mem_low_wat") == 0) {
78             stats.mem_low_wat.store(value);
79             stats.mem_low_wat_percent.store(
80                                     (double)(value) / stats.getMaxDataSize());
81         } else if (key.compare("mem_high_wat") == 0) {
82             stats.mem_high_wat.store(value);
83             stats.mem_high_wat_percent.store(
84                                     (double)(value) / stats.getMaxDataSize());
85         } else if (key.compare("replication_throttle_threshold") == 0) {
86             stats.replicationThrottleThreshold.store(
87                                           static_cast<double>(value) / 100.0);
88         } else if (key.compare("warmup_min_memory_threshold") == 0) {
89             stats.warmupMemUsedCap.store(static_cast<double>(value) / 100.0);
90         } else if (key.compare("warmup_min_items_threshold") == 0) {
91             stats.warmupNumReadCap.store(static_cast<double>(value) / 100.0);
92         } else {
93             LOG(EXTENSION_LOG_WARNING,
94                 "Failed to change value for unknown variable, %s\n",
95                 key.c_str());
96         }
97     }
98
99 private:
100     EPStats& stats;
101     KVBucket& store;
102 };
103
104 /**
105  * A configuration value changed listener that responds to ep-engine
106  * parameter changes by invoking engine-specific methods on
107  * configuration change events.
108  */
109 class EPStoreValueChangeListener : public ValueChangedListener {
110 public:
111     EPStoreValueChangeListener(KVBucket& st) : store(st) {
112     }
113
114     virtual void sizeValueChanged(const std::string &key, size_t value) {
115         if (key.compare("bg_fetch_delay") == 0) {
116             store.setBGFetchDelay(static_cast<uint32_t>(value));
117         } else if (key.compare("compaction_write_queue_cap") == 0) {
118             store.setCompactionWriteQueueCap(value);
119         } else if (key.compare("exp_pager_stime") == 0) {
120             store.setExpiryPagerSleeptime(value);
121         } else if (key.compare("alog_sleep_time") == 0) {
122             store.setAccessScannerSleeptime(value, false);
123         } else if (key.compare("alog_task_time") == 0) {
124             store.resetAccessScannerStartTime();
125         } else if (key.compare("mutation_mem_threshold") == 0) {
126             double mem_threshold = static_cast<double>(value) / 100;
127             StoredValue::setMutationMemoryThreshold(mem_threshold);
128         } else if (key.compare("backfill_mem_threshold") == 0) {
129             double backfill_threshold = static_cast<double>(value) / 100;
130             store.setBackfillMemoryThreshold(backfill_threshold);
131         } else if (key.compare("compaction_exp_mem_threshold") == 0) {
132             store.setCompactionExpMemThreshold(value);
133         } else if (key.compare("replication_throttle_cap_pcnt") == 0) {
134             store.getEPEngine().getReplicationThrottle().setCapPercent(value);
135         } else {
136             LOG(EXTENSION_LOG_WARNING,
137                 "Failed to change value for unknown variable, %s\n",
138                 key.c_str());
139         }
140     }
141
142     virtual void ssizeValueChanged(const std::string& key, ssize_t value) {
143         if (key.compare("exp_pager_initial_run_time") == 0) {
144             store.setExpiryPagerTasktime(value);
145         } else if (key.compare("replication_throttle_queue_cap") == 0) {
146             store.getEPEngine().getReplicationThrottle().setQueueCap(value);
147         }
148     }
149
150     virtual void booleanValueChanged(const std::string &key, bool value) {
151         if (key.compare("access_scanner_enabled") == 0) {
152             if (value) {
153                 store.enableAccessScannerTask();
154             } else {
155                 store.disableAccessScannerTask();
156             }
157         } else if (key.compare("bfilter_enabled") == 0) {
158             store.setAllBloomFilters(value);
159         } else if (key.compare("exp_pager_enabled") == 0) {
160             if (value) {
161                 store.enableExpiryPager();
162             } else {
163                 store.disableExpiryPager();
164             }
165         }
166     }
167
168     virtual void floatValueChanged(const std::string &key, float value) {
169         if (key.compare("bfilter_residency_threshold") == 0) {
170             store.setBfiltersResidencyThreshold(value);
171         } else if (key.compare("dcp_min_compression_ratio") == 0) {
172             store.getEPEngine().updateDcpMinCompressionRatio(value);
173         }
174     }
175
176 private:
177     KVBucket& store;
178 };
179
180 /**
181  * Callback class used by EpStore, for adding relevant keys
182  * to bloomfilter during compaction.
183  */
184 class BloomFilterCallback : public Callback<uint16_t&, const DocKey&, bool&> {
185 public:
186     BloomFilterCallback(KVBucket& eps)
187         : store(eps) {
188     }
189
190     void callback(uint16_t& vbucketId, const DocKey& key, bool& isDeleted) {
191         VBucketPtr vb = store.getVBucket(vbucketId);
192         if (vb) {
193             /* Check if a temporary filter has been initialized. If not,
194              * initialize it. If initialization fails, throw an exception
195              * to the caller and let the caller deal with it.
196              */
197             bool tempFilterInitialized = vb->isTempFilterAvailable();
198             if (!tempFilterInitialized) {
199                 tempFilterInitialized = initTempFilter(vbucketId);
200             }
201
202             if (!tempFilterInitialized) {
203                 throw std::runtime_error("BloomFilterCallback::callback: Failed "
204                     "to initialize temporary filter for vbucket: " +
205                     std::to_string(vbucketId));
206             }
207
208             if (store.getItemEvictionPolicy() == VALUE_ONLY) {
209                 /**
210                  * VALUE-ONLY EVICTION POLICY
211                  * Consider deleted items only.
212                  */
213                 if (isDeleted) {
214                     vb->addToTempFilter(key);
215                 }
216             } else {
217                 /**
218                  * FULL EVICTION POLICY
219                  * If vbucket's resident ratio is found to be less than
220                  * the residency threshold, consider all items, otherwise
221                  * consider deleted and non-resident items only.
222                  */
223                 bool residentRatioLessThanThreshold =
224                         vb->isResidentRatioUnderThreshold(
225                                 store.getBfiltersResidencyThreshold());
226                  if (residentRatioLessThanThreshold) {
227                      vb->addToTempFilter(key);
228                  } else {
229                      if (isDeleted || !store.isMetaDataResident(vb, key)) {
230                          vb->addToTempFilter(key);
231                      }
232                  }
233             }
234         }
235     }
236
237 private:
238     bool initTempFilter(uint16_t vbucketId);
239     KVBucket& store;
240 };
241
242 bool BloomFilterCallback::initTempFilter(uint16_t vbucketId) {
243     Configuration& config = store.getEPEngine().getConfiguration();
244     VBucketPtr vb = store.getVBucket(vbucketId);
245     if (!vb) {
246         return false;
247     }
248
249     size_t initial_estimation = config.getBfilterKeyCount();
250     size_t estimated_count;
251     size_t num_deletes = store.getROUnderlying(vbucketId)->
252                                          getNumPersistedDeletes(vbucketId);
253     item_eviction_policy_t eviction_policy = store.getItemEvictionPolicy();
254     if (eviction_policy == VALUE_ONLY) {
255         /**
256          * VALUE-ONLY EVICTION POLICY
257          * Obtain number of persisted deletes from underlying kvstore.
258          * Bloomfilter's estimated_key_count = 1.25 * deletes
259          */
260         estimated_count = round(1.25 * num_deletes);
261     } else {
262         /**
263          * FULL EVICTION POLICY
264          * First determine if the resident ratio of vbucket is less than
265          * the threshold from configuration.
266          */
267         bool residentRatioAlert = vb->isResidentRatioUnderThreshold(
268                 store.getBfiltersResidencyThreshold());
269
270         /**
271          * Based on resident ratio against threshold, estimate count.
272          *
273          * 1. If resident ratio is greater than the threshold:
274          * Obtain number of persisted deletes from underlying kvstore.
275          * Obtain number of non-resident-items for vbucket.
276          * Bloomfilter's estimated_key_count =
277          *                              1.25 * (deletes + non-resident)
278          *
279          * 2. Otherwise:
280          * Obtain number of items for vbucket.
281          * Bloomfilter's estimated_key_count =
282          *                              1.25 * (num_items)
283          */
284
285          if (residentRatioAlert) {
286              estimated_count = round(1.25 * vb->getNumItems());
287          } else {
288              estimated_count = round(1.25 * (num_deletes +
289                                       vb->getNumNonResidentItems()));
290          }
291     }
292
293     if (estimated_count < initial_estimation) {
294         estimated_count = initial_estimation;
295     }
296
297     vb->initTempFilter(estimated_count, config.getBfilterFpProb());
298
299     return true;
300 }
301
302 class ExpiredItemsCallback : public Callback<uint16_t&, const DocKey&, uint64_t&,
303                                              time_t&> {
304     public:
305         ExpiredItemsCallback(KVBucket& store)
306             : epstore(store) { }
307
308         void callback(uint16_t& vbid, const DocKey& key, uint64_t& revSeqno,
309                       time_t& startTime) {
310             if (epstore.compactionCanExpireItems()) {
311                 epstore.deleteExpiredItem(
312                         vbid, key, startTime, revSeqno, ExpireBy::Compactor);
313             }
314         }
315
316     private:
317         KVBucket& epstore;
318 };
319
320 class PendingOpsNotification : public GlobalTask {
321 public:
322     PendingOpsNotification(EventuallyPersistentEngine& e, VBucketPtr& vb)
323         : GlobalTask(&e, TaskId::PendingOpsNotification, 0, false),
324           engine(e),
325           vbucket(vb),
326           description("Notify pending operations for vbucket " +
327                       std::to_string(vbucket->getId())) {
328     }
329
330     cb::const_char_buffer getDescription() {
331         return description;
332     }
333
334     bool run(void) {
335         TRACE_EVENT("ep-engine/task", "PendingOpsNotification",
336                      vbucket->getId());
337         vbucket->fireAllOps(engine);
338         return false;
339     }
340
341 private:
342     EventuallyPersistentEngine &engine;
343     VBucketPtr vbucket;
344     const std::string description;
345 };
346
347 KVBucket::KVBucket(EventuallyPersistentEngine& theEngine)
348     : engine(theEngine),
349       stats(engine.getEpStats()),
350       vbMap(theEngine.getConfiguration(), *this),
351       defragmenterTask(NULL),
352       diskDeleteAll(false),
353       bgFetchDelay(0),
354       backfillMemoryThreshold(0.95),
355       statsSnapshotTaskId(0),
356       lastTransTimePerItem(0) {
357     cachedResidentRatio.activeRatio.store(0);
358     cachedResidentRatio.replicaRatio.store(0);
359
360     Configuration &config = engine.getConfiguration();
361     for (uint16_t i = 0; i < config.getMaxNumShards(); i++) {
362         accessLog.emplace_back(
363                 config.getAlogPath() + "." + std::to_string(i),
364                 config.getAlogBlockSize());
365     }
366
367
368     const size_t size = GlobalTask::allTaskIds.size();
369     stats.schedulingHisto.resize(size);
370     stats.taskRuntimeHisto.resize(size);
371
372     for (size_t i = 0; i < GlobalTask::allTaskIds.size(); i++) {
373         stats.schedulingHisto[i].reset();
374         stats.taskRuntimeHisto[i].reset();
375     }
376
377     ExecutorPool::get()->registerTaskable(ObjectRegistry::getCurrentEngine()->getTaskable());
378
379     size_t num_vbs = config.getMaxVbuckets();
380     vb_mutexes = new std::mutex[num_vbs];
381
382     *stats.memOverhead = sizeof(KVBucket);
383
384     stats.setMaxDataSize(config.getMaxSize());
385     config.addValueChangedListener("max_size",
386                                    new StatsValueChangeListener(stats, *this));
387     getEPEngine().getDcpConnMap().updateMaxActiveSnoozingBackfills(
388                                                         config.getMaxSize());
389
390     stats.mem_low_wat.store(config.getMemLowWat());
391     config.addValueChangedListener("mem_low_wat",
392                                    new StatsValueChangeListener(stats, *this));
393     stats.mem_low_wat_percent.store(
394                 (double)(stats.mem_low_wat.load()) / stats.getMaxDataSize());
395
396     stats.mem_high_wat.store(config.getMemHighWat());
397     config.addValueChangedListener("mem_high_wat",
398                                    new StatsValueChangeListener(stats, *this));
399     stats.mem_high_wat_percent.store(
400                 (double)(stats.mem_high_wat.load()) / stats.getMaxDataSize());
401
402     setCursorDroppingLowerUpperThresholds(config.getMaxSize());
403
404     stats.replicationThrottleThreshold.store(static_cast<double>
405                                     (config.getReplicationThrottleThreshold())
406                                      / 100.0);
407     config.addValueChangedListener("replication_throttle_threshold",
408                                    new StatsValueChangeListener(stats, *this));
409
410     stats.replicationThrottleWriteQueueCap.store(
411                                     config.getReplicationThrottleQueueCap());
412     config.addValueChangedListener("replication_throttle_queue_cap",
413                                    new EPStoreValueChangeListener(*this));
414     config.addValueChangedListener("replication_throttle_cap_pcnt",
415                                    new EPStoreValueChangeListener(*this));
416
417     setBGFetchDelay(config.getBgFetchDelay());
418     config.addValueChangedListener("bg_fetch_delay",
419                                    new EPStoreValueChangeListener(*this));
420
421     stats.warmupMemUsedCap.store(static_cast<double>
422                                (config.getWarmupMinMemoryThreshold()) / 100.0);
423     config.addValueChangedListener("warmup_min_memory_threshold",
424                                    new StatsValueChangeListener(stats, *this));
425     stats.warmupNumReadCap.store(static_cast<double>
426                                 (config.getWarmupMinItemsThreshold()) / 100.0);
427     config.addValueChangedListener("warmup_min_items_threshold",
428                                    new StatsValueChangeListener(stats, *this));
429
430     double mem_threshold = static_cast<double>
431                                       (config.getMutationMemThreshold()) / 100;
432     StoredValue::setMutationMemoryThreshold(mem_threshold);
433     config.addValueChangedListener("mutation_mem_threshold",
434                                    new EPStoreValueChangeListener(*this));
435
436     double backfill_threshold = static_cast<double>
437                                       (config.getBackfillMemThreshold()) / 100;
438     setBackfillMemoryThreshold(backfill_threshold);
439     config.addValueChangedListener("backfill_mem_threshold",
440                                    new EPStoreValueChangeListener(*this));
441
442     config.addValueChangedListener("bfilter_enabled",
443                                    new EPStoreValueChangeListener(*this));
444
445     bfilterResidencyThreshold = config.getBfilterResidencyThreshold();
446     config.addValueChangedListener("bfilter_residency_threshold",
447                                    new EPStoreValueChangeListener(*this));
448
449     compactionExpMemThreshold = config.getCompactionExpMemThreshold();
450     config.addValueChangedListener("compaction_exp_mem_threshold",
451                                    new EPStoreValueChangeListener(*this));
452
453     compactionWriteQueueCap = config.getCompactionWriteQueueCap();
454     config.addValueChangedListener("compaction_write_queue_cap",
455                                    new EPStoreValueChangeListener(*this));
456
457     config.addValueChangedListener("dcp_min_compression_ratio",
458                                    new EPStoreValueChangeListener(*this));
459
460     if (config.isWarmup()) {
461         warmupTask = std::make_unique<Warmup>(*this, config);
462     }
463 }
464
465 bool KVBucket::initialize() {
466     // We should nuke everything unless we want warmup
467     Configuration &config = engine.getConfiguration();
468     if (!config.isWarmup()) {
469         reset();
470     }
471
472     if (warmupTask) {
473         warmupTask->start();
474     } else {
475         // No warmup, immediately online the bucket.
476         warmupCompleted();
477     }
478
479     // Always create the item pager; but leave scheduling up to the specific
480     // KVBucket subclasses.
481     itemPagerTask = new ItemPager(&engine, stats);
482
483     initializeExpiryPager(config);
484
485     ExTask htrTask = make_STRCPtr<HashtableResizerTask>(this, 10);
486     ExecutorPool::get()->schedule(htrTask);
487
488     size_t checkpointRemoverInterval = config.getChkRemoverStime();
489     chkTask = new ClosedUnrefCheckpointRemoverTask(&engine, stats,
490                                                    checkpointRemoverInterval);
491     ExecutorPool::get()->schedule(chkTask);
492
493     ExTask workloadMonitorTask = make_STRCPtr<WorkLoadMonitor>(&engine, false);
494     ExecutorPool::get()->schedule(workloadMonitorTask);
495
496 #if HAVE_JEMALLOC
497     /* Only create the defragmenter task if we have an underlying memory
498      * allocator which can facilitate defragmenting memory.
499      */
500     defragmenterTask = new DefragmenterTask(&engine, stats);
501     ExecutorPool::get()->schedule(defragmenterTask);
502 #endif
503
504     return true;
505 }
506
507 void KVBucket::deinitialize() {
508     stopWarmup();
509     ExecutorPool::get()->stopTaskGroup(engine.getTaskable().getGID(),
510                                        NONIO_TASK_IDX, stats.forceShutdown);
511
512     ExecutorPool::get()->cancel(statsSnapshotTaskId);
513
514     {
515         LockHolder lh(accessScanner.mutex);
516         ExecutorPool::get()->cancel(accessScanner.task);
517     }
518
519     ExecutorPool::get()->unregisterTaskable(engine.getTaskable(),
520                                             stats.forceShutdown);
521 }
522
523 KVBucket::~KVBucket() {
524     delete [] vb_mutexes;
525     defragmenterTask.reset();
526 }
527
528 const Flusher* KVBucket::getFlusher(uint16_t shardId) {
529     return vbMap.shards[shardId]->getFlusher();
530 }
531
532 Warmup* KVBucket::getWarmup(void) const {
533     return warmupTask.get();
534 }
535
536 bool KVBucket::pauseFlusher() {
537     // Nothing do to - no flusher in this class
538     return false;
539 }
540
541 bool KVBucket::resumeFlusher() {
542     // Nothing do to - no flusher in this class
543     return false;
544 }
545
546 void KVBucket::wakeUpFlusher() {
547     // Nothing do to - no flusher in this class
548 }
549
550 protocol_binary_response_status KVBucket::evictKey(const DocKey& key,
551                                                    VBucket::id_type vbucket,
552                                                    const char** msg) {
553     VBucketPtr vb = getVBucket(vbucket);
554     if (!vb || (vb->getState() != vbucket_state_active)) {
555         return PROTOCOL_BINARY_RESPONSE_NOT_MY_VBUCKET;
556     }
557
558     return vb->evictKey(key, msg);
559 }
560
561 void KVBucket::deleteExpiredItem(uint16_t vbid,
562                                  const DocKey& key,
563                                  time_t startTime,
564                                  uint64_t revSeqno,
565                                  ExpireBy source) {
566     VBucketPtr vb = getVBucket(vbid);
567     if (vb) {
568         // Obtain reader access to the VB state change lock so that
569         // the VB can't switch state whilst we're processing
570         ReaderLockHolder rlh(vb->getStateLock());
571         if (vb->getState() == vbucket_state_active) {
572             vb->deleteExpiredItem(key, startTime, revSeqno, source);
573         }
574     }
575 }
576
577 void KVBucket::deleteExpiredItems(
578         std::list<std::pair<uint16_t, StoredDocKey>>& keys, ExpireBy source) {
579     std::list<std::pair<uint16_t, std::string> >::iterator it;
580     time_t startTime = ep_real_time();
581     for (const auto& it : keys) {
582         deleteExpiredItem(it.first, it.second, startTime, 0, source);
583     }
584 }
585
586 bool KVBucket::isMetaDataResident(VBucketPtr &vb, const DocKey& key) {
587
588     if (!vb) {
589         throw std::invalid_argument("EPStore::isMetaDataResident: vb is NULL");
590     }
591
592     auto hbl = vb->ht.getLockedBucket(key);
593     StoredValue* v = vb->ht.unlocked_find(
594             key, hbl.getBucketNum(), WantsDeleted::No, TrackReference::No);
595
596     if (v && !v->isTempItem()) {
597         return true;
598     } else {
599         return false;
600     }
601 }
602
603 ENGINE_ERROR_CODE KVBucket::set(Item &itm, const void *cookie) {
604
605     VBucketPtr vb = getVBucket(itm.getVBucketId());
606     if (!vb) {
607         ++stats.numNotMyVBuckets;
608         return ENGINE_NOT_MY_VBUCKET;
609     }
610
611     // Obtain read-lock on VB state to ensure VB state changes are interlocked
612     // with this set
613     ReaderLockHolder rlh(vb->getStateLock());
614     if (vb->getState() == vbucket_state_dead) {
615         ++stats.numNotMyVBuckets;
616         return ENGINE_NOT_MY_VBUCKET;
617     } else if (vb->getState() == vbucket_state_replica) {
618         ++stats.numNotMyVBuckets;
619         return ENGINE_NOT_MY_VBUCKET;
620     } else if (vb->getState() == vbucket_state_pending) {
621         if (vb->addPendingOp(cookie)) {
622             return ENGINE_EWOULDBLOCK;
623         }
624     } else if (vb->isTakeoverBackedUp()) {
625         LOG(EXTENSION_LOG_DEBUG, "(vb %u) Returned TMPFAIL to a set op"
626                 ", becuase takeover is lagging", vb->getId());
627         return ENGINE_TMPFAIL;
628     }
629
630     { // collections read-lock scope
631         auto collectionsRHandle = vb->lockCollections();
632         if (!collectionsRHandle.doesKeyContainValidCollection(itm.getKey())) {
633             return ENGINE_UNKNOWN_COLLECTION;
634         } // now hold collections read access for the duration of the set
635
636         return vb->set(itm, cookie, engine, bgFetchDelay);
637     }
638 }
639
640 ENGINE_ERROR_CODE KVBucket::add(Item &itm, const void *cookie)
641 {
642     VBucketPtr vb = getVBucket(itm.getVBucketId());
643     if (!vb) {
644         ++stats.numNotMyVBuckets;
645         return ENGINE_NOT_MY_VBUCKET;
646     }
647
648     // Obtain read-lock on VB state to ensure VB state changes are interlocked
649     // with this add
650     ReaderLockHolder rlh(vb->getStateLock());
651     if (vb->getState() == vbucket_state_dead ||
652         vb->getState() == vbucket_state_replica) {
653         ++stats.numNotMyVBuckets;
654         return ENGINE_NOT_MY_VBUCKET;
655     } else if (vb->getState() == vbucket_state_pending) {
656         if (vb->addPendingOp(cookie)) {
657             return ENGINE_EWOULDBLOCK;
658         }
659     } else if (vb->isTakeoverBackedUp()) {
660         LOG(EXTENSION_LOG_DEBUG, "(vb %u) Returned TMPFAIL to a add op"
661                 ", becuase takeover is lagging", vb->getId());
662         return ENGINE_TMPFAIL;
663     }
664
665     if (itm.getCas() != 0) {
666         // Adding with a cas value doesn't make sense..
667         return ENGINE_NOT_STORED;
668     }
669
670     return vb->add(itm, cookie, engine, bgFetchDelay);
671 }
672
673 ENGINE_ERROR_CODE KVBucket::replace(Item &itm, const void *cookie) {
674     VBucketPtr vb = getVBucket(itm.getVBucketId());
675     if (!vb) {
676         ++stats.numNotMyVBuckets;
677         return ENGINE_NOT_MY_VBUCKET;
678     }
679
680     // Obtain read-lock on VB state to ensure VB state changes are interlocked
681     // with this replace
682     ReaderLockHolder rlh(vb->getStateLock());
683     if (vb->getState() == vbucket_state_dead ||
684         vb->getState() == vbucket_state_replica) {
685         ++stats.numNotMyVBuckets;
686         return ENGINE_NOT_MY_VBUCKET;
687     } else if (vb->getState() == vbucket_state_pending) {
688         if (vb->addPendingOp(cookie)) {
689             return ENGINE_EWOULDBLOCK;
690         }
691     }
692
693     return vb->replace(itm, cookie, engine, bgFetchDelay);
694 }
695
696 ENGINE_ERROR_CODE KVBucket::addBackfillItem(Item& itm,
697                                             GenerateBySeqno genBySeqno,
698                                             ExtendedMetaData* emd) {
699     VBucketPtr vb = getVBucket(itm.getVBucketId());
700     if (!vb) {
701         ++stats.numNotMyVBuckets;
702         return ENGINE_NOT_MY_VBUCKET;
703     }
704
705     // Obtain read-lock on VB state to ensure VB state changes are interlocked
706     // with this add-tapbackfill
707     ReaderLockHolder rlh(vb->getStateLock());
708     if (vb->getState() == vbucket_state_dead ||
709         vb->getState() == vbucket_state_active) {
710         ++stats.numNotMyVBuckets;
711         return ENGINE_NOT_MY_VBUCKET;
712     }
713
714     //check for the incoming item's CAS validity
715     if (!Item::isValidCas(itm.getCas())) {
716         return ENGINE_KEY_EEXISTS;
717     }
718
719     return vb->addBackfillItem(itm, genBySeqno);
720 }
721
722 ENGINE_ERROR_CODE KVBucket::setVBucketState(uint16_t vbid,
723                                             vbucket_state_t to,
724                                             bool transfer,
725                                             bool notify_dcp) {
726     // Lock to prevent a race condition between a failed update and add.
727     LockHolder lh(vbsetMutex);
728     return setVBucketState_UNLOCKED(vbid, to, transfer, notify_dcp, lh);
729 }
730
731 ENGINE_ERROR_CODE KVBucket::setVBucketState_UNLOCKED(uint16_t vbid,
732                                                      vbucket_state_t to,
733                                                      bool transfer,
734                                                      bool notify_dcp,
735                                                      LockHolder& vbset) {
736     VBucketPtr vb = vbMap.getBucket(vbid);
737     if (vb && to == vb->getState()) {
738         return ENGINE_SUCCESS;
739     }
740
741     if (vb) {
742         vbucket_state_t oldstate = vb->getState();
743
744         vb->setState(to);
745
746         if (oldstate != to && notify_dcp) {
747             bool closeInboundStreams = false;
748             if (to == vbucket_state_active && !transfer) {
749                 /**
750                  * Close inbound (passive) streams into the vbucket
751                  * only in case of a failover.
752                  */
753                 closeInboundStreams = true;
754             }
755             engine.getDcpConnMap().vbucketStateChanged(vbid, to,
756                                                        closeInboundStreams);
757         }
758
759         if (to == vbucket_state_active && oldstate == vbucket_state_replica) {
760             /**
761              * Update snapshot range when vbucket goes from being a replica
762              * to active, to maintain the correct snapshot sequence numbers
763              * even in a failover scenario.
764              */
765             vb->checkpointManager.resetSnapshotRange();
766         }
767
768         if (to == vbucket_state_active && !transfer) {
769             const snapshot_range_t range = vb->getPersistedSnapshot();
770             if (range.end == vb->getPersistenceSeqno()) {
771                 vb->failovers->createEntry(range.end);
772             } else {
773                 vb->failovers->createEntry(range.start);
774             }
775         }
776
777         if (oldstate == vbucket_state_pending &&
778             to == vbucket_state_active) {
779             ExTask notifyTask =
780                     make_STRCPtr<PendingOpsNotification>(engine, vb);
781             ExecutorPool::get()->schedule(notifyTask);
782         }
783         scheduleVBStatePersist(vbid);
784     } else if (vbid < vbMap.getSize()) {
785         auto ft =
786                 std::make_unique<FailoverTable>(engine.getMaxFailoverEntries());
787         KVShard* shard = vbMap.getShardByVbId(vbid);
788
789         VBucketPtr newvb =
790                 makeVBucket(vbid,
791                             to,
792                             shard,
793                             std::move(ft),
794                             std::make_unique<NotifyNewSeqnoCB>(*this));
795
796         Configuration& config = engine.getConfiguration();
797         if (config.isBfilterEnabled()) {
798             // Initialize bloom filters upon vbucket creation during
799             // bucket creation and rebalance
800             newvb->createFilter(config.getBfilterKeyCount(),
801                                 config.getBfilterFpProb());
802         }
803
804         // The first checkpoint for active vbucket should start with id 2.
805         uint64_t start_chk_id = (to == vbucket_state_active) ? 2 : 0;
806         newvb->checkpointManager.setOpenCheckpointId(start_chk_id);
807         if (vbMap.addBucket(newvb) == ENGINE_ERANGE) {
808             return ENGINE_ERANGE;
809         }
810         // When the VBucket is constructed we initialize
811         // persistenceSeqno(0) && persistenceCheckpointId(0)
812         newvb->setBucketCreation(true);
813         scheduleVBStatePersist(vbid);
814     } else {
815         return ENGINE_ERANGE;
816     }
817     return ENGINE_SUCCESS;
818 }
819
820 void KVBucket::scheduleVBStatePersist() {
821     for (auto vbid : vbMap.getBuckets()) {
822         scheduleVBStatePersist(vbid);
823     }
824 }
825
826 void KVBucket::scheduleVBStatePersist(VBucket::id_type vbid) {
827     VBucketPtr vb = getVBucket(vbid);
828
829     if (!vb) {
830         LOG(EXTENSION_LOG_WARNING,
831             "EPStore::scheduleVBStatePersist: vb:%" PRIu16
832             " does not not exist. Unable to schedule persistence.", vbid);
833         return;
834     }
835
836     vb->checkpointManager.queueSetVBState(*vb);
837 }
838
/**
 * Complete the on-disk part of a vbucket deletion.
 *
 * Under vbsetMutex, the vbucket is looked up. If it is absent from the map,
 * is dead, or is flagged for bucket deletion, its storage is removed via the
 * RW KVStore's delVBucket() while holding the per-vbucket mutex. If the
 * in-memory vbucket still exists, its bucket-deletion/creation flags and
 * persistence seqno are then reset.
 *
 * The elapsed time is recorded in the vbucket-deletion timing stats. If a
 * cookie was supplied, its connection is notified with ENGINE_SUCCESS. Both
 * happen even when no deletion was needed.
 *
 * @param vbid   id of the vbucket being deleted
 * @param cookie connection to notify on completion (may be NULL)
 * @return false if delVBucket() failed (no stats are recorded and the
 *         cookie is not notified in that case); true otherwise.
 *         NOTE(review): presumably the calling task retries on false —
 *         confirm against VBDeleteTask.
 */
bool KVBucket::completeVBucketDeletion(uint16_t vbid, const void* cookie) {
    hrtime_t start_time(gethrtime());
    bool bucketDeleting;
    {
        LockHolder lh(vbsetMutex);
        VBucketPtr vb = vbMap.getBucket(vbid);
        // Only delete the file if the vbucket is gone, dead, or explicitly
        // marked for deletion.
        bucketDeleting = !vb ||
                vb->getState() == vbucket_state_dead ||
                vb->isBucketDeletion();

        if (bucketDeleting) {
            // Serialise with other users of this vbucket's mutex while the
            // underlying file is removed.
            LockHolder vlh(vb_mutexes[vbid]);
            if (!getRWUnderlying(vbid)->delVBucket(vbid)) {
                return false;
            }
            if (vb) {
                vb->setBucketDeletion(false);
                vb->setBucketCreation(false);
                vb->setPersistenceSeqno(0);
            }
            ++stats.vbucketDeletions;
        }
    }

    // NOTE(review): assumes gethrtime() is in nanoseconds, making wall_time
    // microseconds — confirm.
    hrtime_t spent(gethrtime() - start_time);
    hrtime_t wall_time = spent / 1000;
    BlockTimer::log(spent, "disk_vb_del", stats.timingLog);
    stats.diskVBDelHisto.add(wall_time);
    atomic_setIfBigger(stats.vbucketDelMaxWalltime, wall_time);
    stats.vbucketDelTotWalltime.fetch_add(wall_time);
    if (cookie) {
        engine.notifyIOComplete(cookie, ENGINE_SUCCESS);
    }

    return true;
}
875
876 void KVBucket::scheduleVBDeletion(VBucketPtr &vb, const void* cookie,
877                                   double delay) {
878     ExTask delTask = make_STRCPtr<VBucketMemoryDeletionTask>(engine, vb, delay);
879     ExecutorPool::get()->schedule(delTask);
880
881     if (vb->setBucketDeletion(true)) {
882         ExTask task = make_STRCPtr<VBDeleteTask>(&engine, vb->getId(), cookie);
883         ExecutorPool::get()->schedule(task);
884     }
885 }
886
887 ENGINE_ERROR_CODE KVBucket::deleteVBucket(uint16_t vbid, const void* c) {
888     // Lock to prevent a race condition between a failed update and add
889     // (and delete).
890     VBucketPtr vb;
891     {
892         LockHolder lh(vbsetMutex);
893         vb = vbMap.getBucket(vbid);
894         if (!vb) {
895             return ENGINE_NOT_MY_VBUCKET;
896         }
897
898         vb->setState(vbucket_state_dead);
899         engine.getDcpConnMap().vbucketStateChanged(vbid, vbucket_state_dead);
900         vbMap.removeBucket(vbid);
901     }
902     scheduleVBDeletion(vb, c);
903     if (c) {
904         return ENGINE_EWOULDBLOCK;
905     }
906     return ENGINE_SUCCESS;
907 }
908
909 ENGINE_ERROR_CODE KVBucket::checkForDBExistence(DBFileId db_file_id) {
910     std::string backend = engine.getConfiguration().getBackend();
911     if (backend.compare("couchdb") == 0) {
912         VBucketPtr vb = vbMap.getBucket(db_file_id);
913         if (!vb) {
914             return ENGINE_NOT_MY_VBUCKET;
915         }
916     } else if (backend.compare("forestdb") == 0) {
917         if (db_file_id > (vbMap.getNumShards() - 1)) {
918             //TODO: find a better error code
919             return ENGINE_EINVAL;
920         }
921     } else {
922         LOG(EXTENSION_LOG_WARNING,
923             "Unknown backend specified for db file id: %d", db_file_id);
924         return ENGINE_FAILED;
925     }
926
927     return ENGINE_SUCCESS;
928 }
929
930 ENGINE_ERROR_CODE KVBucket::scheduleCompaction(uint16_t vbid, compaction_ctx c,
931                                                const void *cookie) {
932     ENGINE_ERROR_CODE errCode = checkForDBExistence(c.db_file_id);
933     if (errCode != ENGINE_SUCCESS) {
934         return errCode;
935     }
936
937     /* Obtain the vbucket so we can get the previous purge seqno */
938     VBucketPtr vb = vbMap.getBucket(vbid);
939     if (!vb) {
940         return ENGINE_NOT_MY_VBUCKET;
941     }
942
943     /* Update the compaction ctx with the previous purge seqno */
944     c.max_purged_seq[vbid] = vb->getPurgeSeqno();
945
946     LockHolder lh(compactionLock);
947     ExTask task = make_STRCPtr<CompactTask>(&engine, c, cookie);
948     compactionTasks.push_back(std::make_pair(c.db_file_id, task));
949     if (compactionTasks.size() > 1) {
950         if ((stats.diskQueueSize > compactionWriteQueueCap &&
951             compactionTasks.size() > (vbMap.getNumShards() / 2)) ||
952             engine.getWorkLoadPolicy().getWorkLoadPattern() == READ_HEAVY) {
953             // Snooze a new compaction task.
954             // We will wake it up when one of the existing compaction tasks is done.
955             task->snooze(60);
956         }
957     }
958
959     ExecutorPool::get()->schedule(task);
960
961     LOG(EXTENSION_LOG_DEBUG,
962         "Scheduled compaction task %" PRIu64 " on db %d,"
963         "purge_before_ts = %" PRIu64 ", purge_before_seq = %" PRIu64
964         ", dropdeletes = %d",
965         uint64_t(task->getId()),c.db_file_id, c.purge_before_ts,
966         c.purge_before_seq, c.drop_deletes);
967
968    return ENGINE_EWOULDBLOCK;
969 }
970
971 uint16_t KVBucket::getDBFileId(const protocol_binary_request_compact_db& req) {
972     KVStore *store = vbMap.shards[0]->getROUnderlying();
973     return store->getDBFileId(req);
974 }
975
976 void KVBucket::compactInternal(compaction_ctx *ctx) {
977     BloomFilterCBPtr filter(new BloomFilterCallback(*this));
978     ctx->bloomFilterCallback = filter;
979
980     ExpiredItemsCBPtr expiry(new ExpiredItemsCallback(*this));
981     ctx->expiryCallback = expiry;
982
983     KVShard* shard = vbMap.getShardByVbId(ctx->db_file_id);
984     KVStore* store = shard->getRWUnderlying();
985     bool result = store->compactDB(ctx);
986
987     Configuration& config = getEPEngine().getConfiguration();
988     /* Iterate over all the vbucket ids set in max_purged_seq map. If there is an entry
989      * in the map for a vbucket id, then it was involved in compaction and thus can
990      * be used to update the associated bloom filters and purge sequence numbers
991      */
992     for (auto& it : ctx->max_purged_seq) {
993         const uint16_t vbid = it.first;
994         VBucketPtr vb = getVBucket(vbid);
995         if (!vb) {
996             continue;
997         }
998
999         if (config.isBfilterEnabled() && result) {
1000             vb->swapFilter();
1001         } else {
1002             vb->clearFilter();
1003         }
1004         vb->setPurgeSeqno(it.second);
1005     }
1006 }
1007
/**
 * Execute one compaction request (invoked with the task's context).
 *
 * If the storage engine cannot compact concurrently with writes, the
 * per-vbucket mutex for ctx->db_file_id is try-locked first. If it is held
 * elsewhere, the function returns true immediately without compacting.
 * NOTE(review): presumably the caller treats true as "reschedule me" —
 * confirm against CompactTask.
 *
 * Otherwise the compaction runs (or is skipped with ENGINE_NOT_MY_VBUCKET if
 * the vbucket no longer exists). The task's entry is removed from
 * compactionTasks, the cookie (if any) is notified with the result, and
 * stats.pendingCompactions is decremented.
 *
 * @return true if the compaction could not run yet (lock busy); false once
 *         the request has been completed.
 */
bool KVBucket::doCompact(compaction_ctx *ctx, const void *cookie) {
    ENGINE_ERROR_CODE err = ENGINE_SUCCESS;
    StorageProperties storeProp = getStorageProperties();
    bool concWriteCompact = storeProp.hasConcWriteCompact();
    uint16_t vbid = ctx->db_file_id;

    /**
     * Check if the underlying storage engine allows writes concurrently
     * as the database file is being compacted. If not, a lock needs to
     * be held in order to serialize access to the database file between
     * the writer and compactor threads
     */
    if (concWriteCompact == false) {
        VBucketPtr vb = getVBucket(vbid);
        if (!vb) {
            err = ENGINE_NOT_MY_VBUCKET;
            engine.storeEngineSpecific(cookie, NULL);
            /**
             * Decrement session counter here, as memcached thread wouldn't
             * visit the engine interface in case of a NOT_MY_VB notification
             */
            engine.decrementSessionCtr();
        } else {
            // Non-blocking acquire: if the vbucket mutex is busy, give up
            // for now rather than stall this thread.
            std::unique_lock<std::mutex> lh(vb_mutexes[vbid], std::try_to_lock);
            if (!lh.owns_lock()) {
                return true;
            }

            compactInternal(ctx);
        }
    } else {
        compactInternal(ctx);
    }

    // Drop this file's entry and wake snoozed compaction tasks.
    updateCompactionTasks(ctx->db_file_id);

    if (cookie) {
        engine.notifyIOComplete(cookie, err);
    }
    --stats.pendingCompactions;
    return false;
}
1050
1051 void KVBucket::updateCompactionTasks(DBFileId db_file_id) {
1052     LockHolder lh(compactionLock);
1053     bool erased = false, woke = false;
1054     std::list<CompTaskEntry>::iterator it = compactionTasks.begin();
1055     while (it != compactionTasks.end()) {
1056         if ((*it).first == db_file_id) {
1057             it = compactionTasks.erase(it);
1058             erased = true;
1059         } else {
1060             ExTask &task = (*it).second;
1061             if (task->getState() == TASK_SNOOZED) {
1062                 ExecutorPool::get()->wake(task->getId());
1063                 woke = true;
1064             }
1065             ++it;
1066         }
1067         if (erased && woke) {
1068             break;
1069         }
1070     }
1071 }
1072
1073 bool KVBucket::resetVBucket(uint16_t vbid) {
1074     LockHolder lh(vbsetMutex);
1075     return resetVBucket_UNLOCKED(vbid, lh);
1076 }
1077
1078 bool KVBucket::resetVBucket_UNLOCKED(uint16_t vbid, LockHolder& vbset) {
1079     bool rv(false);
1080
1081     VBucketPtr vb = vbMap.getBucket(vbid);
1082     if (vb) {
1083         vbucket_state_t vbstate = vb->getState();
1084
1085         vbMap.removeBucket(vbid);
1086
1087         checkpointCursorInfoList cursors =
1088                                         vb->checkpointManager.getAllCursors();
1089         // Delete and recreate the vbucket database file
1090         scheduleVBDeletion(vb, NULL, 0);
1091         setVBucketState_UNLOCKED(vbid, vbstate,
1092                                  false/*transfer*/, true/*notifyDcp*/, vbset);
1093
1094         // Copy the all cursors from the old vbucket into the new vbucket
1095         VBucketPtr newvb = vbMap.getBucket(vbid);
1096         newvb->checkpointManager.resetCursors(cursors);
1097
1098         rv = true;
1099     }
1100     return rv;
1101 }
1102
1103 extern "C" {
1104
1105     typedef struct {
1106         EventuallyPersistentEngine* engine;
1107         std::map<std::string, std::string> smap;
1108     } snapshot_stats_t;
1109
1110     static void add_stat(const char *key, const uint16_t klen,
1111                          const char *val, const uint32_t vlen,
1112                          const void *cookie) {
1113         if (cookie == nullptr) {
1114             throw std::invalid_argument("add_stat: cookie is NULL");
1115         }
1116         void *ptr = const_cast<void *>(cookie);
1117         snapshot_stats_t* snap = static_cast<snapshot_stats_t*>(ptr);
1118         ObjectRegistry::onSwitchThread(snap->engine);
1119
1120         std::string k(key, klen);
1121         std::string v(val, vlen);
1122         snap->smap.insert(std::pair<std::string, std::string>(k, v));
1123     }
1124 }
1125
1126 void KVBucket::snapshotStats() {
1127     snapshot_stats_t snap;
1128     snap.engine = &engine;
1129     bool rv = engine.getStats(&snap, NULL, 0, add_stat) == ENGINE_SUCCESS &&
1130               engine.getStats(&snap, "tap", 3, add_stat) == ENGINE_SUCCESS &&
1131               engine.getStats(&snap, "dcp", 3, add_stat) == ENGINE_SUCCESS;
1132
1133     if (rv && stats.isShutdown) {
1134         snap.smap["ep_force_shutdown"] = stats.forceShutdown ?
1135                                                               "true" : "false";
1136         std::stringstream ss;
1137         ss << ep_real_time();
1138         snap.smap["ep_shutdown_time"] = ss.str();
1139     }
1140     getOneRWUnderlying()->snapshotStats(snap.smap);
1141 }
1142
1143 void KVBucket::getAggregatedVBucketStats(const void* cookie,
1144                                          ADD_STAT add_stat) {
1145     // Create visitors for each of the four vBucket states, and collect
1146     // stats for each.
1147     auto active = makeVBCountVisitor(vbucket_state_active);
1148     auto replica = makeVBCountVisitor(vbucket_state_replica);
1149     auto pending = makeVBCountVisitor(vbucket_state_pending);
1150     auto dead = makeVBCountVisitor(vbucket_state_dead);
1151
1152     VBucketCountAggregator aggregator;
1153     aggregator.addVisitor(active.get());
1154     aggregator.addVisitor(replica.get());
1155     aggregator.addVisitor(pending.get());
1156     aggregator.addVisitor(dead.get());
1157     visit(aggregator);
1158
1159     updateCachedResidentRatio(active->getMemResidentPer(),
1160                               replica->getMemResidentPer());
1161     engine.getReplicationThrottle().adjustWriteQueueCap(active->getNumItems() +
1162                                                         replica->getNumItems() +
1163                                                         pending->getNumItems());
1164
1165     // And finally actually return the stats using the ADD_STAT callback.
1166     appendAggregatedVBucketStats(
1167             *active, *replica, *pending, *dead, cookie, add_stat);
1168 }
1169
1170 std::unique_ptr<VBucketCountVisitor> KVBucket::makeVBCountVisitor(
1171         vbucket_state_t state) {
1172     return std::make_unique<VBucketCountVisitor>(state);
1173 }
1174
/**
 * Emit the aggregated vbucket statistics through the ADD_STAT callback.
 *
 * Reports top-level item counts, per-state (active / replica / pending)
 * details, the dead vbucket count, cross-state totals (which sum active,
 * replica and pending only, except ep_vb_total which also includes dead),
 * HLC drift stats for active and replica, and per-datatype counts for
 * active and replica.
 *
 * @param active   visitor that counted active vbuckets
 * @param replica  visitor that counted replica vbuckets
 * @param pending  visitor that counted pending vbuckets
 * @param dead     visitor that counted dead vbuckets
 * @param cookie   opaque cookie forwarded to add_stat
 * @param add_stat callback receiving each (name, value) pair
 */
void KVBucket::appendAggregatedVBucketStats(VBucketCountVisitor& active,
                                            VBucketCountVisitor& replica,
                                            VBucketCountVisitor& pending,
                                            VBucketCountVisitor& dead,
                                            const void* cookie,
                                            ADD_STAT add_stat) {
// Simplify the repetition of calling add_casted_stat with `add_stat` and
// cookie each time. A polymorphic (generic) lambda would need C++14; this
// code is limited to C++11, so a macro is used instead.
#define DO_STAT(k, v)                            \
    do {                                         \
        add_casted_stat(k, v, add_stat, cookie); \
    } while (0)

    // Top-level stats:
    DO_STAT("ep_flush_all", isDeleteAllScheduled());
    DO_STAT("curr_items", active.getNumItems());
    DO_STAT("curr_temp_items", active.getNumTempItems());
    DO_STAT("curr_items_tot",
            active.getNumItems() + replica.getNumItems() +
                    pending.getNumItems());

    // Active vBuckets:
    DO_STAT("vb_active_backfill_queue_size", active.getBackfillQueueSize());
    DO_STAT("vb_active_num", active.getVBucketNumber());
    DO_STAT("vb_active_curr_items", active.getNumItems());
    DO_STAT("vb_active_hp_vb_req_size", active.getNumHpVBReqs());
    DO_STAT("vb_active_num_non_resident", active.getNonResident());
    DO_STAT("vb_active_perc_mem_resident", active.getMemResidentPer());
    DO_STAT("vb_active_eject", active.getEjects());
    DO_STAT("vb_active_expired", active.getExpired());
    DO_STAT("vb_active_meta_data_memory", active.getMetaDataMemory());
    DO_STAT("vb_active_meta_data_disk", active.getMetaDataDisk());
    DO_STAT("vb_active_ht_memory", active.getHashtableMemory());
    DO_STAT("vb_active_itm_memory", active.getItemMemory());
    DO_STAT("vb_active_ops_create", active.getOpsCreate());
    DO_STAT("vb_active_ops_update", active.getOpsUpdate());
    DO_STAT("vb_active_ops_delete", active.getOpsDelete());
    DO_STAT("vb_active_ops_reject", active.getOpsReject());
    DO_STAT("vb_active_queue_size", active.getQueueSize());
    DO_STAT("vb_active_queue_memory", active.getQueueMemory());
    DO_STAT("vb_active_queue_age", active.getAge());
    DO_STAT("vb_active_queue_pending", active.getPendingWrites());
    DO_STAT("vb_active_queue_fill", active.getQueueFill());
    DO_STAT("vb_active_queue_drain", active.getQueueDrain());
    DO_STAT("vb_active_rollback_item_count", active.getRollbackItemCount());

    // Replica vBuckets:
    DO_STAT("vb_replica_backfill_queue_size", replica.getBackfillQueueSize());
    DO_STAT("vb_replica_num", replica.getVBucketNumber());
    DO_STAT("vb_replica_curr_items", replica.getNumItems());
    DO_STAT("vb_replica_hp_vb_req_size", replica.getNumHpVBReqs());
    DO_STAT("vb_replica_num_non_resident", replica.getNonResident());
    DO_STAT("vb_replica_perc_mem_resident", replica.getMemResidentPer());
    DO_STAT("vb_replica_eject", replica.getEjects());
    DO_STAT("vb_replica_expired", replica.getExpired());
    DO_STAT("vb_replica_meta_data_memory", replica.getMetaDataMemory());
    DO_STAT("vb_replica_meta_data_disk", replica.getMetaDataDisk());
    DO_STAT("vb_replica_ht_memory", replica.getHashtableMemory());
    DO_STAT("vb_replica_itm_memory", replica.getItemMemory());
    DO_STAT("vb_replica_ops_create", replica.getOpsCreate());
    DO_STAT("vb_replica_ops_update", replica.getOpsUpdate());
    DO_STAT("vb_replica_ops_delete", replica.getOpsDelete());
    DO_STAT("vb_replica_ops_reject", replica.getOpsReject());
    DO_STAT("vb_replica_queue_size", replica.getQueueSize());
    DO_STAT("vb_replica_queue_memory", replica.getQueueMemory());
    DO_STAT("vb_replica_queue_age", replica.getAge());
    DO_STAT("vb_replica_queue_pending", replica.getPendingWrites());
    DO_STAT("vb_replica_queue_fill", replica.getQueueFill());
    DO_STAT("vb_replica_queue_drain", replica.getQueueDrain());
    DO_STAT("vb_replica_rollback_item_count", replica.getRollbackItemCount());

    // Pending vBuckets:
    DO_STAT("vb_pending_backfill_queue_size", pending.getBackfillQueueSize());
    DO_STAT("vb_pending_num", pending.getVBucketNumber());
    DO_STAT("vb_pending_curr_items", pending.getNumItems());
    DO_STAT("vb_pending_hp_vb_req_size", pending.getNumHpVBReqs());
    DO_STAT("vb_pending_num_non_resident", pending.getNonResident());
    DO_STAT("vb_pending_perc_mem_resident", pending.getMemResidentPer());
    DO_STAT("vb_pending_eject", pending.getEjects());
    DO_STAT("vb_pending_expired", pending.getExpired());
    DO_STAT("vb_pending_meta_data_memory", pending.getMetaDataMemory());
    DO_STAT("vb_pending_meta_data_disk", pending.getMetaDataDisk());
    DO_STAT("vb_pending_ht_memory", pending.getHashtableMemory());
    DO_STAT("vb_pending_itm_memory", pending.getItemMemory());
    DO_STAT("vb_pending_ops_create", pending.getOpsCreate());
    DO_STAT("vb_pending_ops_update", pending.getOpsUpdate());
    DO_STAT("vb_pending_ops_delete", pending.getOpsDelete());
    DO_STAT("vb_pending_ops_reject", pending.getOpsReject());
    DO_STAT("vb_pending_queue_size", pending.getQueueSize());
    DO_STAT("vb_pending_queue_memory", pending.getQueueMemory());
    DO_STAT("vb_pending_queue_age", pending.getAge());
    DO_STAT("vb_pending_queue_pending", pending.getPendingWrites());
    DO_STAT("vb_pending_queue_fill", pending.getQueueFill());
    DO_STAT("vb_pending_queue_drain", pending.getQueueDrain());
    DO_STAT("vb_pending_rollback_item_count", pending.getRollbackItemCount());

    // Dead vBuckets:
    DO_STAT("vb_dead_num", dead.getVBucketNumber());

    // Totals (ep_vb_total includes dead vbuckets; the rest cover
    // active + replica + pending):
    DO_STAT("ep_vb_total",
            active.getVBucketNumber() + replica.getVBucketNumber() +
                    pending.getVBucketNumber() + dead.getVBucketNumber());
    DO_STAT("ep_total_new_items",
            active.getOpsCreate() + replica.getOpsCreate() +
                    pending.getOpsCreate());
    DO_STAT("ep_total_del_items",
            active.getOpsDelete() + replica.getOpsDelete() +
                    pending.getOpsDelete());
    DO_STAT("ep_diskqueue_memory",
            active.getQueueMemory() + replica.getQueueMemory() +
                    pending.getQueueMemory());
    DO_STAT("ep_diskqueue_fill",
            active.getQueueFill() + replica.getQueueFill() +
                    pending.getQueueFill());
    DO_STAT("ep_diskqueue_drain",
            active.getQueueDrain() + replica.getQueueDrain() +
                    pending.getQueueDrain());
    DO_STAT("ep_diskqueue_pending",
            active.getPendingWrites() + replica.getPendingWrites() +
                    pending.getPendingWrites());
    DO_STAT("ep_meta_data_memory",
            active.getMetaDataMemory() + replica.getMetaDataMemory() +
                    pending.getMetaDataMemory());
    DO_STAT("ep_meta_data_disk",
            active.getMetaDataDisk() + replica.getMetaDataDisk() +
                    pending.getMetaDataDisk());
    DO_STAT("ep_total_cache_size",
            active.getCacheSize() + replica.getCacheSize() +
                    pending.getCacheSize());
    DO_STAT("rollback_item_count",
            active.getRollbackItemCount() + replica.getRollbackItemCount() +
                    pending.getRollbackItemCount());
    DO_STAT("ep_num_non_resident",
            active.getNonResident() + pending.getNonResident() +
                    replica.getNonResident());
    DO_STAT("ep_chk_persistence_remains",
            active.getChkPersistRemaining() + pending.getChkPersistRemaining() +
                    replica.getChkPersistRemaining());

    // Add stats for tracking HLC drift
    DO_STAT("ep_active_hlc_drift", active.getTotalAbsHLCDrift().total);
    DO_STAT("ep_active_hlc_drift_count", active.getTotalAbsHLCDrift().updates);
    DO_STAT("ep_replica_hlc_drift", replica.getTotalAbsHLCDrift().total);
    DO_STAT("ep_replica_hlc_drift_count",
            replica.getTotalAbsHLCDrift().updates);

    DO_STAT("ep_active_ahead_exceptions",
            active.getTotalHLCDriftExceptionCounters().ahead);
    DO_STAT("ep_active_behind_exceptions",
            active.getTotalHLCDriftExceptionCounters().behind);
    DO_STAT("ep_replica_ahead_exceptions",
            replica.getTotalHLCDriftExceptionCounters().ahead);
    DO_STAT("ep_replica_behind_exceptions",
            replica.getTotalHLCDriftExceptionCounters().behind);

    // A single total for ahead exceptions across all active/replicas
    DO_STAT("ep_clock_cas_drift_threshold_exceeded",
            active.getTotalHLCDriftExceptionCounters().ahead +
                    replica.getTotalHLCDriftExceptionCounters().ahead);

    // Per-datatype item counts, named "ep_<state>_datatype_<datatype>".
    for (uint8_t ii = 0; ii < active.getNumDatatypes(); ++ii) {
        std::string name = "ep_active_datatype_";
        name += mcbp::datatype::to_string(ii);
        DO_STAT(name.c_str(), active.getDatatypeCount(ii));
    }

    for (uint8_t ii = 0; ii < replica.getNumDatatypes(); ++ii) {
        std::string name = "ep_replica_datatype_";
        name += mcbp::datatype::to_string(ii);
        DO_STAT(name.c_str(), replica.getDatatypeCount(ii));
    }

#undef DO_STAT
}
1351
/**
 * Complete a single (non-batched) background fetch of @p key.
 *
 * The value is read synchronously from the vbucket's read-only KVStore. When
 * @p isMeta is true the fetch is marked partial via setPartial() (presumably
 * metadata only — confirm). The result is then handed to the vbucket under
 * vbsetMutex and the waiting cookie is notified with the resulting status,
 * or with ENGINE_NOT_MY_VBUCKET if the vbucket no longer exists.
 * stats.numRemainingBgJobs is decremented in both cases.
 *
 * @param key     document key to fetch
 * @param vbucket id of the vbucket owning the key
 * @param cookie  connection waiting for the fetch
 * @param init    time point recorded when the fetch was requested
 * @param isMeta  true for a metadata fetch
 */
void KVBucket::completeBGFetch(const DocKey& key,
                               uint16_t vbucket,
                               const void* cookie,
                               ProcessClock::time_point init,
                               bool isMeta) {
    ProcessClock::time_point startTime(ProcessClock::now());
    // Go find the data
    RememberingCallback<GetValue> gcb;
    if (isMeta) {
        gcb.val.setPartial();
    }
    getROUnderlying(vbucket)->get(key, vbucket, gcb);
    // Block until the KVStore has delivered the value into gcb.val.
    gcb.waitForValue();

    {
        // Lock to prevent a race condition between a fetch for restore and delete
        LockHolder lh(vbsetMutex);

        VBucketPtr vb = getVBucket(vbucket);
        if (vb) {
            VBucketBGFetchItem item{gcb.val, cookie, init, isMeta};
            ENGINE_ERROR_CODE status =
                    vb->completeBGFetchForSingleItem(key, item, startTime);
            engine.notifyIOComplete(item.cookie, status);
        } else {
            LOG(EXTENSION_LOG_INFO, "vb:%" PRIu16 " file was deleted in the "
                "middle of a bg fetch for key{%.*s}\n", vbucket, int(key.size()),
                key.data());
            engine.notifyIOComplete(cookie, ENGINE_NOT_MY_VBUCKET);
        }
    }

    --stats.numRemainingBgJobs;

    // Free the fetched Item. NOTE(review): this assumes
    // completeBGFetchForSingleItem() copies the value rather than taking
    // ownership of the pointer held in gcb.val — confirm.
    delete gcb.val.getValue();
}
1388
1389 void KVBucket::completeBGFetchMulti(uint16_t vbId,
1390                                     std::vector<bgfetched_item_t>& fetchedItems,
1391                                     ProcessClock::time_point startTime) {
1392     VBucketPtr vb = getVBucket(vbId);
1393     if (vb) {
1394         for (const auto& item : fetchedItems) {
1395             auto& key = item.first;
1396             auto* fetched_item = item.second;
1397             ENGINE_ERROR_CODE status = vb->completeBGFetchForSingleItem(
1398                     key, *fetched_item, startTime);
1399             engine.notifyIOComplete(fetched_item->cookie, status);
1400         }
1401         LOG(EXTENSION_LOG_DEBUG,
1402             "EP Store completes %" PRIu64 " of batched background fetch "
1403             "for vBucket = %d endTime = %" PRIu64,
1404             uint64_t(fetchedItems.size()), vbId, gethrtime()/1000000);
1405     } else {
1406         for (const auto& item : fetchedItems) {
1407             engine.notifyIOComplete(item.second->cookie,
1408                                     ENGINE_NOT_MY_VBUCKET);
1409         }
1410         LOG(EXTENSION_LOG_WARNING,
1411             "EP Store completes %d of batched background fetch for "
1412             "for vBucket = %d that is already deleted\n",
1413             (int)fetchedItems.size(), vbId);
1414
1415     }
1416 }
1417
1418 GetValue KVBucket::getInternal(const DocKey& key, uint16_t vbucket,
1419                                const void *cookie, vbucket_state_t allowedState,
1420                                get_options_t options) {
1421
1422     vbucket_state_t disallowedState = (allowedState == vbucket_state_active) ?
1423         vbucket_state_replica : vbucket_state_active;
1424     VBucketPtr vb = getVBucket(vbucket);
1425
1426     if (!vb) {
1427         ++stats.numNotMyVBuckets;
1428         return GetValue(NULL, ENGINE_NOT_MY_VBUCKET);
1429     }
1430
1431     const bool honorStates = (options & HONOR_STATES);
1432
1433     ReaderLockHolder rlh(vb->getStateLock());
1434     if (honorStates) {
1435         vbucket_state_t vbState = vb->getState();
1436         if (vbState == vbucket_state_dead) {
1437             ++stats.numNotMyVBuckets;
1438             return GetValue(NULL, ENGINE_NOT_MY_VBUCKET);
1439         } else if (vbState == disallowedState) {
1440             ++stats.numNotMyVBuckets;
1441             return GetValue(NULL, ENGINE_NOT_MY_VBUCKET);
1442         } else if (vbState == vbucket_state_pending) {
1443             if (vb->addPendingOp(cookie)) {
1444                 return GetValue(NULL, ENGINE_EWOULDBLOCK);
1445             }
1446         }
1447     }
1448
1449     { // collections read scope
1450         auto collectionsRHandle = vb->lockCollections();
1451         if (!collectionsRHandle.doesKeyContainValidCollection(key)) {
1452             return GetValue(NULL, ENGINE_UNKNOWN_COLLECTION);
1453         }
1454
1455         return vb->getInternal(
1456                 key, cookie, engine, bgFetchDelay, options, diskDeleteAll);
1457     }
1458 }
1459
1460 GetValue KVBucket::getRandomKey() {
1461     VBucketMap::id_type max = vbMap.getSize();
1462
1463     const long start = random() % max;
1464     long curr = start;
1465     std::unique_ptr<Item> itm;
1466
1467     while (itm == NULL) {
1468         VBucketPtr vb = getVBucket(curr++);
1469         while (!vb || vb->getState() != vbucket_state_active) {
1470             if (curr == start) {
1471                 return GetValue(NULL, ENGINE_KEY_ENOENT);
1472             }
1473             if (curr == max) {
1474                 curr = 0;
1475             }
1476
1477             vb = getVBucket(curr++);
1478         }
1479
1480         if ((itm = vb->ht.getRandomKey(random()))) {
1481             GetValue rv(itm.release(), ENGINE_SUCCESS);
1482             return rv;
1483         }
1484
1485         if (curr == max) {
1486             curr = 0;
1487         }
1488
1489         if (curr == start) {
1490             return GetValue(NULL, ENGINE_KEY_ENOENT);
1491         }
1492         // Search next vbucket
1493     }
1494
1495     return GetValue(NULL, ENGINE_KEY_ENOENT);
1496 }
1497
1498 ENGINE_ERROR_CODE KVBucket::getMetaData(const DocKey& key,
1499                                         uint16_t vbucket,
1500                                         const void* cookie,
1501                                         bool fetchDatatype,
1502                                         ItemMetaData& metadata,
1503                                         uint32_t& deleted,
1504                                         uint8_t& datatype)
1505 {
1506     VBucketPtr vb = getVBucket(vbucket);
1507
1508     if (!vb) {
1509         ++stats.numNotMyVBuckets;
1510         return ENGINE_NOT_MY_VBUCKET;
1511     }
1512
1513     ReaderLockHolder rlh(vb->getStateLock());
1514     if (vb->getState() == vbucket_state_dead ||
1515         vb->getState() == vbucket_state_replica) {
1516         ++stats.numNotMyVBuckets;
1517         return ENGINE_NOT_MY_VBUCKET;
1518     }
1519
1520     return vb->getMetaData(
1521             key, cookie, engine, bgFetchDelay, fetchDatatype, metadata,
1522             deleted, datatype);
1523 }
1524
1525 ENGINE_ERROR_CODE KVBucket::setWithMeta(Item &itm,
1526                                         uint64_t cas,
1527                                         uint64_t *seqno,
1528                                         const void *cookie,
1529                                         bool force,
1530                                         bool allowExisting,
1531                                         GenerateBySeqno genBySeqno,
1532                                         GenerateCas genCas,
1533                                         ExtendedMetaData *emd,
1534                                         bool isReplication)
1535 {
1536     VBucketPtr vb = getVBucket(itm.getVBucketId());
1537     if (!vb) {
1538         ++stats.numNotMyVBuckets;
1539         return ENGINE_NOT_MY_VBUCKET;
1540     }
1541
1542     ReaderLockHolder rlh(vb->getStateLock());
1543     if (vb->getState() == vbucket_state_dead) {
1544         ++stats.numNotMyVBuckets;
1545         return ENGINE_NOT_MY_VBUCKET;
1546     } else if (vb->getState() == vbucket_state_replica && !force) {
1547         ++stats.numNotMyVBuckets;
1548         return ENGINE_NOT_MY_VBUCKET;
1549     } else if (vb->getState() == vbucket_state_pending && !force) {
1550         if (vb->addPendingOp(cookie)) {
1551             return ENGINE_EWOULDBLOCK;
1552         }
1553     } else if (vb->isTakeoverBackedUp()) {
1554         LOG(EXTENSION_LOG_DEBUG, "(vb %u) Returned TMPFAIL to a setWithMeta op"
1555                 ", becuase takeover is lagging", vb->getId());
1556         return ENGINE_TMPFAIL;
1557     }
1558
1559     //check for the incoming item's CAS validity
1560     if (!Item::isValidCas(itm.getCas())) {
1561         return ENGINE_KEY_EEXISTS;
1562     }
1563
1564     return vb->setWithMeta(itm,
1565                            cas,
1566                            seqno,
1567                            cookie,
1568                            engine,
1569                            bgFetchDelay,
1570                            force,
1571                            allowExisting,
1572                            genBySeqno,
1573                            genCas,
1574                            isReplication);
1575 }
1576
1577 GetValue KVBucket::getAndUpdateTtl(const DocKey& key, uint16_t vbucket,
1578                                    const void *cookie, time_t exptime)
1579 {
1580     VBucketPtr vb = getVBucket(vbucket);
1581     if (!vb) {
1582         ++stats.numNotMyVBuckets;
1583         return GetValue(NULL, ENGINE_NOT_MY_VBUCKET);
1584     }
1585
1586     ReaderLockHolder rlh(vb->getStateLock());
1587     if (vb->getState() == vbucket_state_dead) {
1588         ++stats.numNotMyVBuckets;
1589         return GetValue(NULL, ENGINE_NOT_MY_VBUCKET);
1590     } else if (vb->getState() == vbucket_state_replica) {
1591         ++stats.numNotMyVBuckets;
1592         return GetValue(NULL, ENGINE_NOT_MY_VBUCKET);
1593     } else if (vb->getState() == vbucket_state_pending) {
1594         if (vb->addPendingOp(cookie)) {
1595             return GetValue(NULL, ENGINE_EWOULDBLOCK);
1596         }
1597     }
1598
1599     return vb->getAndUpdateTtl(key, cookie, engine, bgFetchDelay, exptime);
1600 }
1601
1602 GetValue KVBucket::getLocked(const DocKey& key, uint16_t vbucket,
1603                              rel_time_t currentTime, uint32_t lockTimeout,
1604                              const void *cookie) {
1605     VBucketPtr vb = getVBucket(vbucket);
1606     if (!vb || vb->getState() != vbucket_state_active) {
1607         ++stats.numNotMyVBuckets;
1608         return GetValue(NULL, ENGINE_NOT_MY_VBUCKET);
1609     }
1610
1611     return vb->getLocked(
1612             key, currentTime, lockTimeout, cookie, engine, bgFetchDelay);
1613 }
1614
1615 ENGINE_ERROR_CODE KVBucket::unlockKey(const DocKey& key,
1616                                       uint16_t vbucket,
1617                                       uint64_t cas,
1618                                       rel_time_t currentTime)
1619 {
1620
1621     VBucketPtr vb = getVBucket(vbucket);
1622     if (!vb || vb->getState() != vbucket_state_active) {
1623         ++stats.numNotMyVBuckets;
1624         return ENGINE_NOT_MY_VBUCKET;
1625     }
1626
1627     auto hbl = vb->ht.getLockedBucket(key);
1628     StoredValue* v = vb->fetchValidValue(hbl,
1629                                          key,
1630                                          WantsDeleted::Yes,
1631                                          TrackReference::Yes,
1632                                          QueueExpired::Yes);
1633
1634     if (v) {
1635         if (v->isDeleted() || v->isTempNonExistentItem() ||
1636             v->isTempDeletedItem()) {
1637             return ENGINE_KEY_ENOENT;
1638         }
1639         if (v->isLocked(currentTime)) {
1640             if (v->getCas() == cas) {
1641                 v->unlock();
1642                 return ENGINE_SUCCESS;
1643             }
1644         }
1645         return ENGINE_TMPFAIL;
1646     } else {
1647         if (eviction_policy == VALUE_ONLY) {
1648             return ENGINE_KEY_ENOENT;
1649         } else {
1650             // With the full eviction, an item's lock is automatically
1651             // released when the item is evicted from memory. Therefore,
1652             // we simply return ENGINE_TMPFAIL when we receive unlockKey
1653             // for an item that is not in memocy cache. Note that we don't
1654             // spawn any bg fetch job to figure out if an item actually
1655             // exists in disk or not.
1656             return ENGINE_TMPFAIL;
1657         }
1658     }
1659 }
1660
1661 ENGINE_ERROR_CODE KVBucket::getKeyStats(const DocKey& key,
1662                                         uint16_t vbucket,
1663                                         const void* cookie,
1664                                         struct key_stats& kstats,
1665                                         WantsDeleted wantsDeleted) {
1666     VBucketPtr vb = getVBucket(vbucket);
1667     if (!vb) {
1668         return ENGINE_NOT_MY_VBUCKET;
1669     }
1670
1671     return vb->getKeyStats(
1672             key, cookie, engine, bgFetchDelay, kstats, wantsDeleted);
1673 }
1674
1675 std::string KVBucket::validateKey(const DocKey& key, uint16_t vbucket,
1676                                   Item &diskItem) {
1677     VBucketPtr vb = getVBucket(vbucket);
1678     auto hbl = vb->ht.getLockedBucket(key);
1679     StoredValue* v = vb->fetchValidValue(
1680             hbl, key, WantsDeleted::Yes, TrackReference::No, QueueExpired::Yes);
1681
1682     if (v) {
1683         if (v->isDeleted() || v->isTempNonExistentItem() ||
1684             v->isTempDeletedItem()) {
1685             return "item_deleted";
1686         }
1687
1688         if (diskItem.getFlags() != v->getFlags()) {
1689             return "flags_mismatch";
1690         } else if (v->isResident() && memcmp(diskItem.getData(),
1691                                              v->getValue()->getData(),
1692                                              diskItem.getNBytes())) {
1693             return "data_mismatch";
1694         } else {
1695             return "valid";
1696         }
1697     } else {
1698         return "item_deleted";
1699     }
1700
1701 }
1702
1703 ENGINE_ERROR_CODE KVBucket::deleteItem(const DocKey& key,
1704                                        uint64_t& cas,
1705                                        uint16_t vbucket,
1706                                        const void* cookie,
1707                                        Item* itm,
1708                                        ItemMetaData* itemMeta,
1709                                        mutation_descr_t* mutInfo) {
1710     VBucketPtr vb = getVBucket(vbucket);
1711     if (!vb || vb->getState() == vbucket_state_dead) {
1712         ++stats.numNotMyVBuckets;
1713         return ENGINE_NOT_MY_VBUCKET;
1714     } else if (vb->getState() == vbucket_state_replica) {
1715         ++stats.numNotMyVBuckets;
1716         return ENGINE_NOT_MY_VBUCKET;
1717     } else if (vb->getState() == vbucket_state_pending) {
1718         if (vb->addPendingOp(cookie)) {
1719             return ENGINE_EWOULDBLOCK;
1720         }
1721     } else if (vb->isTakeoverBackedUp()) {
1722         LOG(EXTENSION_LOG_DEBUG, "(vb %u) Returned TMPFAIL to a delete op"
1723                 ", becuase takeover is lagging", vb->getId());
1724         return ENGINE_TMPFAIL;
1725     }
1726
1727     return vb->deleteItem(key,
1728                           cas,
1729                           cookie,
1730                           engine,
1731                           bgFetchDelay,
1732                           itm,
1733                           itemMeta,
1734                           mutInfo);
1735 }
1736
1737 ENGINE_ERROR_CODE KVBucket::deleteWithMeta(const DocKey& key,
1738                                            uint64_t& cas,
1739                                            uint64_t* seqno,
1740                                            uint16_t vbucket,
1741                                            const void* cookie,
1742                                            bool force,
1743                                            const ItemMetaData& itemMeta,
1744                                            bool backfill,
1745                                            GenerateBySeqno genBySeqno,
1746                                            GenerateCas generateCas,
1747                                            uint64_t bySeqno,
1748                                            ExtendedMetaData* emd,
1749                                            bool isReplication) {
1750     VBucketPtr vb = getVBucket(vbucket);
1751
1752     if (!vb) {
1753         ++stats.numNotMyVBuckets;
1754         return ENGINE_NOT_MY_VBUCKET;
1755     }
1756
1757     ReaderLockHolder rlh(vb->getStateLock());
1758     if (vb->getState() == vbucket_state_dead) {
1759         ++stats.numNotMyVBuckets;
1760         return ENGINE_NOT_MY_VBUCKET;
1761     } else if (vb->getState() == vbucket_state_replica && !force) {
1762         ++stats.numNotMyVBuckets;
1763         return ENGINE_NOT_MY_VBUCKET;
1764     } else if (vb->getState() == vbucket_state_pending && !force) {
1765         if (vb->addPendingOp(cookie)) {
1766             return ENGINE_EWOULDBLOCK;
1767         }
1768     } else if (vb->isTakeoverBackedUp()) {
1769         LOG(EXTENSION_LOG_DEBUG, "(vb %u) Returned TMPFAIL to a deleteWithMeta "
1770                 "op, because takeover is lagging", vb->getId());
1771         return ENGINE_TMPFAIL;
1772     }
1773
1774     //check for the incoming item's CAS validity
1775     if (!Item::isValidCas(itemMeta.cas)) {
1776         return ENGINE_KEY_EEXISTS;
1777     }
1778
1779     return vb->deleteWithMeta(key,
1780                               cas,
1781                               seqno,
1782                               cookie,
1783                               engine,
1784                               bgFetchDelay,
1785                               force,
1786                               itemMeta,
1787                               backfill,
1788                               genBySeqno,
1789                               generateCas,
1790                               bySeqno,
1791                               isReplication);
1792 }
1793
1794 void KVBucket::reset() {
1795     auto buckets = vbMap.getBuckets();
1796     for (auto vbid : buckets) {
1797         VBucketPtr vb = getVBucket(vbid);
1798         if (vb) {
1799             LockHolder lh(vb_mutexes[vb->getId()]);
1800             vb->ht.clear();
1801             vb->checkpointManager.clear(vb->getState());
1802             vb->resetStats();
1803             vb->setPersistedSnapshot(0, 0);
1804         }
1805     }
1806 }
1807
1808 /**
1809  * Callback invoked after persisting an item from memory to disk.
1810  *
1811  * This class exists to create a closure around a few variables within
1812  * KVBucket::flushOne so that an object can be
1813  * requeued in case of failure to store in the underlying layer.
1814  */
1815 class PersistenceCallback : public Callback<mutation_result>,
1816                             public Callback<int> {
1817 public:
1818
1819     PersistenceCallback(const queued_item &qi, VBucketPtr &vb,
1820                         EPStats& s, uint64_t c)
1821         : queuedItem(qi), vbucket(vb), stats(s), cas(c) {
1822         if (!vb) {
1823             throw std::invalid_argument("PersistenceCallback(): vb is NULL");
1824         }
1825     }
1826
1827     // This callback is invoked for set only.
1828     void callback(mutation_result &value) {
1829         if (value.first == 1) {
1830             auto hbl = vbucket->ht.getLockedBucket(queuedItem->getKey());
1831             StoredValue* v = vbucket->fetchValidValue(hbl,
1832                                                       queuedItem->getKey(),
1833                                                       WantsDeleted::Yes,
1834                                                       TrackReference::No,
1835                                                       QueueExpired::Yes);
1836             if (v) {
1837                 if (v->getCas() == cas) {
1838                     // mark this item clean only if current and stored cas
1839                     // value match
1840                     v->markClean();
1841                 }
1842                 if (v->isNewCacheItem()) {
1843                     if (value.second) {
1844                         // Insert in value-only or full eviction mode.
1845                         ++vbucket->opsCreate;
1846                         vbucket->incrMetaDataDisk(*queuedItem);
1847                     } else { // Update in full eviction mode.
1848                         ++vbucket->opsUpdate;
1849                     }
1850
1851                     v->setNewCacheItem(false);
1852                 } else { // Update in value-only or full eviction mode.
1853                     ++vbucket->opsUpdate;
1854                 }
1855             }
1856
1857             vbucket->doStatsForFlushing(*queuedItem, queuedItem->size());
1858             --stats.diskQueueSize;
1859             stats.totalPersisted++;
1860         } else {
1861             // If the return was 0 here, we're in a bad state because
1862             // we do not know the rowid of this object.
1863             if (value.first == 0) {
1864                 auto hbl = vbucket->ht.getLockedBucket(queuedItem->getKey());
1865                 StoredValue* v = vbucket->fetchValidValue(hbl,
1866                                                           queuedItem->getKey(),
1867                                                           WantsDeleted::Yes,
1868                                                           TrackReference::No,
1869                                                           QueueExpired::Yes);
1870                 if (v) {
1871                     LOG(EXTENSION_LOG_WARNING,
1872                         "PersistenceCallback::callback: Persisting on "
1873                         "vb:%" PRIu16 ", seqno:%" PRIu64 " returned 0 updates",
1874                         queuedItem->getVBucketId(), v->getBySeqno());
1875                 } else {
1876                     LOG(EXTENSION_LOG_WARNING,
1877                         "PersistenceCallback::callback: Error persisting, a key"
1878                         "is missing from vb:%" PRIu16,
1879                         queuedItem->getVBucketId());
1880                 }
1881
1882                 vbucket->doStatsForFlushing(*queuedItem, queuedItem->size());
1883                 --stats.diskQueueSize;
1884             } else {
1885                 LOG(EXTENSION_LOG_WARNING,
1886                     "PersistenceCallback::callback: Fatal error in persisting "
1887                     "SET on vb:%" PRIu16, queuedItem->getVBucketId());
1888                 redirty();
1889             }
1890         }
1891     }
1892
1893     // This callback is invoked for deletions only.
1894     //
1895     // The boolean indicates whether the underlying storage
1896     // successfully deleted the item.
1897     void callback(int &value) {
1898         // > 1 would be bad.  We were only trying to delete one row.
1899         if (value > 1) {
1900             throw std::logic_error("PersistenceCallback::callback: value "
1901                     "(which is " + std::to_string(value) +
1902                     ") should be <= 1 for deletions");
1903         }
1904         // -1 means fail
1905         // 1 means we deleted one row
1906         // 0 means we did not delete a row, but did not fail (did not exist)
1907         if (value >= 0) {
1908             // We have successfully removed an item from the disk, we
1909             // may now remove it from the hash table.
1910             vbucket->deletedOnDiskCbk(*queuedItem, (value > 0));
1911         } else {
1912             LOG(EXTENSION_LOG_WARNING,
1913                 "PersistenceCallback::callback: Fatal error in persisting "
1914                 "DELETE on vb:%" PRIu16, queuedItem->getVBucketId());
1915             redirty();
1916         }
1917     }
1918
1919     VBucketPtr& getVBucket() {
1920         return vbucket;
1921     }
1922
1923 private:
1924
1925     void redirty() {
1926         if (vbucket->isBucketDeletion()) {
1927             // updating the member stats for the vbucket is not really necessary
1928             // as the vbucket is about to be deleted
1929             vbucket->doStatsForFlushing(*queuedItem, queuedItem->size());
1930             // the following is a global stat and so is worth updating
1931             --stats.diskQueueSize;
1932             return;
1933         }
1934         ++stats.flushFailed;
1935         vbucket->markDirty(queuedItem->getKey());
1936         vbucket->rejectQueue.push(queuedItem);
1937         ++vbucket->opsReject;
1938     }
1939
1940     const queued_item queuedItem;
1941     VBucketPtr vbucket;
1942     EPStats& stats;
1943     uint64_t cas;
1944     DISALLOW_COPY_AND_ASSIGN(PersistenceCallback);
1945 };
1946
1947 bool KVBucket::scheduleDeleteAllTask(const void* cookie) {
1948     bool inverse = false;
1949     if (diskDeleteAll.compare_exchange_strong(inverse, true)) {
1950         deleteAllTaskCtx.cookie = cookie;
1951         deleteAllTaskCtx.delay.compare_exchange_strong(inverse, true);
1952         ExTask task = make_STRCPtr<DeleteAllTask>(&engine);
1953         ExecutorPool::get()->schedule(task);
1954         return true;
1955     } else {
1956         return false;
1957     }
1958 }
1959
1960 void KVBucket::setDeleteAllComplete() {
1961     // Notify memcached about delete all task completion, and
1962     // set diskFlushall flag to false
1963     if (deleteAllTaskCtx.cookie) {
1964         engine.notifyIOComplete(deleteAllTaskCtx.cookie, ENGINE_SUCCESS);
1965     }
1966     bool inverse = false;
1967     deleteAllTaskCtx.delay.compare_exchange_strong(inverse, true);
1968     inverse = true;
1969     diskDeleteAll.compare_exchange_strong(inverse, false);
1970 }
1971
1972 void KVBucket::flushOneDeleteAll() {
1973     for (VBucketMap::id_type i = 0; i < vbMap.getSize(); ++i) {
1974         VBucketPtr vb = getVBucket(i);
1975         // Reset the vBucket if it's non-null and not already in the middle of
1976         // being created / destroyed.
1977         if (vb && !(vb->isBucketCreation() || vb->isBucketDeletion())) {
1978             LockHolder lh(vb_mutexes[vb->getId()]);
1979             getRWUnderlying(vb->getId())->reset(i);
1980         }
1981     }
1982
1983     --stats.diskQueueSize;
1984     setDeleteAllComplete();
1985 }
1986
/**
 * Flush the outstanding dirty items of one vBucket to disk.
 *
 * Items are collected from the vBucket's rejectQueue (previously failed
 * writes), its backfill items, and the persistence cursor's checkpoint items.
 * They are written via the vBucket's RW KVStore inside one transaction, the
 * vbucket_state snapshot is updated, and the transaction is committed.
 *
 * @return the number of items handed to the store (items_flushed), 0 if a
 *         delete-all is pending and this is not the primary shard, or
 *         RETRY_FLUSH_VBUCKET if the vBucket mutex is busy, the state snapshot
 *         failed, or items remain in the rejectQueue.
 */
int KVBucket::flushVBucket(uint16_t vbid) {
    KVShard *shard = vbMap.getShardByVbId(vbid);
    // A delete-all that is no longer delayed is executed by the primary
    // shard; other shards do not flush until it completes.
    if (diskDeleteAll && !deleteAllTaskCtx.delay) {
        if (shard->getId() == EP_PRIMARY_SHARD) {
            flushOneDeleteAll();
        } else {
            // disk flush is pending just return
            return 0;
        }
    }

    int items_flushed = 0;
    const hrtime_t flush_start = gethrtime();

    VBucketPtr vb = vbMap.getBucket(vbid);
    if (vb) {
        std::unique_lock<std::mutex> lh(vb_mutexes[vbid], std::try_to_lock);
        if (!lh.owns_lock()) { // Try another bucket if this one is locked
            return RETRY_FLUSH_VBUCKET; // to avoid blocking flusher
        }

        std::vector<queued_item> items;
        KVStore *rwUnderlying = getRWUnderlying(vbid);

        // Previously failed writes (see PersistenceCallback::redirty) go
        // first.
        while (!vb->rejectQueue.empty()) {
            items.push_back(vb->rejectQueue.front());
            vb->rejectQueue.pop();
        }

        // Append any 'backfill' items (mutations added by a TAP stream).
        vb->getBackfillItems(items);

        // Append all items outstanding for the persistence cursor.
        snapshot_range_t range;
        hrtime_t _begin_ = gethrtime();
        range = vb->checkpointManager.getAllItemsForCursor(
                CheckpointManager::pCursorName, items);
        stats.persistenceCursorGetItemsHisto.add((gethrtime() - _begin_) / 1000);

        if (!items.empty()) {
            // Retry indefinitely (1s apart) until a transaction can be begun.
            while (!rwUnderlying->begin()) {
                ++stats.beginFailed;
                LOG(EXTENSION_LOG_WARNING, "Failed to start a transaction!!! "
                    "Retry in 1 sec ...");
                sleep(1);
            }
            rwUnderlying->optimizeWrites(items);

            Item *prev = NULL;
            auto vbstate = vb->getVBucketState();
            uint64_t maxSeqno = 0;
            range.start = std::max(range.start, vbstate.lastSnapStart);

            bool mustCheckpointVBState = false;
            // Callbacks created by flushOneDelOrSet are collected here and
            // deleted by commit().
            std::list<PersistenceCallback*>& pcbs = rwUnderlying->getPersistenceCbList();

            SystemEventFlush sef;

            for (const auto& item : items) {

                if (!item->shouldPersist()) {
                    continue;
                }

                // Pass the Item through the SystemEventFlush which may filter
                // the item away (return Skip).
                if (sef.process(item) == ProcessStatus::Skip) {
                    // The item has no further flushing actions i.e. we've
                    // absorbed it in the process function.
                    // Update stats and carry-on
                    --stats.diskQueueSize;
                    vb->doStatsForFlushing(*item, item->size());
                    continue;
                }

                if (item->getOperation() == queue_op::set_vbucket_state) {
                    // No actual item explicitly persisted to (this op exists
                    // to ensure a commit occurs with the current vbstate);
                    // flag that we must trigger a snapshot even if there are
                    // no 'real' items in the checkpoint.
                    mustCheckpointVBState = true;

                    // Update queuing stats how this item has logically been
                    // processed.
                    --stats.diskQueueSize;
                    vb->doStatsForFlushing(*item, item->size());

                } else if (!prev || prev->getKey() != item->getKey()) {
                    prev = item.get();
                    ++items_flushed;
                    PersistenceCallback *cb = flushOneDelOrSet(item, vb);
                    if (cb) {
                        pcbs.push_back(cb);
                    }

                    maxSeqno = std::max(maxSeqno, (uint64_t)item->getBySeqno());
                    vbstate.maxCas = std::max(vbstate.maxCas, item->getCas());
                    if (item->isDeleted()) {
                        vbstate.maxDeletedSeqno =
                                std::max(vbstate.maxDeletedSeqno,
                                         item->getRevSeqno());
                    }
                    ++stats.flusher_todo;

                } else {
                    // Item is the same key as the previous[1] one - don't need
                    // to flush to disk.
                    // [1] Previous here really means 'next' - optimizeWrites()
                    //     above has actually re-ordered items such that items
                    //     with the same key are ordered from high->low seqno.
                    //     This means we only write the highest (i.e. newest)
                    //     item for a given key, and discard any duplicate,
                    //     older items.
                    --stats.diskQueueSize;
                    vb->doStatsForFlushing(*item, item->size());
                }
            }


            {
                ReaderLockHolder rlh(vb->getStateLock());
                // For an active vBucket the snapshot range collapses to the
                // highest seqno actually written in this batch.
                if (vb->getState() == vbucket_state_active) {
                    if (maxSeqno) {
                        range.start = maxSeqno;
                        range.end = maxSeqno;
                    }
                }

                // Update VBstate based on the changes we have just made,
                // then tell the rwUnderlying the 'new' state
                // (which will persisted as part of the commit() below).
                vbstate.lastSnapStart = range.start;
                vbstate.lastSnapEnd = range.end;

                // Do we need to trigger a persist of the state?
                // If there are no "real" items to flush, and we encountered
                // a set_vbucket_state meta-item.
                auto options = VBStatePersist::VBSTATE_CACHE_UPDATE_ONLY;
                if ((items_flushed == 0) && mustCheckpointVBState) {
                    options = VBStatePersist::VBSTATE_PERSIST_WITH_COMMIT;
                }

                if (rwUnderlying->snapshotVBucket(vb->getId(), vbstate,
                                                  options) != true) {
                    // NOTE(review): this early return happens after begin()
                    // succeeded and after callbacks were pushed onto pcbs,
                    // without calling commit(). The items drained from the
                    // rejectQueue / checkpoint above are not requeued here
                    // either. Confirm whether that is intended, or handled
                    // elsewhere.
                    return RETRY_FLUSH_VBUCKET;
                }

                // Logs "created" only on the first transition; the same check
                // is repeated after commit() below.
                if (vb->setBucketCreation(false)) {
                    LOG(EXTENSION_LOG_INFO, "VBucket %" PRIu16 " created", vbid);
                }
            }

            /* Perform an explicit commit to disk if there is a non-zero
             * number of items to flush, or if there is a collections
             * manifest item.
             */
            if (items_flushed > 0 || sef.getCollectionsManifestItem()) {
                commit(*rwUnderlying, sef.getCollectionsManifestItem());

                // Now the commit is complete, vBucket file must exist.
                if (vb->setBucketCreation(false)) {
                    LOG(EXTENSION_LOG_INFO, "VBucket %" PRIu16 " created", vbid);
                }
            }

            // Timings are in milliseconds (hrtime is ns, divided by 1e6).
            hrtime_t flush_end = gethrtime();
            uint64_t trans_time = (flush_end - flush_start) / 1000000;

            lastTransTimePerItem.store((items_flushed == 0) ? 0 :
                                       static_cast<double>(trans_time) /
                                       static_cast<double>(items_flushed));
            stats.cumulativeFlushTime.fetch_add(trans_time);
            stats.flusher_todo.store(0);
            stats.totalPersistVBState++;

            // Only advance persisted snapshot / seqno if nothing failed and
            // was redirtied during this flush.
            if (vb->rejectQueue.empty()) {
                vb->setPersistedSnapshot(range.start, range.end);
                uint64_t highSeqno = rwUnderlying->getLastPersistedSeqno(vbid);
                if (highSeqno > 0 &&
                    highSeqno != vb->getPersistenceSeqno()) {
                    vb->setPersistenceSeqno(highSeqno);
                }
            }
        }

        rwUnderlying->pendingTasks();

        if (vb->checkpointManager.getNumCheckpoints() > 1) {
            wakeUpCheckpointRemover();
        }

        // On a clean flush, advance the persistence cursor and wake any
        // high-priority requests waiting on the persisted seqno/checkpoint;
        // otherwise ask the flusher to retry this vBucket.
        if (vb->rejectQueue.empty()) {
            vb->checkpointManager.itemsPersisted();
            uint64_t seqno = vb->getPersistenceSeqno();
            uint64_t chkid = vb->checkpointManager.getPersistenceCursorPreChkId();
            vb->notifyHighPriorityRequests(
                    engine, seqno, HighPriorityVBNotify::Seqno);
            vb->notifyHighPriorityRequests(
                    engine, chkid, HighPriorityVBNotify::ChkPersistence);
            if (chkid > 0 && chkid != vb->getPersistenceCheckpointId()) {
                vb->setPersistenceCheckpointId(chkid);
            }
        } else {
            return RETRY_FLUSH_VBUCKET;
        }
    }

    return items_flushed;
}
2197
2198 void KVBucket::commit(KVStore& kvstore, const Item* collectionsManifest) {
2199     std::list<PersistenceCallback*>& pcbs = kvstore.getPersistenceCbList();
2200     BlockTimer timer(&stats.diskCommitHisto, "disk_commit", stats.timingLog);
2201     hrtime_t commit_start = gethrtime();
2202
2203     while (!kvstore.commit(collectionsManifest)) {
2204         ++stats.commitFailed;
2205         LOG(EXTENSION_LOG_WARNING,
2206             "KVBucket::commit: kvstore.commit failed!!! Retry in 1 sec...");
2207         sleep(1);
2208     }
2209
2210     //Update the total items in the case of full eviction
2211     if (getItemEvictionPolicy() == FULL_EVICTION) {
2212         std::unordered_set<uint16_t> vbSet;
2213         for (auto pcbIter : pcbs) {
2214             PersistenceCallback *pcb = pcbIter;
2215             VBucketPtr& vb = pcb->getVBucket();
2216             uint16_t vbid = vb->getId();
2217             auto found = vbSet.find(vbid);
2218             if (found == vbSet.end()) {
2219                 vbSet.insert(vbid);
2220                 vb->ht.setNumTotalItems(
2221                         getRWUnderlying(vbid)->getItemCount(vbid));
2222             }
2223         }
2224     }
2225
2226     while (!pcbs.empty()) {
2227          delete pcbs.front();
2228          pcbs.pop_front();
2229     }
2230
2231     ++stats.flusherCommits;
2232     hrtime_t commit_end = gethrtime();
2233     uint64_t commit_time = (commit_end - commit_start) / 1000000;
2234     stats.commit_time.store(commit_time);
2235     stats.cumulativeCommitTime.fetch_add(commit_time);
2236 }
2237
2238 PersistenceCallback* KVBucket::flushOneDelOrSet(const queued_item &qi,
2239                                                 VBucketPtr &vb) {
2240
2241     if (!vb) {
2242         --stats.diskQueueSize;
2243         return NULL;
2244     }
2245
2246     int64_t bySeqno = qi->getBySeqno();
2247     rel_time_t queued(qi->getQueuedTime());
2248
2249     int dirtyAge = ep_current_time() - queued;
2250     stats.dirtyAgeHisto.add(dirtyAge * 1000000);
2251     stats.dirtyAge.store(dirtyAge);
2252     stats.dirtyAgeHighWat.store(std::max(stats.dirtyAge.load(),
2253                                          stats.dirtyAgeHighWat.load()));
2254
2255     KVStore *rwUnderlying = getRWUnderlying(qi->getVBucketId());
2256     if (SystemEventFlush::isUpsert(*qi)) {
2257         // TODO: Need to separate disk_insert from disk_update because
2258         // bySeqno doesn't give us that information.
2259         BlockTimer timer(bySeqno == -1 ?
2260                          &stats.diskInsertHisto : &stats.diskUpdateHisto,
2261                          bySeqno == -1 ? "disk_insert" : "disk_update",
2262                          stats.timingLog);
2263         PersistenceCallback *cb =
2264             new PersistenceCallback(qi, vb, stats, qi->getCas());
2265         rwUnderlying->set(*qi, *cb);
2266         return cb;
2267     } else {
2268         BlockTimer timer(&stats.diskDelHisto, "disk_delete",
2269                          stats.timingLog);
2270         PersistenceCallback *cb =
2271             new PersistenceCallback(qi, vb, stats, 0);
2272         rwUnderlying->del(*qi, *cb);
2273         return cb;
2274     }
2275 }
2276
2277 std::vector<vbucket_state *> KVBucket::loadVBucketState()
2278 {
2279     return getOneROUnderlying()->listPersistedVbuckets();
2280 }
2281
2282 void KVBucket::warmupCompleted() {
2283     // Snapshot VBucket state after warmup to ensure Failover table is
2284     // persisted.
2285     scheduleVBStatePersist();
2286
2287     if (engine.getConfiguration().getAlogPath().length() > 0) {
2288
2289         if (engine.getConfiguration().isAccessScannerEnabled()) {
2290             {
2291                 LockHolder lh(accessScanner.mutex);
2292                 accessScanner.enabled = true;
2293             }
2294             LOG(EXTENSION_LOG_NOTICE, "Access Scanner task enabled");
2295             size_t smin = engine.getConfiguration().getAlogSleepTime();
2296             setAccessScannerSleeptime(smin, true);
2297         } else {
2298             LockHolder lh(accessScanner.mutex);
2299             accessScanner.enabled = false;
2300             LOG(EXTENSION_LOG_NOTICE, "Access Scanner task disabled");
2301         }
2302
2303         Configuration &config = engine.getConfiguration();
2304         config.addValueChangedListener("access_scanner_enabled",
2305                                        new EPStoreValueChangeListener(*this));
2306         config.addValueChangedListener("alog_sleep_time",
2307                                        new EPStoreValueChangeListener(*this));
2308         config.addValueChangedListener("alog_task_time",
2309                                        new EPStoreValueChangeListener(*this));
2310     }
2311
2312     // "0" sleep_time means that the first snapshot task will be executed
2313     // right after warmup. Subsequent snapshot tasks will be scheduled every
2314     // 60 sec by default.
2315     ExecutorPool *iom = ExecutorPool::get();
2316     ExTask task = make_STRCPtr<StatSnap>(&engine, 0, false);
2317     statsSnapshotTaskId = iom->schedule(task);
2318 }
2319
2320 bool KVBucket::maybeEnableTraffic()
2321 {
2322     // @todo rename.. skal vaere isTrafficDisabled elns
2323     double memoryUsed = static_cast<double>(stats.getTotalMemoryUsed());
2324     double maxSize = static_cast<double>(stats.getMaxDataSize());
2325
2326     if (memoryUsed  >= stats.mem_low_wat) {
2327         LOG(EXTENSION_LOG_NOTICE,
2328             "Total memory use reached to the low water mark, stop warmup"
2329             ": memoryUsed (%f) >= low water mark (%" PRIu64 ")",
2330             memoryUsed, uint64_t(stats.mem_low_wat.load()));
2331         return true;
2332     } else if (memoryUsed > (maxSize * stats.warmupMemUsedCap)) {
2333         LOG(EXTENSION_LOG_NOTICE,
2334                 "Enough MB of data loaded to enable traffic"
2335                 ": memoryUsed (%f) > (maxSize(%f) * warmupMemUsedCap(%f))",
2336                  memoryUsed, maxSize, stats.warmupMemUsedCap.load());
2337         return true;
2338     } else if (eviction_policy == VALUE_ONLY &&
2339                stats.warmedUpValues >=
2340                                (stats.warmedUpKeys * stats.warmupNumReadCap)) {
2341         // Let ep-engine think we're done with the warmup phase
2342         // (we should refactor this into "enableTraffic")
2343         LOG(EXTENSION_LOG_NOTICE,
2344             "Enough number of items loaded to enable traffic (value eviction)"
2345             ": warmedUpValues(%" PRIu64 ") >= (warmedUpKeys(%" PRIu64 ") * "
2346             "warmupNumReadCap(%f))",  uint64_t(stats.warmedUpValues.load()),
2347             uint64_t(stats.warmedUpKeys.load()), stats.warmupNumReadCap.load());
2348         return true;
2349     } else if (eviction_policy == FULL_EVICTION &&
2350                stats.warmedUpValues >=
2351                             (warmupTask->getEstimatedItemCount() *
2352                              stats.warmupNumReadCap)) {
2353         // In case of FULL EVICTION, warmed up keys always matches the number
2354         // of warmed up values, therefore for honoring the min_item threshold
2355         // in this scenario, we can consider warmup's estimated item count.
2356         LOG(EXTENSION_LOG_NOTICE,
2357             "Enough number of items loaded to enable traffic (full eviction)"
2358             ": warmedUpValues(%" PRIu64 ") >= (warmup est items(%" PRIu64 ") * "
2359             "warmupNumReadCap(%f))",  uint64_t(stats.warmedUpValues.load()),
2360             uint64_t(warmupTask->getEstimatedItemCount()),
2361             stats.warmupNumReadCap.load());
2362         return true;
2363     }
2364     return false;
2365 }
2366
2367 bool KVBucket::isWarmingUp() {
2368     return warmupTask && !warmupTask->isComplete();
2369 }
2370
2371 bool KVBucket::isWarmupOOMFailure() {
2372     return warmupTask && warmupTask->hasOOMFailure();
2373 }
2374
2375 void KVBucket::stopWarmup(void)
2376 {
2377     // forcefully stop current warmup task
2378     if (isWarmingUp()) {
2379         LOG(EXTENSION_LOG_NOTICE, "Stopping warmup while engine is loading "
2380             "data from underlying storage, shutdown = %s\n",
2381             stats.isShutdown ? "yes" : "no");
2382         warmupTask->stop();
2383     }
2384 }
2385
2386 bool KVBucket::isMemoryUsageTooHigh() {
2387     double memoryUsed = static_cast<double>(stats.getTotalMemoryUsed());
2388     double maxSize = static_cast<double>(stats.getMaxDataSize());
2389     return memoryUsed > (maxSize * backfillMemoryThreshold);
2390 }
2391
2392 void KVBucket::setBackfillMemoryThreshold(double threshold) {
2393     backfillMemoryThreshold = threshold;
2394 }
2395
2396 void KVBucket::setExpiryPagerSleeptime(size_t val) {
2397     LockHolder lh(expiryPager.mutex);
2398
2399     ExecutorPool::get()->cancel(expiryPager.task);
2400
2401     expiryPager.sleeptime = val;
2402     if (expiryPager.enabled) {
2403         ExTask expTask = make_STRCPtr<ExpiredItemPager>(
2404                 &engine, stats, expiryPager.sleeptime);
2405         expiryPager.task = ExecutorPool::get()->schedule(expTask);
2406     } else {
2407         LOG(EXTENSION_LOG_DEBUG, "Expiry pager disabled, "
2408                                  "enabling it will make exp_pager_stime (%lu)"
2409                                  "to go into effect!", val);
2410     }
2411 }
2412
2413 void KVBucket::setExpiryPagerTasktime(ssize_t val) {
2414     LockHolder lh(expiryPager.mutex);
2415     if (expiryPager.enabled) {
2416         ExecutorPool::get()->cancel(expiryPager.task);
2417         ExTask expTask = make_STRCPtr<ExpiredItemPager>(
2418                 &engine, stats, expiryPager.sleeptime, val);
2419         expiryPager.task = ExecutorPool::get()->schedule(expTask);
2420     } else {
2421         LOG(EXTENSION_LOG_DEBUG, "Expiry pager disabled, "
2422                                  "enabling it will make exp_pager_stime (%lu)"
2423                                  "to go into effect!", val);
2424     }
2425 }
2426
2427 void KVBucket::enableExpiryPager() {
2428     LockHolder lh(expiryPager.mutex);
2429     if (!expiryPager.enabled) {
2430         expiryPager.enabled = true;
2431
2432         ExecutorPool::get()->cancel(expiryPager.task);
2433         ExTask expTask = make_STRCPtr<ExpiredItemPager>(
2434                 &engine, stats, expiryPager.sleeptime);
2435         expiryPager.task = ExecutorPool::get()->schedule(expTask);
2436     } else {
2437         LOG(EXTENSION_LOG_DEBUG, "Expiry Pager already enabled!");
2438     }
2439 }
2440
2441 void KVBucket::disableExpiryPager() {
2442     LockHolder lh(expiryPager.mutex);
2443     if (expiryPager.enabled) {
2444         ExecutorPool::get()->cancel(expiryPager.task);
2445         expiryPager.enabled = false;
2446     } else {
2447         LOG(EXTENSION_LOG_DEBUG, "Expiry Pager already disabled!");
2448     }
2449 }
2450
2451 void KVBucket::enableItemPager() {
2452     ExecutorPool::get()->cancel(itemPagerTask->getId());
2453     ExecutorPool::get()->schedule(itemPagerTask);
2454 }
2455
2456 void KVBucket::disableItemPager() {
2457     ExecutorPool::get()->cancel(itemPagerTask->getId());
2458 }
2459
2460 void KVBucket::enableAccessScannerTask() {
2461     LockHolder lh(accessScanner.mutex);
2462     if (!accessScanner.enabled) {
2463         accessScanner.enabled = true;
2464
2465         if (accessScanner.sleeptime != 0) {
2466             ExecutorPool::get()->cancel(accessScanner.task);
2467         }
2468
2469         size_t alogSleepTime = engine.getConfiguration().getAlogSleepTime();
2470         accessScanner.sleeptime = alogSleepTime * 60;
2471         if (accessScanner.sleeptime != 0) {
2472             ExTask task = make_STRCPtr<AccessScanner>(
2473                     *this, stats, accessScanner.sleeptime, true);
2474             accessScanner.task = ExecutorPool::get()->schedule(task);
2475         } else {
2476             LOG(EXTENSION_LOG_NOTICE, "Did not enable access scanner task, "
2477                                       "as alog_sleep_time is set to zero!");
2478         }
2479     } else {
2480         LOG(EXTENSION_LOG_DEBUG, "Access scanner already enabled!");
2481     }
2482 }
2483
2484 void KVBucket::disableAccessScannerTask() {
2485     LockHolder lh(accessScanner.mutex);
2486     if (accessScanner.enabled) {
2487         ExecutorPool::get()->cancel(accessScanner.task);
2488         accessScanner.sleeptime = 0;
2489         accessScanner.enabled = false;
2490     } else {
2491         LOG(EXTENSION_LOG_DEBUG, "Access scanner already disabled!");
2492     }
2493 }
2494
2495 void KVBucket::setAccessScannerSleeptime(size_t val, bool useStartTime) {
2496     LockHolder lh(accessScanner.mutex);
2497
2498     if (accessScanner.enabled) {
2499         if (accessScanner.sleeptime != 0) {
2500             ExecutorPool::get()->cancel(accessScanner.task);
2501         }
2502
2503         // store sleeptime in seconds
2504         accessScanner.sleeptime = val * 60;
2505         if (accessScanner.sleeptime != 0) {
2506             ExTask task = make_STRCPtr<AccessScanner>(
2507                     *this, stats, accessScanner.sleeptime, useStartTime);
2508             accessScanner.task = ExecutorPool::get()->schedule(task);
2509         }
2510     }
2511 }
2512
2513 void KVBucket::resetAccessScannerStartTime() {
2514     LockHolder lh(accessScanner.mutex);
2515
2516     if (accessScanner.enabled) {
2517         if (accessScanner.sleeptime != 0) {
2518             ExecutorPool::get()->cancel(accessScanner.task);
2519             // re-schedule task according to the new task start hour
2520             ExTask task = make_STRCPtr<AccessScanner>(
2521                     *this, stats, accessScanner.sleeptime, true);
2522             accessScanner.task = ExecutorPool::get()->schedule(task);
2523         }
2524     }
2525 }
2526
2527 void KVBucket::setAllBloomFilters(bool to) {
2528     for (VBucketMap::id_type vbid = 0; vbid < vbMap.getSize(); vbid++) {
2529         VBucketPtr vb = vbMap.getBucket(vbid);
2530         if (vb) {
2531             if (to) {
2532                 vb->setFilterStatus(BFILTER_ENABLED);
2533             } else {
2534                 vb->setFilterStatus(BFILTER_DISABLED);
2535             }
2536         }
2537     }
2538 }
2539
2540 void KVBucket::visit(VBucketVisitor &visitor)
2541 {
2542     for (VBucketMap::id_type vbid = 0; vbid < vbMap.getSize(); ++vbid) {
2543         VBucketPtr vb = vbMap.getBucket(vbid);
2544         if (vb) {
2545             visitor.visitBucket(vb);
2546         }
2547     }
2548     visitor.complete();
2549 }
2550
2551 KVBucket::Position KVBucket::pauseResumeVisit(
2552                                             PauseResumeEPStoreVisitor& visitor,
2553                                             Position& start_pos)
2554 {
2555     uint16_t vbid = start_pos.vbucket_id;
2556     for (; vbid < vbMap.getSize(); ++vbid) {
2557         VBucketPtr vb = vbMap.getBucket(vbid);
2558         if (vb) {
2559             bool paused = !visitor.visit(vbid, vb->ht);
2560             if (paused) {
2561                 break;
2562             }
2563         }
2564     }
2565
2566     return KVBucket::Position(vbid);
2567 }
2568
2569 KVBucket::Position KVBucket::startPosition() const
2570 {
2571     return KVBucket::Position(0);
2572 }
2573
2574 KVBucket::Position KVBucket::endPosition() const
2575 {
2576     return KVBucket::Position(vbMap.getSize());
2577 }
2578
2579 VBCBAdaptor::VBCBAdaptor(KVBucket* s,
2580                          TaskId id,
2581                          std::unique_ptr<VBucketVisitor> v,
2582                          const char* l,
2583                          double sleep,
2584                          bool shutdown)
2585     : GlobalTask(&s->getEPEngine(), id, 0, shutdown),
2586       store(s),
2587       visitor(std::move(v)),
2588       label(l),
2589       sleepTime(sleep),
2590       currentvb(0) {
2591     updateDescription();
2592     const VBucketFilter& vbFilter = visitor->getVBucketFilter();
2593     for (auto vbid : store->getVBuckets().getBuckets()) {
2594         if (vbFilter(vbid)) {
2595             vbList.push(vbid);
2596         }
2597     }
2598 }
2599
2600 bool VBCBAdaptor::run(void) {
2601     if (!vbList.empty()) {
2602         TRACE_EVENT("ep-engine/task", "VBCBAdaptor", vbList.front());
2603         currentvb.store(vbList.front());
2604         updateDescription();
2605         VBucketPtr vb = store->getVBucket(currentvb);
2606         if (vb) {
2607             if (visitor->pauseVisitor()) {
2608                 snooze(sleepTime);
2609                 return true;
2610             }
2611             visitor->visitBucket(vb);
2612         }
2613         vbList.pop();
2614     }
2615
2616     bool isdone = vbList.empty();
2617     if (isdone) {
2618         visitor->complete();
2619     }
2620     return !isdone;
2621 }
2622
2623 void VBCBAdaptor::updateDescription() {
2624     std::unique_lock<std::mutex> lock(description.mutex);
2625     description.text =
2626             std::string(label) + " on vb " + std::to_string(currentvb.load());
2627 }
2628
2629 void KVBucket::resetUnderlyingStats(void)
2630 {
2631     for (size_t i = 0; i < vbMap.shards.size(); i++) {
2632         KVShard *shard = vbMap.shards[i].get();
2633         shard->getRWUnderlying()->resetStats();
2634         shard->getROUnderlying()->resetStats();
2635     }
2636
2637     for (size_t i = 0; i < GlobalTask::allTaskIds.size(); i++) {
2638         stats.schedulingHisto[i].reset();
2639         stats.taskRuntimeHisto[i].reset();
2640     }
2641 }
2642
2643 void KVBucket::addKVStoreStats(ADD_STAT add_stat, const void* cookie) {
2644     for (size_t i = 0; i < vbMap.shards.size(); i++) {
2645         /* Add the different KVStore instances into a set and then
2646          * retrieve the stats from each instance separately. This
2647          * is because CouchKVStore has separate read only and read
2648          * write instance whereas ForestKVStore has only instance
2649          * for both read write and read-only.
2650          */
2651         std::set<KVStore *> underlyingSet;
2652         underlyingSet.insert(vbMap.shards[i]->getRWUnderlying());
2653         underlyingSet.insert(vbMap.shards[i]->getROUnderlying());
2654
2655         for (auto* store : underlyingSet) {
2656             store->addStats(add_stat, cookie);
2657         }
2658     }
2659 }
2660
2661 void KVBucket::addKVStoreTimingStats(ADD_STAT add_stat, const void* cookie) {
2662     for (size_t i = 0; i < vbMap.shards.size(); i++) {
2663         std::set<KVStore*> underlyingSet;
2664         underlyingSet.insert(vbMap.shards[i]->getRWUnderlying());
2665         underlyingSet.insert(vbMap.shards[i]->getROUnderlying());
2666
2667         for (auto* store : underlyingSet) {
2668             store->addTimingStats(add_stat, cookie);
2669         }
2670     }
2671 }
2672
2673 bool KVBucket::getKVStoreStat(const char* name, size_t& value, KVSOption option)
2674 {
2675     value = 0;
2676     bool success = true;
2677     for (const auto& shard : vbMap.shards) {
2678         size_t per_shard_value;
2679
2680         if (option == KVSOption::RO || option == KVSOption::BOTH) {
2681             success &= shard->getROUnderlying()->getStat(name, per_shard_value);
2682             value += per_shard_value;
2683         }
2684
2685         if (option == KVSOption::RW || option == KVSOption::BOTH) {
2686             success &= shard->getRWUnderlying()->getStat(name, per_shard_value);
2687             value += per_shard_value;
2688         }
2689     }
2690     return success;
2691 }
2692
2693 KVStore *KVBucket::getOneROUnderlying(void) {
2694     return vbMap.shards[EP_PRIMARY_SHARD]->getROUnderlying();
2695 }
2696
2697 KVStore *KVBucket::getOneRWUnderlying(void) {
2698     return vbMap.shards[EP_PRIMARY_SHARD]->getRWUnderlying();
2699 }
2700
2701 ENGINE_ERROR_CODE KVBucket::rollback(uint16_t vbid, uint64_t rollbackSeqno) {
2702     LockHolder vbset(vbsetMutex);
2703
2704     std::unique_lock<std::mutex> lh(vb_mutexes[vbid], std::try_to_lock);
2705
2706     if (!lh.owns_lock()) {
2707         return ENGINE_TMPFAIL; // Reschedule a vbucket rollback task.
2708     }
2709
2710     VBucketPtr vb = vbMap.getBucket(vbid);
2711     if (!vb) {
2712         return ENGINE_NOT_MY_VBUCKET;
2713     }
2714
2715     ReaderLockHolder rlh(vb->getStateLock());
2716     if (vb->getState() == vbucket_state_replica) {
2717         uint64_t prevHighSeqno = static_cast<uint64_t>
2718                                         (vb->checkpointManager.getHighSeqno());
2719         if (rollbackSeqno != 0) {
2720             RollbackResult result = doRollback(vbid, rollbackSeqno);
2721
2722             if (result.success /* not suceess hence reset vbucket to
2723                                   avoid data loss */
2724                 &&
2725                 (result.highSeqno > 0) /* if 0, reset vbucket for a clean start
2726                                           instead of deleting everything in it
2727                                         */) {
2728                 rollbackUnpersistedItems(*vb, result.highSeqno);
2729                 vb->postProcessRollback(result, prevHighSeqno);
2730                 return ENGINE_SUCCESS;
2731             }
2732         }
2733
2734         if (resetVBucket_UNLOCKED(vbid, vbset)) {
2735             VBucketPtr newVb = vbMap.getBucket(vbid);
2736             newVb->incrRollbackItemCount(prevHighSeqno);
2737             return ENGINE_SUCCESS;
2738         }
2739         return ENGINE_NOT_MY_VBUCKET;
2740     } else {
2741         return ENGINE_EINVAL;
2742     }
2743 }
2744
2745 void KVBucket::runDefragmenterTask() {
2746     defragmenterTask->run();
2747 }
2748
2749 bool KVBucket::runAccessScannerTask() {
2750     return ExecutorPool::get()->wake(accessScanner.task);
2751 }
2752
2753 void KVBucket::runVbStatePersistTask(int vbid) {
2754     scheduleVBStatePersist(vbid);
2755 }
2756
2757 void KVBucket::setCursorDroppingLowerUpperThresholds(size_t maxSize) {
2758     Configuration &config = engine.getConfiguration();
2759     stats.cursorDroppingLThreshold.store(static_cast<size_t>(maxSize *
2760                     ((double)(config.getCursorDroppingLowerMark()) / 100)));
2761     stats.cursorDroppingUThreshold.store(static_cast<size_t>(maxSize *
2762                     ((double)(config.getCursorDroppingUpperMark()) / 100)));
2763 }
2764
2765 size_t KVBucket::getActiveResidentRatio() const {
2766     return cachedResidentRatio.activeRatio.load();
2767 }
2768
2769 size_t KVBucket::getReplicaResidentRatio() const {
2770     return cachedResidentRatio.replicaRatio.load();
2771 }
2772
2773 ENGINE_ERROR_CODE KVBucket::forceMaxCas(uint16_t vbucket, uint64_t cas) {
2774     VBucketPtr vb = vbMap.getBucket(vbucket);
2775     if (vb) {
2776         vb->forceMaxCas(cas);
2777         return ENGINE_SUCCESS;
2778     }
2779     return ENGINE_NOT_MY_VBUCKET;
2780 }
2781
2782 std::ostream& operator<<(std::ostream& os, const KVBucket::Position& pos) {
2783     os << "vbucket:" << pos.vbucket_id;
2784     return os;
2785 }
2786
2787 void KVBucket::notifyFlusher(const uint16_t vbid) {
2788     KVShard* shard = vbMap.getShardByVbId(vbid);
2789     if (shard) {
2790         shard->getFlusher()->notifyFlushEvent();
2791     } else {
2792         throw std::logic_error(
2793                 "KVBucket::notifyFlusher() : shard null for "
2794                 "vbucket " +
2795                 std::to_string(vbid));
2796     }
2797 }
2798
2799 void KVBucket::notifyReplication(const uint16_t vbid, const int64_t bySeqno) {
2800     engine.getTapConnMap().notifyVBConnections(vbid);
2801     engine.getDcpConnMap().notifyVBConnections(vbid, bySeqno);
2802 }
2803
2804 void KVBucket::initializeExpiryPager(Configuration& config) {
2805     {
2806         LockHolder elh(expiryPager.mutex);
2807         expiryPager.enabled = config.isExpPagerEnabled();
2808     }
2809
2810     setExpiryPagerSleeptime(config.getExpPagerStime());
2811
2812     config.addValueChangedListener("exp_pager_stime",
2813                                    new EPStoreValueChangeListener(*this));
2814     config.addValueChangedListener("exp_pager_enabled",
2815                                    new EPStoreValueChangeListener(*this));
2816     config.addValueChangedListener("exp_pager_initial_run_time",
2817                                    new EPStoreValueChangeListener(*this));
2818 }