MB-24066/MB-22178: Set opencheckpointid to 1 after rollback
[ep-engine.git] / src / ep.cc
1 /* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2 /*
3  *     Copyright 2015 Couchbase, Inc
4  *
5  *   Licensed under the Apache License, Version 2.0 (the "License");
6  *   you may not use this file except in compliance with the License.
7  *   You may obtain a copy of the License at
8  *
9  *       http://www.apache.org/licenses/LICENSE-2.0
10  *
11  *   Unless required by applicable law or agreed to in writing, software
12  *   distributed under the License is distributed on an "AS IS" BASIS,
13  *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  *   See the License for the specific language governing permissions and
15  *   limitations under the License.
16  */
17
18 #include "config.h"
19
20 #include <string.h>
21 #include <time.h>
22
23 #include <fstream>
24 #include <functional>
25 #include <iostream>
26 #include <map>
27 #include <sstream>
28 #include <string>
29 #include <utility>
30 #include <vector>
31
32 #include "access_scanner.h"
33 #include "bgfetcher.h"
34 #include "checkpoint_remover.h"
35 #include "conflict_resolution.h"
36 #include "defragmenter.h"
37 #include "ep.h"
38 #include "ep_engine.h"
39 #include "ext_meta_parser.h"
40 #include "failover-table.h"
41 #include "flusher.h"
42 #include "htresizer.h"
43 #include "kvshard.h"
44 #include "kvstore.h"
45 #include "locks.h"
46 #include "mutation_log.h"
47 #include "warmup.h"
48 #include "connmap.h"
49 #include "replicationthrottle.h"
50 #include "tasks.h"
51
52 class StatsValueChangeListener : public ValueChangedListener {
53 public:
54     StatsValueChangeListener(EPStats &st, EventuallyPersistentStore &str)
55         : stats(st), store(str) {
56         // EMPTY
57     }
58
59     virtual void sizeValueChanged(const std::string &key, size_t value) {
60         if (key.compare("max_size") == 0) {
61             stats.setMaxDataSize(value);
62             store.getEPEngine().getDcpConnMap(). \
63                                      updateMaxActiveSnoozingBackfills(value);
64             size_t low_wat = static_cast<size_t>
65                     (static_cast<double>(value) * stats.mem_low_wat_percent);
66             size_t high_wat = static_cast<size_t>
67                     (static_cast<double>(value) * stats.mem_high_wat_percent);
68             stats.mem_low_wat.store(low_wat);
69             stats.mem_high_wat.store(high_wat);
70             store.setCursorDroppingLowerUpperThresholds(value);
71         } else if (key.compare("mem_low_wat") == 0) {
72             stats.mem_low_wat.store(value);
73             stats.mem_low_wat_percent.store(
74                                     (double)(value) / stats.getMaxDataSize());
75         } else if (key.compare("mem_high_wat") == 0) {
76             stats.mem_high_wat.store(value);
77             stats.mem_high_wat_percent.store(
78                                     (double)(value) / stats.getMaxDataSize());
79         } else if (key.compare("replication_throttle_threshold") == 0) {
80             stats.replicationThrottleThreshold.store(
81                                           static_cast<double>(value) / 100.0);
82         } else if (key.compare("warmup_min_memory_threshold") == 0) {
83             stats.warmupMemUsedCap.store(static_cast<double>(value) / 100.0);
84         } else if (key.compare("warmup_min_items_threshold") == 0) {
85             stats.warmupNumReadCap.store(static_cast<double>(value) / 100.0);
86         } else {
87             LOG(EXTENSION_LOG_WARNING,
88                 "Failed to change value for unknown variable, %s\n",
89                 key.c_str());
90         }
91     }
92
93 private:
94     EPStats &stats;
95     EventuallyPersistentStore &store;
96 };
97
98 /**
99  * A configuration value changed listener that responds to ep-engine
100  * parameter changes by invoking engine-specific methods on
101  * configuration change events.
102  */
103 class EPStoreValueChangeListener : public ValueChangedListener {
104 public:
105     EPStoreValueChangeListener(EventuallyPersistentStore &st) : store(st) {
106     }
107
108     virtual void sizeValueChanged(const std::string &key, size_t value) {
109         if (key.compare("bg_fetch_delay") == 0) {
110             store.setBGFetchDelay(static_cast<uint32_t>(value));
111         } else if (key.compare("compaction_write_queue_cap") == 0) {
112             store.setCompactionWriteQueueCap(value);
113         } else if (key.compare("exp_pager_stime") == 0) {
114             store.setExpiryPagerSleeptime(value);
115         } else if (key.compare("exp_pager_initial_run_time") == 0) {
116             store.setExpiryPagerTasktime(value);
117         } else if (key.compare("alog_sleep_time") == 0) {
118             store.setAccessScannerSleeptime(value, false);
119         } else if (key.compare("alog_task_time") == 0) {
120             store.resetAccessScannerStartTime();
121         } else if (key.compare("mutation_mem_threshold") == 0) {
122             double mem_threshold = static_cast<double>(value) / 100;
123             StoredValue::setMutationMemoryThreshold(mem_threshold);
124         } else if (key.compare("backfill_mem_threshold") == 0) {
125             double backfill_threshold = static_cast<double>(value) / 100;
126             store.setBackfillMemoryThreshold(backfill_threshold);
127         } else if (key.compare("compaction_exp_mem_threshold") == 0) {
128             store.setCompactionExpMemThreshold(value);
129         } else if (key.compare("replication_throttle_queue_cap") == 0) {
130             store.getEPEngine().getReplicationThrottle().setQueueCap(value);
131         } else if (key.compare("replication_throttle_cap_pcnt") == 0) {
132             store.getEPEngine().getReplicationThrottle().setCapPercent(value);
133         } else {
134             LOG(EXTENSION_LOG_WARNING,
135                 "Failed to change value for unknown variable, %s\n",
136                 key.c_str());
137         }
138     }
139
140     virtual void booleanValueChanged(const std::string &key, bool value) {
141         if (key.compare("access_scanner_enabled") == 0) {
142             if (value) {
143                 store.enableAccessScannerTask();
144             } else {
145                 store.disableAccessScannerTask();
146             }
147         } else if (key.compare("bfilter_enabled") == 0) {
148             store.setAllBloomFilters(value);
149         } else if (key.compare("exp_pager_enabled") == 0) {
150             if (value) {
151                 store.enableExpiryPager();
152             } else {
153                 store.disableExpiryPager();
154             }
155         }
156     }
157
158     virtual void floatValueChanged(const std::string &key, float value) {
159         if (key.compare("bfilter_residency_threshold") == 0) {
160             store.setBfiltersResidencyThreshold(value);
161         } else if (key.compare("dcp_min_compression_ratio") == 0) {
162             store.getEPEngine().updateDcpMinCompressionRatio(value);
163         }
164     }
165
166 private:
167     EventuallyPersistentStore &store;
168 };
169
170 /**
171  * Callback class used by EpStore, for adding relevent keys
172  * to bloomfilter during compaction.
173  */
174 class BloomFilterCallback : public Callback<std::string&, bool&> {
175 public:
176     BloomFilterCallback(EventuallyPersistentStore& eps, uint16_t vbid,
177                         bool residentRatioAlert)
178         : store(eps), vbucketId(vbid),
179           residentRatioLessThanThreshold(residentRatioAlert) {
180     }
181
182     void callback(std::string& key, bool& isDeleted) {
183         RCPtr<VBucket> vb = store.getVBucket(vbucketId);
184         if (vb) {
185             if (vb->isTempFilterAvailable()) {
186                 if (store.getItemEvictionPolicy() == VALUE_ONLY) {
187                     /**
188                      * VALUE-ONLY EVICTION POLICY
189                      * Consider deleted items only.
190                      */
191                     if (isDeleted) {
192                         vb->addToTempFilter(key);
193                     }
194                 } else {
195                     /**
196                      * FULL EVICTION POLICY
197                      * If vbucket's resident ratio is found to be less than
198                      * the residency threshold, consider all items, otherwise
199                      * consider deleted and non-resident items only.
200                      */
201                     if (residentRatioLessThanThreshold) {
202                         vb->addToTempFilter(key);
203                     } else {
204                         if (isDeleted || !store.isMetaDataResident(vb, key)) {
205                             vb->addToTempFilter(key);
206                         }
207                     }
208                 }
209             }
210         }
211     }
212
213 private:
214     EventuallyPersistentStore& store;
215     uint16_t vbucketId;
216     bool residentRatioLessThanThreshold;
217 };
218
219 class PendingOpsNotification : public GlobalTask {
220 public:
221     PendingOpsNotification(EventuallyPersistentEngine &e, RCPtr<VBucket> &vb) :
222         GlobalTask(&e, TaskId::PendingOpsNotification, 0, false),
223         engine(e), vbucket(vb) { }
224
225     std::string getDescription() {
226         std::stringstream ss;
227         ss << "Notify pending operations for vbucket " << vbucket->getId();
228         return ss.str();
229     }
230
231     bool run(void) {
232         vbucket->fireAllOps(engine);
233         return false;
234     }
235
236 private:
237     EventuallyPersistentEngine &engine;
238     RCPtr<VBucket> vbucket;
239 };
240
241 EventuallyPersistentStore::EventuallyPersistentStore(
242     EventuallyPersistentEngine &theEngine) :
243     engine(theEngine), stats(engine.getEpStats()),
244     vbMap(theEngine.getConfiguration(), *this),
245     defragmenterTask(NULL),
246     bgFetchQueue(0),
247     diskFlushAll(false), bgFetchDelay(0),
248     backfillMemoryThreshold(0.95),
249     statsSnapshotTaskId(0), lastTransTimePerItem(0)
250 {
251     cachedResidentRatio.activeRatio.store(0);
252     cachedResidentRatio.replicaRatio.store(0);
253
254     Configuration &config = engine.getConfiguration();
255     MutationLog *shardlog;
256     for (uint16_t i = 0; i < config.getMaxNumShards(); i++) {
257         std::stringstream s;
258         s << i;
259         shardlog = new MutationLog(engine.getConfiguration().getAlogPath() +
260                                  "." + s.str(),
261                                  engine.getConfiguration().getAlogBlockSize());
262         accessLog.push_back(shardlog);
263     }
264
265     storageProperties = new StorageProperties(true, true, true, true);
266
267     const auto size = GlobalTask::allTaskIds.size();
268     stats.schedulingHisto = new Histogram<ProcessClock::duration::rep>[size];
269     stats.taskRuntimeHisto = new Histogram<ProcessClock::duration::rep>[size];
270
271     for (size_t i = 0; i < GlobalTask::allTaskIds.size(); i++) {
272         stats.schedulingHisto[i].reset();
273         stats.taskRuntimeHisto[i].reset();
274     }
275
276     ExecutorPool::get()->registerTaskable(ObjectRegistry::getCurrentEngine()->getTaskable());
277
278     size_t num_vbs = config.getMaxVbuckets();
279     vb_mutexes = new Mutex[num_vbs];
280
281     *stats.memOverhead = sizeof(EventuallyPersistentStore);
282
283     if (config.getConflictResolutionType().compare("lww") == 0) {
284         conflictResolver.reset(new LastWriteWinsResolution());
285     } else {
286         conflictResolver.reset(new RevisionSeqnoResolution());
287     }
288
289     stats.setMaxDataSize(config.getMaxSize());
290     config.addValueChangedListener("max_size",
291                                    new StatsValueChangeListener(stats, *this));
292     getEPEngine().getDcpConnMap().updateMaxActiveSnoozingBackfills(
293                                                         config.getMaxSize());
294
295     stats.mem_low_wat.store(config.getMemLowWat());
296     config.addValueChangedListener("mem_low_wat",
297                                    new StatsValueChangeListener(stats, *this));
298     stats.mem_low_wat_percent.store(
299                 (double)(stats.mem_low_wat.load()) / stats.getMaxDataSize());
300
301     stats.mem_high_wat.store(config.getMemHighWat());
302     config.addValueChangedListener("mem_high_wat",
303                                    new StatsValueChangeListener(stats, *this));
304     stats.mem_high_wat_percent.store(
305                 (double)(stats.mem_high_wat.load()) / stats.getMaxDataSize());
306
307     setCursorDroppingLowerUpperThresholds(config.getMaxSize());
308
309     stats.replicationThrottleThreshold.store(static_cast<double>
310                                     (config.getReplicationThrottleThreshold())
311                                      / 100.0);
312     config.addValueChangedListener("replication_throttle_threshold",
313                                    new StatsValueChangeListener(stats, *this));
314
315     stats.replicationThrottleWriteQueueCap.store(
316                                     config.getReplicationThrottleQueueCap());
317     config.addValueChangedListener("replication_throttle_queue_cap",
318                                    new EPStoreValueChangeListener(*this));
319     config.addValueChangedListener("replication_throttle_cap_pcnt",
320                                    new EPStoreValueChangeListener(*this));
321
322     setBGFetchDelay(config.getBgFetchDelay());
323     config.addValueChangedListener("bg_fetch_delay",
324                                    new EPStoreValueChangeListener(*this));
325
326     stats.warmupMemUsedCap.store(static_cast<double>
327                                (config.getWarmupMinMemoryThreshold()) / 100.0);
328     config.addValueChangedListener("warmup_min_memory_threshold",
329                                    new StatsValueChangeListener(stats, *this));
330     stats.warmupNumReadCap.store(static_cast<double>
331                                 (config.getWarmupMinItemsThreshold()) / 100.0);
332     config.addValueChangedListener("warmup_min_items_threshold",
333                                    new StatsValueChangeListener(stats, *this));
334
335     double mem_threshold = static_cast<double>
336                                       (config.getMutationMemThreshold()) / 100;
337     StoredValue::setMutationMemoryThreshold(mem_threshold);
338     config.addValueChangedListener("mutation_mem_threshold",
339                                    new EPStoreValueChangeListener(*this));
340
341     double backfill_threshold = static_cast<double>
342                                       (config.getBackfillMemThreshold()) / 100;
343     setBackfillMemoryThreshold(backfill_threshold);
344     config.addValueChangedListener("backfill_mem_threshold",
345                                    new EPStoreValueChangeListener(*this));
346
347     config.addValueChangedListener("bfilter_enabled",
348                                    new EPStoreValueChangeListener(*this));
349
350     bfilterResidencyThreshold = config.getBfilterResidencyThreshold();
351     config.addValueChangedListener("bfilter_residency_threshold",
352                                    new EPStoreValueChangeListener(*this));
353
354     compactionExpMemThreshold = config.getCompactionExpMemThreshold();
355     config.addValueChangedListener("compaction_exp_mem_threshold",
356                                    new EPStoreValueChangeListener(*this));
357
358     compactionWriteQueueCap = config.getCompactionWriteQueueCap();
359     config.addValueChangedListener("compaction_write_queue_cap",
360                                    new EPStoreValueChangeListener(*this));
361
362     config.addValueChangedListener("dcp_min_compression_ratio",
363                                    new EPStoreValueChangeListener(*this));
364
365     const std::string &policy = config.getItemEvictionPolicy();
366     if (policy.compare("value_only") == 0) {
367         eviction_policy = VALUE_ONLY;
368     } else {
369         eviction_policy = FULL_EVICTION;
370     }
371
372     warmupTask = new Warmup(*this, config);
373 }
374
375 bool EventuallyPersistentStore::initialize() {
376     // We should nuke everything unless we want warmup
377     Configuration &config = engine.getConfiguration();
378     if (!config.isWarmup()) {
379         reset();
380     }
381
382     if (!startFlusher()) {
383         LOG(EXTENSION_LOG_WARNING,
384             "FATAL: Failed to create and start flushers");
385         return false;
386     }
387     if (!startBgFetcher()) {
388         LOG(EXTENSION_LOG_WARNING,
389            "FATAL: Failed to create and start bgfetchers");
390         return false;
391     }
392
393     warmupTask->start();
394
395     itmpTask = new ItemPager(&engine, stats);
396     ExecutorPool::get()->schedule(itmpTask, NONIO_TASK_IDX);
397
398     LockHolder elh(expiryPager.mutex);
399     expiryPager.enabled = config.isExpPagerEnabled();
400     elh.unlock();
401
402     size_t expiryPagerSleeptime = config.getExpPagerStime();
403     setExpiryPagerSleeptime(expiryPagerSleeptime);
404     config.addValueChangedListener("exp_pager_stime",
405                                    new EPStoreValueChangeListener(*this));
406     config.addValueChangedListener("exp_pager_enabled",
407                                    new EPStoreValueChangeListener(*this));
408     config.addValueChangedListener("exp_pager_initial_run_time",
409                                    new EPStoreValueChangeListener(*this));
410
411     ExTask htrTask = new HashtableResizerTask(this, 10);
412     ExecutorPool::get()->schedule(htrTask, NONIO_TASK_IDX);
413
414     size_t checkpointRemoverInterval = config.getChkRemoverStime();
415     chkTask = new ClosedUnrefCheckpointRemoverTask(&engine, stats,
416                                                    checkpointRemoverInterval);
417     ExecutorPool::get()->schedule(chkTask, NONIO_TASK_IDX);
418
419     ExTask workloadMonitorTask = new WorkLoadMonitor(&engine, false);
420     ExecutorPool::get()->schedule(workloadMonitorTask, NONIO_TASK_IDX);
421
422 #if HAVE_JEMALLOC
423     /* Only create the defragmenter task if we have an underlying memory
424      * allocator which can facilitate defragmenting memory.
425      */
426     defragmenterTask = new DefragmenterTask(&engine, stats);
427     ExecutorPool::get()->schedule(defragmenterTask, NONIO_TASK_IDX);
428 #endif
429
430     return true;
431 }
432
433 EventuallyPersistentStore::~EventuallyPersistentStore() {
434     stopWarmup();
435     stopBgFetcher();
436     ExecutorPool::get()->stopTaskGroup(engine.getTaskable().getGID(), NONIO_TASK_IDX,
437                                        stats.forceShutdown);
438
439     ExecutorPool::get()->cancel(statsSnapshotTaskId);
440
441     LockHolder lh(accessScanner.mutex);
442     ExecutorPool::get()->cancel(accessScanner.task);
443     lh.unlock();
444
445     stopFlusher();
446
447     ExecutorPool::get()->unregisterTaskable(engine.getTaskable(),
448                                             stats.forceShutdown);
449
450     delete [] vb_mutexes;
451     delete [] stats.schedulingHisto;
452     delete [] stats.taskRuntimeHisto;
453     delete warmupTask;
454     delete storageProperties;
455     defragmenterTask.reset();
456
457     std::vector<MutationLog*>::iterator it;
458     for (it = accessLog.begin(); it != accessLog.end(); it++) {
459         delete *it;
460     }
461 }
462
463 const Flusher* EventuallyPersistentStore::getFlusher(uint16_t shardId) {
464     return vbMap.shards[shardId]->getFlusher();
465 }
466
467 uint16_t EventuallyPersistentStore::getCommitInterval(uint16_t shardId) {
468     Flusher *flusher = vbMap.shards[shardId]->getFlusher();
469     return flusher->getCommitInterval();
470 }
471
472 uint16_t EventuallyPersistentStore::decrCommitInterval(uint16_t shardId) {
473     Flusher *flusher = vbMap.shards[shardId]->getFlusher();
474     return flusher->decrCommitInterval();
475 }
476
477 Warmup* EventuallyPersistentStore::getWarmup(void) const {
478     return warmupTask;
479 }
480
481 bool EventuallyPersistentStore::startFlusher() {
482     for (auto* shard : vbMap.shards) {
483         shard->getFlusher()->start();
484     }
485     return true;
486 }
487
488 void EventuallyPersistentStore::stopFlusher() {
489     for (uint16_t i = 0; i < vbMap.shards.size(); i++) {
490         Flusher *flusher = vbMap.shards[i]->getFlusher();
491         LOG(EXTENSION_LOG_NOTICE, "Attempting to stop the flusher for "
492             "shard:%" PRIu16, i);
493         bool rv = flusher->stop(stats.forceShutdown);
494         if (rv && !stats.forceShutdown) {
495             flusher->wait();
496         }
497     }
498 }
499
500 bool EventuallyPersistentStore::pauseFlusher() {
501     bool rv = true;
502     for (uint16_t i = 0; i < vbMap.shards.size(); i++) {
503         Flusher *flusher = vbMap.shards[i]->getFlusher();
504         if (!flusher->pause()) {
505             LOG(EXTENSION_LOG_WARNING, "Attempted to pause flusher in state "
506                 "[%s], shard = %d", flusher->stateName(), i);
507             rv = false;
508         }
509     }
510     return rv;
511 }
512
513 bool EventuallyPersistentStore::resumeFlusher() {
514     bool rv = true;
515     for (uint16_t i = 0; i < vbMap.shards.size(); i++) {
516         Flusher *flusher = vbMap.shards[i]->getFlusher();
517         if (!flusher->resume()) {
518             LOG(EXTENSION_LOG_WARNING,
519                     "Attempted to resume flusher in state [%s], "
520                     "shard = %d", flusher->stateName(), i);
521             rv = false;
522         }
523     }
524     return rv;
525 }
526
527 void EventuallyPersistentStore::wakeUpFlusher() {
528     if (stats.diskQueueSize.load() == 0) {
529         for (uint16_t i = 0; i < vbMap.shards.size(); i++) {
530             Flusher *flusher = vbMap.shards[i]->getFlusher();
531             flusher->wake();
532         }
533     }
534 }
535
536 bool EventuallyPersistentStore::startBgFetcher() {
537     for (uint16_t i = 0; i < vbMap.shards.size(); i++) {
538         BgFetcher *bgfetcher = vbMap.shards[i]->getBgFetcher();
539         if (bgfetcher == NULL) {
540             LOG(EXTENSION_LOG_WARNING,
541                 "Failed to start bg fetcher for shard %d", i);
542             return false;
543         }
544         bgfetcher->start();
545     }
546     return true;
547 }
548
549 void EventuallyPersistentStore::stopBgFetcher() {
550     for (uint16_t i = 0; i < vbMap.shards.size(); i++) {
551         BgFetcher *bgfetcher = vbMap.shards[i]->getBgFetcher();
552         if (multiBGFetchEnabled() && bgfetcher->pendingJob()) {
553             LOG(EXTENSION_LOG_WARNING,
554                 "Shutting down engine while there are still pending data "
555                 "read for shard %d from database storage", i);
556         }
557         LOG(EXTENSION_LOG_NOTICE, "Stopping bg fetcher for shard:%" PRIu16, i);
558         bgfetcher->stop();
559     }
560 }
561
562 void
563 EventuallyPersistentStore::deleteExpiredItem(uint16_t vbid, std::string &key,
564                                              time_t startTime,
565                                              uint64_t revSeqno,
566                                              exp_type_t source) {
567     RCPtr<VBucket> vb = getVBucket(vbid);
568     if (vb) {
569         // Obtain reader access to the VB state change lock so that
570         // the VB can't switch state whilst we're processing
571         ReaderLockHolder rlh(vb->getStateLock());
572         if (vb->getState() == vbucket_state_active) {
573             int bucket_num(0);
574             LockHolder lh = vb->ht.getLockedBucket(key, &bucket_num);
575             StoredValue *v = vb->ht.unlocked_find(key, bucket_num, true, false);
576             if (v) {
577                 if (v->isTempNonExistentItem() || v->isTempDeletedItem()) {
578                     // This is a temporary item whose background fetch for metadata
579                     // has completed.
580                     bool deleted = vb->ht.unlocked_del(key, bucket_num);
581                     if (!deleted) {
582                         throw std::logic_error("EPStore::deleteExpiredItem: "
583                                 "Failed to delete key '" + key + "' from bucket "
584                                 + std::to_string(bucket_num));
585                     }
586                 } else if (v->isExpired(startTime) && !v->isDeleted()) {
587                     vb->ht.unlocked_softDelete(v, 0, getItemEvictionPolicy());
588                     queueDirty(vb, v, &lh, NULL);
589                 }
590             } else {
591                 if (eviction_policy == FULL_EVICTION) {
592                     // Create a temp item and delete and push it
593                     // into the checkpoint queue, only if the bloomfilter
594                     // predicts that the item may exist on disk.
595                     if (vb->maybeKeyExistsInFilter(key)) {
596                         add_type_t rv = vb->ht.unlocked_addTempItem(bucket_num, key,
597                                                                     eviction_policy);
598                         if (rv == ADD_NOMEM) {
599                             return;
600                         }
601                         v = vb->ht.unlocked_find(key, bucket_num, true, false);
602                         v->setDeleted();
603                         v->setRevSeqno(revSeqno);
604                         vb->ht.unlocked_softDelete(v, 0, eviction_policy);
605                         queueDirty(vb, v, &lh, NULL);
606                     }
607                 }
608             }
609             incExpirationStat(vb, source);
610         }
611     }
612 }
613
614 void
615 EventuallyPersistentStore::deleteExpiredItems(std::list<std::pair<uint16_t,
616                                                         std::string> > &keys,
617                                               exp_type_t source) {
618     std::list<std::pair<uint16_t, std::string> >::iterator it;
619     time_t startTime = ep_real_time();
620     for (it = keys.begin(); it != keys.end(); it++) {
621         deleteExpiredItem(it->first, it->second, startTime, 0, source);
622     }
623 }
624
625 StoredValue *EventuallyPersistentStore::fetchValidValue(RCPtr<VBucket> &vb,
626                                                         const const_sized_buffer key,
627                                                         int bucket_num,
628                                                         bool wantDeleted,
629                                                         bool trackReference,
630                                                         bool queueExpired) {
631     StoredValue *v = vb->ht.unlocked_find(key, bucket_num, wantDeleted,
632                                           trackReference);
633     if (v && !v->isDeleted() && !v->isTempItem()) {
634         // In the deleted case, we ignore expiration time.
635         if (v->isExpired(ep_real_time())) {
636             if (vb->getState() != vbucket_state_active) {
637                 return wantDeleted ? v : NULL;
638             }
639
640             // queueDirty only allowed on active VB
641             if (queueExpired && vb->getState() == vbucket_state_active) {
642                 incExpirationStat(vb, EXP_BY_ACCESS);
643                 vb->ht.unlocked_softDelete(v, 0, eviction_policy);
644                 queueDirty(vb, v, NULL, NULL);
645             }
646             return wantDeleted ? v : NULL;
647         }
648     }
649     return v;
650 }
651
652 bool EventuallyPersistentStore::isMetaDataResident(RCPtr<VBucket> &vb,
653                                                    const std::string &key) {
654
655     if (!vb) {
656         throw std::invalid_argument("EPStore::isMetaDataResident: vb is NULL");
657     }
658
659     int bucket_num(0);
660     LockHolder lh = vb->ht.getLockedBucket(key, &bucket_num);
661     StoredValue *v = vb->ht.unlocked_find(key, bucket_num, false, false);
662
663     if (v && !v->isTempItem()) {
664         return true;
665     } else {
666         return false;
667     }
668 }
669
670 protocol_binary_response_status EventuallyPersistentStore::evictKey(
671                                                         const std::string &key,
672                                                         uint16_t vbucket,
673                                                         const char **msg,
674                                                         size_t *msg_size,
675                                                         bool force) {
676     RCPtr<VBucket> vb = getVBucket(vbucket);
677     if (!vb || (vb->getState() != vbucket_state_active && !force)) {
678         return PROTOCOL_BINARY_RESPONSE_NOT_MY_VBUCKET;
679     }
680
681     int bucket_num(0);
682     LockHolder lh = vb->ht.getLockedBucket(key, &bucket_num);
683     StoredValue *v = fetchValidValue(vb, key, bucket_num, force, false);
684
685     protocol_binary_response_status rv(PROTOCOL_BINARY_RESPONSE_SUCCESS);
686
687     *msg_size = 0;
688     if (v) {
689         if (force)  {
690             v->markClean();
691         }
692         if (v->isResident()) {
693             if (vb->ht.unlocked_ejectItem(v, eviction_policy)) {
694                 *msg = "Ejected.";
695
696                 // Add key to bloom filter incase of full eviction mode
697                 if (getItemEvictionPolicy() == FULL_EVICTION) {
698                     vb->addToFilter(key);
699                 }
700             } else {
701                 *msg = "Can't eject: Dirty object.";
702                 rv = PROTOCOL_BINARY_RESPONSE_KEY_EEXISTS;
703             }
704         } else {
705             *msg = "Already ejected.";
706         }
707     } else {
708         if (eviction_policy == VALUE_ONLY) {
709             *msg = "Not found.";
710             rv = PROTOCOL_BINARY_RESPONSE_KEY_ENOENT;
711         } else {
712             *msg = "Already ejected.";
713         }
714     }
715
716     return rv;
717 }
718
719 ENGINE_ERROR_CODE EventuallyPersistentStore::addTempItemForBgFetch(
720                                                         LockHolder &lock,
721                                                         int bucket_num,
722                                                         const const_sized_buffer key,
723                                                         RCPtr<VBucket> &vb,
724                                                         const void *cookie,
725                                                         bool metadataOnly,
726                                                         bool isReplication) {
727
728     add_type_t rv = vb->ht.unlocked_addTempItem(bucket_num, key,
729                                                 eviction_policy,
730                                                 isReplication);
731     switch(rv) {
732         case ADD_NOMEM:
733             return ENGINE_ENOMEM;
734
735         case ADD_EXISTS:
736         case ADD_UNDEL:
737         case ADD_SUCCESS:
738         case ADD_TMP_AND_BG_FETCH:
739             // Since the hashtable bucket is locked, we shouldn't get here
740             throw std::logic_error("EventuallyPersistentStore::addTempItemForBgFetch: "
741                     "Invalid result from addTempItem: " + std::to_string(rv));
742
743         case ADD_BG_FETCH:
744             lock.unlock();
745             bgFetch(key, vb->getId(), cookie, metadataOnly);
746     }
747     return ENGINE_EWOULDBLOCK;
748 }
749
750 ENGINE_ERROR_CODE EventuallyPersistentStore::set(const Item &itm,
751                                                  const void *cookie,
752                                                  bool force,
753                                                  uint8_t nru) {
754
755     RCPtr<VBucket> vb = getVBucket(itm.getVBucketId());
756     if (!vb) {
757         ++stats.numNotMyVBuckets;
758         return ENGINE_NOT_MY_VBUCKET;
759     }
760
761     // Obtain read-lock on VB state to ensure VB state changes are interlocked
762     // with this set
763     ReaderLockHolder rlh(vb->getStateLock());
764     if (vb->getState() == vbucket_state_dead) {
765         ++stats.numNotMyVBuckets;
766         return ENGINE_NOT_MY_VBUCKET;
767     } else if (vb->getState() == vbucket_state_replica && !force) {
768         ++stats.numNotMyVBuckets;
769         return ENGINE_NOT_MY_VBUCKET;
770     } else if (vb->getState() == vbucket_state_pending && !force) {
771         if (vb->addPendingOp(cookie)) {
772             return ENGINE_EWOULDBLOCK;
773         }
774     } else if (vb->isTakeoverBackedUp()) {
775         LOG(EXTENSION_LOG_DEBUG, "(vb %u) Returned TMPFAIL to a set op"
776                 ", becuase takeover is lagging", vb->getId());
777         return ENGINE_TMPFAIL;
778     }
779
780     bool cas_op = (itm.getCas() != 0);
781     int bucket_num(0);
782     LockHolder lh = vb->ht.getLockedBucket(itm.getKey(), &bucket_num);
783     StoredValue *v = vb->ht.unlocked_find(itm.getKey(), bucket_num,
784                                           /*wantsDeleted*/true,
785                                           /*trackReference*/false);
786     if (v && v->isLocked(ep_current_time()) &&
787         (vb->getState() == vbucket_state_replica ||
788          vb->getState() == vbucket_state_pending)) {
789         v->unlock();
790     }
791
792     bool maybeKeyExists = true;
793     // If we didn't find a valid item, check Bloomfilter's prediction if in
794     // full eviction policy and for a CAS operation.
795     if ((v == nullptr || v->isTempInitialItem()) &&
796         (eviction_policy == FULL_EVICTION) &&
797         (itm.getCas() != 0)) {
798         // Check Bloomfilter's prediction
799         if (!vb->maybeKeyExistsInFilter(itm.getKey())) {
800             maybeKeyExists = false;
801         }
802     }
803
804     mutation_type_t mtype = vb->ht.unlocked_set(v, itm, itm.getCas(), true, false,
805                                                 eviction_policy, nru,
806                                                 maybeKeyExists);
807
808     Item& it = const_cast<Item&>(itm);
809     uint64_t seqno = 0;
810     ENGINE_ERROR_CODE ret = ENGINE_SUCCESS;
811     switch (mtype) {
812     case NOMEM:
813         ret = ENGINE_ENOMEM;
814         break;
815     case INVALID_CAS:
816     case IS_LOCKED:
817         ret = ENGINE_KEY_EEXISTS;
818         break;
819     case NOT_FOUND:
820         if (cas_op) {
821             ret = ENGINE_KEY_ENOENT;
822             break;
823         }
824         // FALLTHROUGH
825     case WAS_DIRTY:
826         // Even if the item was dirty, push it into the vbucket's open
827         // checkpoint.
828     case WAS_CLEAN:
829         // We keep lh held as we need to do v->getCas()
830         queueDirty(vb, v, nullptr, &seqno);
831         it.setBySeqno(seqno);
832         it.setCas(v->getCas());
833         break;
834     case NEED_BG_FETCH:
835     {   // CAS operation with non-resident item + full eviction.
836         if (v) {
837             // temp item is already created. Simply schedule a bg fetch job
838             lh.unlock();
839             bgFetch(itm.getKey(), vb->getId(), cookie, true);
840             return ENGINE_EWOULDBLOCK;
841         }
842         ret = addTempItemForBgFetch(lh, bucket_num, itm.getKey(), vb,
843                                     cookie, true);
844         break;
845     }
846     case INVALID_VBUCKET:
847         ret = ENGINE_NOT_MY_VBUCKET;
848         break;
849     }
850
851     return ret;
852 }
853
854 ENGINE_ERROR_CODE EventuallyPersistentStore::add(const Item &itm,
855                                                  const void *cookie)
856 {
857     RCPtr<VBucket> vb = getVBucket(itm.getVBucketId());
858     if (!vb) {
859         ++stats.numNotMyVBuckets;
860         return ENGINE_NOT_MY_VBUCKET;
861     }
862
863     // Obtain read-lock on VB state to ensure VB state changes are interlocked
864     // with this add
865     ReaderLockHolder rlh(vb->getStateLock());
866     if (vb->getState() == vbucket_state_dead ||
867         vb->getState() == vbucket_state_replica) {
868         ++stats.numNotMyVBuckets;
869         return ENGINE_NOT_MY_VBUCKET;
870     } else if (vb->getState() == vbucket_state_pending) {
871         if (vb->addPendingOp(cookie)) {
872             return ENGINE_EWOULDBLOCK;
873         }
874     } else if (vb->isTakeoverBackedUp()) {
875         LOG(EXTENSION_LOG_DEBUG, "(vb %u) Returned TMPFAIL to a add op"
876                 ", becuase takeover is lagging", vb->getId());
877         return ENGINE_TMPFAIL;
878     }
879
880     if (itm.getCas() != 0) {
881         // Adding with a cas value doesn't make sense..
882         return ENGINE_NOT_STORED;
883     }
884
885     int bucket_num(0);
886     LockHolder lh = vb->ht.getLockedBucket(itm.getKey(), &bucket_num);
887     StoredValue *v = vb->ht.unlocked_find(itm.getKey(), bucket_num, true,
888                                           false);
889
890     bool maybeKeyExists = true;
891     if ((v == nullptr || v->isTempInitialItem()) &&
892         (eviction_policy == FULL_EVICTION)) {
893         // Check bloomfilter's prediction
894         if (!vb->maybeKeyExistsInFilter(itm.getKey())) {
895             maybeKeyExists = false;
896         }
897     }
898
899     add_type_t atype = vb->ht.unlocked_add(bucket_num, v, itm,
900                                            eviction_policy,
901                                            true, true,
902                                            maybeKeyExists);
903
904     Item& it = const_cast<Item&>(itm);
905     uint64_t seqno = 0;
906     switch (atype) {
907     case ADD_NOMEM:
908         return ENGINE_ENOMEM;
909     case ADD_EXISTS:
910         return ENGINE_NOT_STORED;
911     case ADD_TMP_AND_BG_FETCH:
912         return addTempItemForBgFetch(lh, bucket_num, it.getKey(), vb,
913                                      cookie, true);
914     case ADD_BG_FETCH:
915         lh.unlock();
916         bgFetch(it.getKey(), vb->getId(), cookie, true);
917         return ENGINE_EWOULDBLOCK;
918     case ADD_SUCCESS:
919     case ADD_UNDEL:
920         // We need to keep lh as we will do v->getCas()
921         queueDirty(vb, v, nullptr, &seqno);
922         it.setBySeqno(seqno);
923         it.setCas(v->getCas());
924         break;
925     }
926
927     return ENGINE_SUCCESS;
928 }
929
930 ENGINE_ERROR_CODE EventuallyPersistentStore::replace(const Item &itm,
931                                                      const void *cookie) {
932     RCPtr<VBucket> vb = getVBucket(itm.getVBucketId());
933     if (!vb) {
934         ++stats.numNotMyVBuckets;
935         return ENGINE_NOT_MY_VBUCKET;
936     }
937
938     // Obtain read-lock on VB state to ensure VB state changes are interlocked
939     // with this replace
940     ReaderLockHolder rlh(vb->getStateLock());
941     if (vb->getState() == vbucket_state_dead ||
942         vb->getState() == vbucket_state_replica) {
943         ++stats.numNotMyVBuckets;
944         return ENGINE_NOT_MY_VBUCKET;
945     } else if (vb->getState() == vbucket_state_pending) {
946         if (vb->addPendingOp(cookie)) {
947             return ENGINE_EWOULDBLOCK;
948         }
949     }
950
951     int bucket_num(0);
952     LockHolder lh = vb->ht.getLockedBucket(itm.getKey(), &bucket_num);
953     StoredValue *v = vb->ht.unlocked_find(itm.getKey(), bucket_num, true,
954                                           false);
955     if (v) {
956         if (v->isDeleted() || v->isTempDeletedItem() ||
957             v->isTempNonExistentItem()) {
958             return ENGINE_KEY_ENOENT;
959         }
960
961         mutation_type_t mtype;
962         if (eviction_policy == FULL_EVICTION && v->isTempInitialItem()) {
963             mtype = NEED_BG_FETCH;
964         } else {
965             mtype = vb->ht.unlocked_set(v, itm, 0, true, false, eviction_policy,
966                                         0xff);
967         }
968
969         Item& it = const_cast<Item&>(itm);
970         uint64_t seqno = 0;
971         ENGINE_ERROR_CODE ret = ENGINE_SUCCESS;
972         switch (mtype) {
973             case NOMEM:
974                 ret = ENGINE_ENOMEM;
975                 break;
976             case IS_LOCKED:
977                 ret = ENGINE_KEY_EEXISTS;
978                 break;
979             case INVALID_CAS:
980             case NOT_FOUND:
981                 ret = ENGINE_NOT_STORED;
982                 break;
983                 // FALLTHROUGH
984             case WAS_DIRTY:
985                 // Even if the item was dirty, push it into the vbucket's open
986                 // checkpoint.
987             case WAS_CLEAN:
988                 // Keep lh as we need to do v->getCas()
989                 queueDirty(vb, v, nullptr, &seqno);
990                 it.setBySeqno(seqno);
991                 it.setCas(v->getCas());
992                 break;
993             case NEED_BG_FETCH:
994             {
995                 // temp item is already created. Simply schedule a bg fetch job
996                 lh.unlock();
997                 bgFetch(it.getKey(), vb->getId(), cookie, true);
998                 ret = ENGINE_EWOULDBLOCK;
999                 break;
1000             }
1001             case INVALID_VBUCKET:
1002                 ret = ENGINE_NOT_MY_VBUCKET;
1003                 break;
1004         }
1005
1006         return ret;
1007     } else {
1008         if (eviction_policy == VALUE_ONLY) {
1009             return ENGINE_KEY_ENOENT;
1010         }
1011
1012         if (vb->maybeKeyExistsInFilter(itm.getKey())) {
1013             return addTempItemForBgFetch(lh, bucket_num, itm.getKey(), vb,
1014                                          cookie, false);
1015         } else {
1016             // As bloomfilter predicted that item surely doesn't exist
1017             // on disk, return ENOENT for replace().
1018             return ENGINE_KEY_ENOENT;
1019         }
1020     }
1021 }
1022
1023 ENGINE_ERROR_CODE EventuallyPersistentStore::addTAPBackfillItem(
1024                                                         const Item &itm,
1025                                                         uint8_t nru,
1026                                                         bool genBySeqno,
1027                                                         ExtendedMetaData *emd) {
1028
1029     RCPtr<VBucket> vb = getVBucket(itm.getVBucketId());
1030     if (!vb) {
1031         ++stats.numNotMyVBuckets;
1032         return ENGINE_NOT_MY_VBUCKET;
1033     }
1034
1035     // Obtain read-lock on VB state to ensure VB state changes are interlocked
1036     // with this add-tapbackfill
1037     ReaderLockHolder rlh(vb->getStateLock());
1038     if (vb->getState() == vbucket_state_dead ||
1039         vb->getState() == vbucket_state_active) {
1040         ++stats.numNotMyVBuckets;
1041         return ENGINE_NOT_MY_VBUCKET;
1042     }
1043
1044     //check for the incoming item's CAS validity
1045     if (!Item::isValidCas(itm.getCas())) {
1046         return ENGINE_KEY_EEXISTS;
1047     }
1048
1049     int bucket_num(0);
1050     LockHolder lh = vb->ht.getLockedBucket(itm.getKey(), &bucket_num);
1051     StoredValue *v = vb->ht.unlocked_find(itm.getKey(), bucket_num, true,
1052                                           false);
1053
1054     // Note that this function is only called on replica or pending vbuckets.
1055     if (v && v->isLocked(ep_current_time())) {
1056         v->unlock();
1057     }
1058     mutation_type_t mtype = vb->ht.unlocked_set(v, itm, 0, true, true,
1059                                                 eviction_policy, nru);
1060
1061     ENGINE_ERROR_CODE ret = ENGINE_SUCCESS;
1062     switch (mtype) {
1063     case NOMEM:
1064         ret = ENGINE_ENOMEM;
1065         break;
1066     case INVALID_CAS:
1067     case IS_LOCKED:
1068         ret = ENGINE_KEY_EEXISTS;
1069         break;
1070     case WAS_DIRTY:
1071         // FALLTHROUGH, to ensure the bySeqno for the hashTable item is
1072         // set correctly, and also the sequence numbers are ordered correctly.
1073         // (MB-14003)
1074     case NOT_FOUND:
1075         // FALLTHROUGH
1076     case WAS_CLEAN:
1077         vb->setMaxCas(v->getCas());
1078         tapQueueDirty(*vb, v, lh, NULL,
1079                       genBySeqno ? GenerateBySeqno::Yes : GenerateBySeqno::No);
1080         break;
1081     case INVALID_VBUCKET:
1082         ret = ENGINE_NOT_MY_VBUCKET;
1083         break;
1084     case NEED_BG_FETCH:
1085         throw std::logic_error("EventuallyPersistentStore::addTAPBackfillItem: "
1086                 "SET on a non-active vbucket should not require a "
1087                 "bg_metadata_fetch.");
1088     }
1089
1090     return ret;
1091 }
1092
1093 class KVStatsCallback : public Callback<kvstats_ctx> {
1094     public:
1095         KVStatsCallback(EventuallyPersistentStore *store)
1096             : epstore(store) { }
1097
1098        void callback(kvstats_ctx &ctx) {
1099             RCPtr<VBucket> vb = epstore->getVBucket(ctx.vbucket);
1100             if (vb) {
1101                 vb->fileSpaceUsed = ctx.fileSpaceUsed;
1102                 vb->fileSize = ctx.fileSize;
1103             }
1104         }
1105
1106     private:
1107         EventuallyPersistentStore *epstore;
1108 };
1109
1110 ENGINE_ERROR_CODE EventuallyPersistentStore::setVBucketState(uint16_t vbid,
1111                                                            vbucket_state_t to,
1112                                                            bool transfer,
1113                                                            bool notify_dcp) {
1114     LockHolder lh(vbsetMutex);
1115     return setVBucketState_UNLOCKED(vbid, to, transfer, notify_dcp, lh);
1116 }
1117
1118 ENGINE_ERROR_CODE EventuallyPersistentStore::setVBucketState_UNLOCKED(uint16_t vbid,
1119                                                            vbucket_state_t to,
1120                                                            bool transfer,
1121                                                            bool notify_dcp,
1122                                                            LockHolder& vbset) {
1123     RCPtr<VBucket> vb = vbMap.getBucket(vbid);
1124     if (vb && to == vb->getState()) {
1125         return ENGINE_SUCCESS;
1126     }
1127
1128     if (vb) {
1129         vbucket_state_t oldstate = vb->getState();
1130
1131         vb->setState(to);
1132
1133         if (oldstate != to && notify_dcp) {
1134             bool closeInboundStreams = false;
1135             if (to == vbucket_state_active && !transfer) {
1136                 /**
1137                  * Close inbound (passive) streams into the vbucket
1138                  * only in case of a failover.
1139                  */
1140                 closeInboundStreams = true;
1141             }
1142             engine.getDcpConnMap().vbucketStateChanged(vbid, to,
1143                                                        closeInboundStreams);
1144         }
1145
1146         if (to == vbucket_state_active && oldstate == vbucket_state_replica) {
1147             /**
1148              * Update snapshot range when vbucket goes from being a replica
1149              * to active, to maintain the correct snapshot sequence numbers
1150              * even in a failover scenario.
1151              */
1152             vb->checkpointManager.resetSnapshotRange();
1153         }
1154
1155         if (to == vbucket_state_active && !transfer) {
1156             const snapshot_range_t range = vb->getPersistedSnapshot();
1157             if (range.end == vbMap.getPersistenceSeqno(vbid)) {
1158                 vb->failovers->createEntry(range.end);
1159             } else {
1160                 vb->failovers->createEntry(range.start);
1161             }
1162         }
1163
1164         if (oldstate == vbucket_state_pending &&
1165             to == vbucket_state_active) {
1166             ExTask notifyTask = new PendingOpsNotification(engine, vb);
1167             ExecutorPool::get()->schedule(notifyTask, NONIO_TASK_IDX);
1168         }
1169         scheduleVBStatePersist(vbid);
1170     } else if (vbid < vbMap.getSize()) {
1171         FailoverTable* ft = new FailoverTable(engine.getMaxFailoverEntries());
1172         KVShard* shard = vbMap.getShardByVbId(vbid);
1173         std::shared_ptr<Callback<uint16_t> > cb(new NotifyFlusherCB(shard));
1174         Configuration& config = engine.getConfiguration();
1175         RCPtr<VBucket> newvb(new VBucket(vbid, to, stats,
1176                                          engine.getCheckpointConfig(),
1177                                          shard, 0, 0, 0, ft, cb,
1178                                          config));
1179
1180         if (config.isBfilterEnabled()) {
1181             // Initialize bloom filters upon vbucket creation during
1182             // bucket creation and rebalance
1183             newvb->createFilter(config.getBfilterKeyCount(),
1184                                 config.getBfilterFpProb());
1185         }
1186
1187         // The first checkpoint for active vbucket should start with id 2.
1188         uint64_t start_chk_id = (to == vbucket_state_active) ? 2 : 0;
1189         newvb->checkpointManager.setOpenCheckpointId(start_chk_id);
1190         if (vbMap.addBucket(newvb) == ENGINE_ERANGE) {
1191             return ENGINE_ERANGE;
1192         }
1193         vbMap.setPersistenceCheckpointId(vbid, 0);
1194         vbMap.setPersistenceSeqno(vbid, 0);
1195         vbMap.setBucketCreation(vbid, true);
1196         scheduleVBStatePersist(vbid);
1197     } else {
1198         return ENGINE_ERANGE;
1199     }
1200     return ENGINE_SUCCESS;
1201 }
1202
1203 void EventuallyPersistentStore::scheduleVBStatePersist() {
1204     for (auto vbid : vbMap.getBuckets()) {
1205         scheduleVBStatePersist(vbid);
1206     }
1207 }
1208
1209 void EventuallyPersistentStore::scheduleVBStatePersist(VBucket::id_type vbid) {
1210     RCPtr<VBucket> vb = getVBucket(vbid);
1211
1212     if (!vb) {
1213         LOG(EXTENSION_LOG_WARNING,
1214             "EPStore::scheduleVBStatePersist: vb:%" PRIu16
1215             " does not not exist. Unable to schedule persistence.", vbid);
1216         return;
1217     }
1218
1219     vb->checkpointManager.queueSetVBState(*vb);
1220 }
1221
1222 bool EventuallyPersistentStore::completeVBucketDeletion(uint16_t vbid,
1223                                                         const void* cookie) {
1224     LockHolder lh(vbsetMutex);
1225
1226     hrtime_t start_time(gethrtime());
1227     RCPtr<VBucket> vb = vbMap.getBucket(vbid);
1228     if (!vb || vb->getState() == vbucket_state_dead ||
1229          vbMap.isBucketDeletion(vbid)) {
1230         lh.unlock();
1231         LockHolder vlh(vb_mutexes[vbid]);
1232         getRWUnderlying(vbid)->delVBucket(vbid);
1233         vbMap.setBucketDeletion(vbid, false);
1234         vbMap.setBucketCreation(vbid, false);
1235         vbMap.setPersistenceSeqno(vbid, 0);
1236         ++stats.vbucketDeletions;
1237     }
1238
1239     hrtime_t spent(gethrtime() - start_time);
1240     hrtime_t wall_time = spent / 1000;
1241     BlockTimer::log(spent, "disk_vb_del", stats.timingLog);
1242     stats.diskVBDelHisto.add(wall_time);
1243     atomic_setIfBigger(stats.vbucketDelMaxWalltime, wall_time);
1244     stats.vbucketDelTotWalltime.fetch_add(wall_time);
1245     if (cookie) {
1246         engine.notifyIOComplete(cookie, ENGINE_SUCCESS);
1247     }
1248
1249     return true;
1250 }
1251
1252 void EventuallyPersistentStore::scheduleVBDeletion(RCPtr<VBucket> &vb,
1253                                                    const void* cookie,
1254                                                    double delay) {
1255     ExTask delTask = new VBucketMemoryDeletionTask(engine, vb, delay);
1256     ExecutorPool::get()->schedule(delTask, NONIO_TASK_IDX);
1257
1258     if (vbMap.setBucketDeletion(vb->getId(), true)) {
1259         ExTask task = new VBDeleteTask(&engine, vb->getId(), cookie);
1260         ExecutorPool::get()->schedule(task, WRITER_TASK_IDX);
1261     }
1262 }
1263
1264 ENGINE_ERROR_CODE EventuallyPersistentStore::deleteVBucket(uint16_t vbid,
1265                                                            const void* c) {
1266     // Lock to prevent a race condition between a failed update and add
1267     // (and delete).
1268     LockHolder lh(vbsetMutex);
1269
1270     RCPtr<VBucket> vb = vbMap.getBucket(vbid);
1271     if (!vb) {
1272         return ENGINE_NOT_MY_VBUCKET;
1273     }
1274
1275     vb->setState(vbucket_state_dead);
1276     engine.getDcpConnMap().vbucketStateChanged(vbid, vbucket_state_dead);
1277     vbMap.removeBucket(vbid);
1278     lh.unlock();
1279     scheduleVBDeletion(vb, c);
1280     if (c) {
1281         return ENGINE_EWOULDBLOCK;
1282     }
1283     return ENGINE_SUCCESS;
1284 }
1285
1286 ENGINE_ERROR_CODE EventuallyPersistentStore::checkForDBExistence(DBFileId db_file_id) {
1287     std::string backend = engine.getConfiguration().getBackend();
1288     if (backend.compare("couchdb") == 0) {
1289         RCPtr<VBucket> vb = vbMap.getBucket(db_file_id);
1290         if (!vb) {
1291             return ENGINE_NOT_MY_VBUCKET;
1292         }
1293     } else if (backend.compare("forestdb") == 0) {
1294         if (db_file_id > (vbMap.getNumShards() - 1)) {
1295             //TODO: find a better error code
1296             return ENGINE_EINVAL;
1297         }
1298     } else {
1299         LOG(EXTENSION_LOG_WARNING,
1300             "Unknown backend specified for db file id: %d", db_file_id);
1301         return ENGINE_FAILED;
1302     }
1303
1304     return ENGINE_SUCCESS;
1305 }
1306
1307 ENGINE_ERROR_CODE EventuallyPersistentStore::compactDB(uint16_t vbid,
1308                                                        compaction_ctx c,
1309                                                        const void *cookie) {
1310     ENGINE_ERROR_CODE errCode = checkForDBExistence(c.db_file_id);
1311     if (errCode != ENGINE_SUCCESS) {
1312         return errCode;
1313     }
1314
1315     /* Obtain the vbucket so we can get the previous purge seqno */
1316     RCPtr<VBucket> vb = vbMap.getBucket(vbid);
1317     if (!vb) {
1318         return ENGINE_NOT_MY_VBUCKET;
1319     }
1320
1321     /* Update the compaction ctx with the previous purge seqno */
1322     c.max_purged_seq = vb->getPurgeSeqno();
1323
1324     LockHolder lh(compactionLock);
1325     ExTask task = new CompactTask(&engine, c, cookie);
1326     compactionTasks.push_back(std::make_pair(c.db_file_id, task));
1327     if (compactionTasks.size() > 1) {
1328         if ((stats.diskQueueSize > compactionWriteQueueCap &&
1329             compactionTasks.size() > (vbMap.getNumShards() / 2)) ||
1330             engine.getWorkLoadPolicy().getWorkLoadPattern() == READ_HEAVY) {
1331             // Snooze a new compaction task.
1332             // We will wake it up when one of the existing compaction tasks is done.
1333             task->snooze(60);
1334         }
1335     }
1336
1337     ExecutorPool::get()->schedule(task, WRITER_TASK_IDX);
1338
1339     LOG(EXTENSION_LOG_DEBUG,
1340         "Scheduled compaction task %" PRIu64 " on db %d,"
1341         "purge_before_ts = %" PRIu64 ", purge_before_seq = %" PRIu64
1342         ", dropdeletes = %d",
1343         uint64_t(task->getId()),c.db_file_id, c.purge_before_ts,
1344         c.purge_before_seq, c.drop_deletes);
1345
1346    return ENGINE_EWOULDBLOCK;
1347 }
1348
1349 class ExpiredItemsCallback : public Callback<std::string&, uint64_t&> {
1350     public:
1351         ExpiredItemsCallback(EventuallyPersistentStore *store, uint16_t vbid,
1352                              time_t start)
1353             : epstore(store), vbucket(vbid), startTime(start) { }
1354
1355         void callback(std::string& key, uint64_t& revSeqno) {
1356             if (epstore->compactionCanExpireItems()) {
1357                 epstore->deleteExpiredItem(vbucket, key, startTime, revSeqno,
1358                                            EXP_BY_COMPACTOR);
1359             }
1360         }
1361
1362     private:
1363         EventuallyPersistentStore *epstore;
1364         uint16_t vbucket;
1365         time_t startTime;
1366 };
1367
1368 bool EventuallyPersistentStore::doCompact(compaction_ctx *ctx,
1369                                           const void *cookie) {
1370     ENGINE_ERROR_CODE err = ENGINE_SUCCESS;
1371     const uint16_t vbid = ctx->db_file_id;
1372     RCPtr<VBucket> vb = vbMap.getBucket(vbid);
1373     if (vb) {
1374         LockHolder lh(vb_mutexes[vbid], true /*tryLock*/);
1375         if (!lh.islocked()) {
1376             return true; // Schedule a compaction task again.
1377         }
1378
1379         Configuration &config = getEPEngine().getConfiguration();
1380         if (config.isBfilterEnabled()) {
1381             size_t initial_estimation = config.getBfilterKeyCount();
1382             size_t estimated_count;
1383             size_t num_deletes =
1384                     getROUnderlying(vbid)->getNumPersistedDeletes(vbid);
1385             if (eviction_policy == VALUE_ONLY) {
1386                 /**
1387                  * VALUE-ONLY EVICTION POLICY
1388                  * Obtain number of persisted deletes from underlying kvstore.
1389                  * Bloomfilter's estimated_key_count = 1.25 * deletes
1390                  */
1391
1392                 estimated_count = round(1.25 * num_deletes);
1393                 std::shared_ptr<Callback<std::string&, bool&> >
1394                     filter(new BloomFilterCallback(*this, vbid, false));
1395                 ctx->bloomFilterCallback = filter;
1396             } else {
1397                 /**
1398                  * FULL EVICTION POLICY
1399                  * First determine if the resident ratio of vbucket is less than
1400                  * the threshold from configuration.
1401                  */
1402
1403                 bool residentRatioAlert = vb->isResidentRatioUnderThreshold(
1404                                                 getBfiltersResidencyThreshold(),
1405                                                 eviction_policy);
1406                 std::shared_ptr<Callback<std::string&, bool&> >
1407                     filter(new BloomFilterCallback(*this, vbid, residentRatioAlert));
1408                 ctx->bloomFilterCallback = filter;
1409
1410                 /**
1411                  * Based on resident ratio against threshold, estimate count.
1412                  *
1413                  * 1. If resident ratio is greater than the threshold:
1414                  * Obtain number of persisted deletes from underlying kvstore.
1415                  * Obtain number of non-resident-items for vbucket.
1416                  * Bloomfilter's estimated_key_count =
1417                  *                              1.25 * (deletes + non-resident)
1418                  *
1419                  * 2. Otherwise:
1420                  * Obtain number of items for vbucket.
1421                  * Bloomfilter's estimated_key_count =
1422                  *                              1.25 * (num_items)
1423                  */
1424
1425                 if (residentRatioAlert) {
1426                     estimated_count = round(1.25 *
1427                                             vb->getNumItems(eviction_policy));
1428                 } else {
1429                     estimated_count = round(1.25 * (num_deletes +
1430                                 vb->getNumNonResidentItems(eviction_policy)));
1431                 }
1432             }
1433             if (estimated_count < initial_estimation) {
1434                 estimated_count = initial_estimation;
1435             }
1436             vb->initTempFilter(estimated_count, config.getBfilterFpProb());
1437         }
1438
1439         if (vb->getState() == vbucket_state_active) {
1440             // Set the current time ONLY for active vbuckets.
1441             ctx->curr_time = ep_real_time();
1442         } else {
1443             ctx->curr_time = 0;
1444         }
1445         std::shared_ptr<Callback<std::string&, uint64_t&> >
1446            expiry(new ExpiredItemsCallback(this, vbid, ctx->curr_time));
1447         ctx->expiryCallback = expiry;
1448
1449         KVStatsCallback kvcb(this);
1450         if (getRWUnderlying(vbid)->compactDB(ctx, kvcb)) {
1451             if (config.isBfilterEnabled()) {
1452                 vb->swapFilter();
1453             } else {
1454                 vb->clearFilter();
1455             }
1456         } else {
1457             LOG(EXTENSION_LOG_WARNING, "Compaction: Not successful for vb %u, "
1458                     "clearing bloom filter, if any.", vb->getId());
1459             vb->clearFilter();
1460         }
1461         vb->setPurgeSeqno(ctx->max_purged_seq);
1462     } else {
1463         err = ENGINE_NOT_MY_VBUCKET;
1464         engine.storeEngineSpecific(cookie, NULL);
1465         //Decrement session counter here, as memcached thread wouldn't
1466         //visit the engine interface in case of a NOT_MY_VB notification
1467         engine.decrementSessionCtr();
1468     }
1469
1470     updateCompactionTasks(ctx->db_file_id);
1471
1472     if (cookie) {
1473         engine.notifyIOComplete(cookie, err);
1474     }
1475     --stats.pendingCompactions;
1476     return false;
1477 }
1478
1479 void EventuallyPersistentStore::updateCompactionTasks(DBFileId db_file_id) {
1480     LockHolder lh(compactionLock);
1481     bool erased = false, woke = false;
1482     std::list<CompTaskEntry>::iterator it = compactionTasks.begin();
1483     while (it != compactionTasks.end()) {
1484         if ((*it).first == db_file_id) {
1485             it = compactionTasks.erase(it);
1486             erased = true;
1487         } else {
1488             ExTask &task = (*it).second;
1489             if (task->getState() == TASK_SNOOZED) {
1490                 ExecutorPool::get()->wake(task->getId());
1491                 woke = true;
1492             }
1493             ++it;
1494         }
1495         if (erased && woke) {
1496             break;
1497         }
1498     }
1499 }
1500
1501 bool EventuallyPersistentStore::resetVBucket(uint16_t vbid) {
1502     LockHolder lh(vbsetMutex);
1503     return resetVBucket_UNLOCKED(vbid, lh);
1504 }
1505
1506 bool EventuallyPersistentStore::resetVBucket_UNLOCKED(uint16_t vbid, LockHolder& vbset) {
1507     bool rv(false);
1508
1509     RCPtr<VBucket> vb = vbMap.getBucket(vbid);
1510     if (vb) {
1511         vbucket_state_t vbstate = vb->getState();
1512
1513         vbMap.removeBucket(vbid);
1514
1515         checkpointCursorInfoList cursors =
1516                                         vb->checkpointManager.getAllCursors();
1517         // Delete and recreate the vbucket database file
1518         scheduleVBDeletion(vb, NULL, 0);
1519         setVBucketState_UNLOCKED(vbid, vbstate,
1520                                  false/*transfer*/, true/*notifyDcp*/, vbset);
1521
1522         // Copy the all cursors from the old vbucket into the new vbucket
1523         RCPtr<VBucket> newvb = vbMap.getBucket(vbid);
1524         newvb->checkpointManager.resetCursors(cursors);
1525
1526         rv = true;
1527     }
1528     return rv;
1529 }
1530
1531 extern "C" {
1532
1533     typedef struct {
1534         EventuallyPersistentEngine* engine;
1535         std::map<std::string, std::string> smap;
1536     } snapshot_stats_t;
1537
1538     static void add_stat(const char *key, const uint16_t klen,
1539                          const char *val, const uint32_t vlen,
1540                          const void *cookie) {
1541         if (cookie == nullptr) {
1542             throw std::invalid_argument("add_stat: cookie is NULL");
1543         }
1544         void *ptr = const_cast<void *>(cookie);
1545         snapshot_stats_t* snap = static_cast<snapshot_stats_t*>(ptr);
1546         ObjectRegistry::onSwitchThread(snap->engine);
1547
1548         std::string k(key, klen);
1549         std::string v(val, vlen);
1550         snap->smap.insert(std::pair<std::string, std::string>(k, v));
1551     }
1552 }
1553
1554 void EventuallyPersistentStore::snapshotStats() {
1555     snapshot_stats_t snap;
1556     snap.engine = &engine;
1557     bool rv = engine.getStats(&snap, NULL, 0, add_stat) == ENGINE_SUCCESS &&
1558               engine.getStats(&snap, "tap", 3, add_stat) == ENGINE_SUCCESS &&
1559               engine.getStats(&snap, "dcp", 3, add_stat) == ENGINE_SUCCESS;
1560
1561     if (rv && stats.isShutdown) {
1562         snap.smap["ep_force_shutdown"] = stats.forceShutdown ?
1563                                                               "true" : "false";
1564         std::stringstream ss;
1565         ss << ep_real_time();
1566         snap.smap["ep_shutdown_time"] = ss.str();
1567     }
1568     getOneRWUnderlying()->snapshotStats(snap.smap);
1569 }
1570
1571 void EventuallyPersistentStore::updateBGStats(const hrtime_t init,
1572                                               const hrtime_t start,
1573                                               const hrtime_t stop) {
1574     if (stop >= start && start >= init) {
1575         // skip the measurement if the counter wrapped...
1576         ++stats.bgNumOperations;
1577         hrtime_t w = (start - init) / 1000;
1578         BlockTimer::log(start - init, "bgwait", stats.timingLog);
1579         stats.bgWaitHisto.add(w);
1580         stats.bgWait.fetch_add(w);
1581         atomic_setIfLess(stats.bgMinWait, w);
1582         atomic_setIfBigger(stats.bgMaxWait, w);
1583
1584         hrtime_t l = (stop - start) / 1000;
1585         BlockTimer::log(stop - start, "bgload", stats.timingLog);
1586         stats.bgLoadHisto.add(l);
1587         stats.bgLoad.fetch_add(l);
1588         atomic_setIfLess(stats.bgMinLoad, l);
1589         atomic_setIfBigger(stats.bgMaxLoad, l);
1590     }
1591 }
1592
1593 void EventuallyPersistentStore::completeBGFetch(const std::string &key,
1594                                                 uint16_t vbucket,
1595                                                 const void *cookie,
1596                                                 hrtime_t init,
1597                                                 bool isMeta) {
1598     hrtime_t start(gethrtime());
1599     // Go find the data
1600     RememberingCallback<GetValue> gcb;
1601     if (isMeta) {
1602         gcb.val.setPartial();
1603         ++stats.bg_meta_fetched;
1604     } else {
1605         ++stats.bg_fetched;
1606     }
1607     getROUnderlying(vbucket)->get(key, vbucket, gcb);
1608     gcb.waitForValue();
1609     ENGINE_ERROR_CODE status = gcb.val.getStatus();
1610
1611     // Lock to prevent a race condition between a fetch for restore and delete
1612     LockHolder lh(vbsetMutex);
1613
1614     RCPtr<VBucket> vb = getVBucket(vbucket);
1615     if (vb) {
1616         ReaderLockHolder rlh(vb->getStateLock());
1617         int bucket_num(0);
1618         LockHolder hlh = vb->ht.getLockedBucket(key, &bucket_num);
1619         StoredValue *v = fetchValidValue(vb, key, bucket_num, true);
1620         if (isMeta) {
1621             if ((v && v->unlocked_restoreMeta(gcb.val.getValue(),
1622                                               gcb.val.getStatus(), vb->ht))
1623                 || ENGINE_KEY_ENOENT == status) {
1624                 /* If ENGINE_KEY_ENOENT is the status from storage and the temp
1625                  key is removed from hash table by the time bgfetch returns
1626                  (in case multiple bgfetch is scheduled for a key), we still
1627                  need to return ENGINE_SUCCESS to the memcached worker thread,
1628                  so that the worker thread can visit the ep-engine and figure
1629                  out the correct flow */
1630                 status = ENGINE_SUCCESS;
1631             }
1632         } else {
1633             bool restore = false;
1634             if (v && v->isResident()) {
1635                 status = ENGINE_SUCCESS;
1636             } else if (v && v->isDeleted()) {
1637                 status = ENGINE_KEY_ENOENT;
1638             } else {
1639                 switch (eviction_policy) {
1640                     case VALUE_ONLY:
1641                         if (v && !v->isResident() && !v->isDeleted()) {
1642                             restore = true;
1643                         }
1644                         break;
1645                     case FULL_EVICTION:
1646                         if (v) {
1647                             if (v->isTempInitialItem() ||
1648                                 (!v->isResident() && !v->isDeleted())) {
1649                                 restore = true;
1650                             }
1651                         }
1652                         break;
1653                     default:
1654                         throw std::logic_error("Unknown eviction policy");
1655                 }
1656             }
1657
1658             if (restore) {
1659                 if (gcb.val.getStatus() == ENGINE_SUCCESS) {
1660                     v->unlocked_restoreValue(gcb.val.getValue(), vb->ht);
1661                     if (!v->isResident()) {
1662                         throw std::logic_error("EPStore::completeBGFetch: "
1663                                 "storedvalue (which has key " + v->getKey() +
1664                                 ") should be resident after calling restoreValue()");
1665                     }
1666                     if (vb->getState() == vbucket_state_active &&
1667                         v->getExptime() != gcb.val.getValue()->getExptime() &&
1668                         v->getCas() == gcb.val.getValue()->getCas()) {
1669                         // MB-9306: It is possible that by the time bgfetcher
1670                         // returns, the item may have been updated and queued
1671                         // Hence test the CAS value to be the same first.
1672                         // exptime mutated, schedule it into new checkpoint
1673                         queueDirty(vb, v, &hlh, NULL, GenerateBySeqno::Yes,
1674                                                     GenerateCas::No);
1675                     }
1676                 } else if (gcb.val.getStatus() == ENGINE_KEY_ENOENT) {
1677                     v->setNonExistent();
1678                     if (eviction_policy == FULL_EVICTION) {
1679                         // For the full eviction, we should notify
1680                         // ENGINE_SUCCESS to the memcached worker thread, so
1681                         // that the worker thread can visit the ep-engine and
1682                         // figure out the correct error code.
1683                         status = ENGINE_SUCCESS;
1684                     }
1685                 } else {
1686                     // underlying kvstore couldn't fetch requested data
1687                     // log returned error and notify TMPFAIL to client
1688                     LOG(EXTENSION_LOG_WARNING,
1689                         "Failed background fetch for vb=%d "
1690                         "seq=%" PRId64 " key=%s", vbucket, v->getBySeqno(),
1691                         key.c_str());
1692                     status = ENGINE_TMPFAIL;
1693                 }
1694             }
1695         }
1696     } else {
1697         LOG(EXTENSION_LOG_INFO, "VBucket %d's file was deleted in the middle of"
1698             " a bg fetch for key %s\n", vbucket, key.c_str());
1699         status = ENGINE_NOT_MY_VBUCKET;
1700     }
1701
1702     lh.unlock();
1703
1704     hrtime_t stop = gethrtime();
1705     updateBGStats(init, start, stop);
1706     bgFetchQueue--;
1707
1708     delete gcb.val.getValue();
1709     engine.notifyIOComplete(cookie, status);
1710 }
1711
1712 void EventuallyPersistentStore::completeBGFetchMulti(uint16_t vbId,
1713                                  std::vector<bgfetched_item_t> &fetchedItems,
1714                                  hrtime_t startTime)
1715 {
1716     RCPtr<VBucket> vb = getVBucket(vbId);
1717     if (!vb) {
1718         std::vector<bgfetched_item_t>::iterator itemItr = fetchedItems.begin();
1719         for (; itemItr != fetchedItems.end(); ++itemItr) {
1720             engine.notifyIOComplete((*itemItr).second->cookie,
1721                                     ENGINE_NOT_MY_VBUCKET);
1722         }
1723         LOG(EXTENSION_LOG_WARNING,
1724             "EP Store completes %d of batched background fetch for "
1725             "for vBucket = %d that is already deleted\n",
1726             (int)fetchedItems.size(), vbId);
1727         return;
1728     }
1729
1730     std::vector<bgfetched_item_t>::iterator itemItr = fetchedItems.begin();
1731     for (; itemItr != fetchedItems.end(); ++itemItr) {
1732         VBucketBGFetchItem *bgitem = (*itemItr).second;
1733         ENGINE_ERROR_CODE status = bgitem->value.getStatus();
1734         Item *fetchedValue = bgitem->value.getValue();
1735         const std::string &key = (*itemItr).first;
1736         {   //locking scope
1737             ReaderLockHolder rlh(vb->getStateLock());
1738             int bucket = 0;
1739             LockHolder blh = vb->ht.getLockedBucket(key, &bucket);
1740             StoredValue *v = fetchValidValue(vb, key, bucket, true);
1741             if (bgitem->metaDataOnly) {
1742                 if ((v && v->unlocked_restoreMeta(fetchedValue, status, vb->ht))
1743                     || ENGINE_KEY_ENOENT == status) {
1744                     /* If ENGINE_KEY_ENOENT is the status from storage and the temp
1745                      key is removed from hash table by the time bgfetch returns
1746                      (in case multiple bgfetch is scheduled for a key), we still
1747                      need to return ENGINE_SUCCESS to the memcached worker thread,
1748                      so that the worker thread can visit the ep-engine and figure
1749                      out the correct flow */
1750                     status = ENGINE_SUCCESS;
1751                 }
1752             } else {
1753                 bool restore = false;
1754                 if (v && v->isResident()) {
1755                     status = ENGINE_SUCCESS;
1756                 } else if (v && v->isDeleted()) {
1757                     status = ENGINE_KEY_ENOENT;
1758                 } else {
1759                     switch (eviction_policy) {
1760                         case VALUE_ONLY:
1761                             if (v && !v->isResident() && !v->isDeleted()) {
1762                                 restore = true;
1763                             }
1764                             break;
1765                         case FULL_EVICTION:
1766                             if (v) {
1767                                 if (v->isTempInitialItem() ||
1768                                     (!v->isResident() && !v->isDeleted())) {
1769                                     restore = true;
1770                                 }
1771                             }
1772                             break;
1773                         default:
1774                             throw std::logic_error("Unknown eviction policy");
1775                     }
1776                 }
1777
1778                 if (restore) {
1779                     if (status == ENGINE_SUCCESS) {
1780                         v->unlocked_restoreValue(fetchedValue, vb->ht);
1781                         if (!v->isResident()) {
1782                             throw std::logic_error("EPStore::completeBGFetchMulti: "
1783                                 "storedvalue (which has key " + v->getKey() +
1784                                 ") should be resident after calling restoreValue()");
1785                         }
1786                         if (vb->getState() == vbucket_state_active &&
1787                             v->getExptime() != fetchedValue->getExptime() &&
1788                             v->getCas() == fetchedValue->getCas()) {
1789                             // MB-9306: It is possible that by the time
1790                             // bgfetcher returns, the item may have been
1791                             // updated and queued
1792                             // Hence test the CAS value to be the same first.
1793                             // exptime mutated, schedule it into new checkpoint
1794                             queueDirty(vb, v, &blh, NULL, GenerateBySeqno::Yes,
1795                                                         GenerateCas::No);
1796                         }
1797                     } else if (status == ENGINE_KEY_ENOENT) {
1798                         v->setNonExistent();
1799                         if (eviction_policy == FULL_EVICTION) {
1800                             // For the full eviction, we should notify
1801                             // ENGINE_SUCCESS to the memcached worker thread,
1802                             // so that the worker thread can visit the
1803                             // ep-engine and figure out the correct error
1804                             // code.
1805                             status = ENGINE_SUCCESS;
1806                         }
1807                     } else {
1808                         // underlying kvstore couldn't fetch requested data
1809                         // log returned error and notify TMPFAIL to client
1810                         LOG(EXTENSION_LOG_WARNING,
1811                             "Failed background fetch for vb=%d "
1812                             "key=%s", vbId, key.c_str());
1813                         status = ENGINE_TMPFAIL;
1814                     }
1815                 }
1816             }
1817         } // locked scope ends
1818
1819         if (bgitem->metaDataOnly) {
1820             ++stats.bg_meta_fetched;
1821         } else {
1822             ++stats.bg_fetched;
1823         }
1824
1825         hrtime_t endTime = gethrtime();
1826         updateBGStats(bgitem->initTime, startTime, endTime);
1827         engine.notifyIOComplete(bgitem->cookie, status);
1828     }
1829
1830     LOG(EXTENSION_LOG_DEBUG,
1831         "EP Store completes %" PRIu64 " of batched background fetch "
1832         "for vBucket = %d endTime = %" PRIu64,
1833         uint64_t(fetchedItems.size()), vbId, gethrtime()/1000000);
1834 }
1835
1836 void EventuallyPersistentStore::bgFetch(const const_sized_buffer key,
1837                                         uint16_t vbucket,
1838                                         const void *cookie,
1839                                         bool isMeta) {
1840     if (multiBGFetchEnabled()) {
1841         RCPtr<VBucket> vb = getVBucket(vbucket);
1842         if (!vb) {
1843             throw std::invalid_argument("EPStore::bgFetch: vbucket (which is " +
1844                                         std::to_string(vbucket) +
1845                                         ") is not present in vbMap");
1846         }
1847         KVShard *myShard = vbMap.getShardByVbId(vbucket);
1848
1849         // schedule to the current batch of background fetch of the given
1850         // vbucket
1851         VBucketBGFetchItem * fetchThis = new VBucketBGFetchItem(cookie,
1852                                                                 isMeta);
1853         size_t bgfetch_size = vb->queueBGFetchItem(key, fetchThis,
1854                                                    myShard->getBgFetcher());
1855         myShard->getBgFetcher()->notifyBGEvent();
1856         LOG(EXTENSION_LOG_DEBUG, "Queued a background fetch, now at %" PRIu64,
1857             uint64_t(bgfetch_size));
1858     } else {
1859         bgFetchQueue++;
1860         stats.maxRemainingBgJobs = std::max(stats.maxRemainingBgJobs,
1861                                             bgFetchQueue.load());
1862         ExecutorPool* iom = ExecutorPool::get();
1863         ExTask task = new SingleBGFetcherTask(&engine, key, vbucket, cookie,
1864                                               isMeta, bgFetchDelay, false);
1865         iom->schedule(task, READER_TASK_IDX);
1866         LOG(EXTENSION_LOG_DEBUG, "Queued a background fetch, now at %" PRIu64,
1867             uint64_t(bgFetchQueue.load()));
1868     }
1869 }
1870
1871 GetValue EventuallyPersistentStore::getInternal(const const_sized_buffer key,
1872                                                 uint16_t vbucket,
1873                                                 const void *cookie,
1874                                                 vbucket_state_t allowedState,
1875                                                 get_options_t options) {
1876
1877     vbucket_state_t disallowedState = (allowedState == vbucket_state_active) ?
1878         vbucket_state_replica : vbucket_state_active;
1879     RCPtr<VBucket> vb = getVBucket(vbucket);
1880     if (!vb) {
1881         ++stats.numNotMyVBuckets;
1882         return GetValue(NULL, ENGINE_NOT_MY_VBUCKET);
1883     }
1884
1885     const bool honorStates = (options & HONOR_STATES);
1886
1887     ReaderLockHolder rlh(vb->getStateLock());
1888     if (honorStates) {
1889         vbucket_state_t vbState = vb->getState();
1890         if (vbState == vbucket_state_dead) {
1891             ++stats.numNotMyVBuckets;
1892             return GetValue(NULL, ENGINE_NOT_MY_VBUCKET);
1893         } else if (vbState == disallowedState) {
1894             ++stats.numNotMyVBuckets;
1895             return GetValue(NULL, ENGINE_NOT_MY_VBUCKET);
1896         } else if (vbState == vbucket_state_pending) {
1897             if (vb->addPendingOp(cookie)) {
1898                 return GetValue(NULL, ENGINE_EWOULDBLOCK);
1899             }
1900         }
1901     }
1902
1903     const bool trackReference = (options & TRACK_REFERENCE);
1904
1905     int bucket_num(0);
1906     LockHolder lh = vb->ht.getLockedBucket(key, &bucket_num);
1907     StoredValue *v = fetchValidValue(vb, key, bucket_num, true,
1908                                      trackReference);
1909     if (v) {
1910         if (v->isDeleted()) {
1911             GetValue rv;
1912             return rv;
1913         }
1914         if (v->isTempDeletedItem() || v->isTempNonExistentItem()) {
1915             // Delete a temp non-existent item to ensure that
1916             // if the get were issued over an item that doesn't
1917             // exist, then we dont preserve a temp item.
1918             if (options & DELETE_TEMP) {
1919                 vb->ht.unlocked_del(key, bucket_num);
1920             }
1921             GetValue rv;
1922             return rv;
1923         }
1924
1925         // If the value is not resident, wait for it...
1926         if (!v->isResident()) {
1927             if (options & QUEUE_BG_FETCH) {
1928                 bgFetch(key, vbucket, cookie);
1929             }
1930             return GetValue(NULL, ENGINE_EWOULDBLOCK, v->getBySeqno(),
1931                             true, v->getNRUValue());
1932         }
1933
1934         // Should we hide (return -1) for the items' CAS?
1935         const bool hide_cas = (options & HIDE_LOCKED_CAS) &&
1936                               v->isLocked(ep_current_time());
1937         GetValue rv(v->toItem(hide_cas, vbucket), ENGINE_SUCCESS,
1938                     v->getBySeqno(), false, v->getNRUValue());
1939         return rv;
1940     } else {
1941         if (eviction_policy == VALUE_ONLY || diskFlushAll) {
1942             GetValue rv;
1943             return rv;
1944         }
1945
1946         if (vb->maybeKeyExistsInFilter(key)) {
1947             ENGINE_ERROR_CODE ec = ENGINE_EWOULDBLOCK;
1948             if (options & QUEUE_BG_FETCH) { // Full eviction and need a bg fetch.
1949                 ec = addTempItemForBgFetch(lh, bucket_num, key, vb,
1950                                            cookie, false);
1951             }
1952             return GetValue(NULL, ec, -1, true);
1953         } else {
1954             // As bloomfilter predicted that item surely doesn't exist
1955             // on disk, return ENONET, for getInternal().
1956             GetValue rv;
1957             return rv;
1958         }
1959     }
1960 }
1961
1962 GetValue EventuallyPersistentStore::getRandomKey() {
1963     VBucketMap::id_type max = vbMap.getSize();
1964
1965     const long start = random() % max;
1966     long curr = start;
1967     Item *itm = NULL;
1968
1969     while (itm == NULL) {
1970         RCPtr<VBucket> vb = getVBucket(curr++);
1971         while (!vb || vb->getState() != vbucket_state_active) {
1972             if (curr == start) {
1973                 return GetValue(NULL, ENGINE_KEY_ENOENT);
1974             }
1975             if (curr == max) {
1976                 curr = 0;
1977             }
1978
1979             vb = getVBucket(curr++);
1980         }
1981
1982         if ((itm = vb->ht.getRandomKey(random())) != NULL) {
1983             GetValue rv(itm, ENGINE_SUCCESS);
1984             return rv;
1985         }
1986
1987         if (curr == max) {
1988             curr = 0;
1989         }
1990
1991         if (curr == start) {
1992             return GetValue(NULL, ENGINE_KEY_ENOENT);
1993         }
1994         // Search next vbucket
1995     }
1996
1997     return GetValue(NULL, ENGINE_KEY_ENOENT);
1998 }
1999
2000
2001 ENGINE_ERROR_CODE EventuallyPersistentStore::getMetaData(
2002                                                         const std::string &key,
2003                                                         uint16_t vbucket,
2004                                                         const void *cookie,
2005                                                         ItemMetaData &metadata,
2006                                                         uint32_t &deleted,
2007                                                         bool trackReferenced)
2008 {
2009     (void) cookie;
2010     RCPtr<VBucket> vb = getVBucket(vbucket);
2011
2012     if (!vb) {
2013         ++stats.numNotMyVBuckets;
2014         return ENGINE_NOT_MY_VBUCKET;
2015     }
2016
2017     ReaderLockHolder rlh(vb->getStateLock());
2018     if (vb->getState() == vbucket_state_dead ||
2019         vb->getState() == vbucket_state_replica) {
2020         ++stats.numNotMyVBuckets;
2021         return ENGINE_NOT_MY_VBUCKET;
2022     }
2023
2024     int bucket_num(0);
2025     deleted = 0;
2026     LockHolder lh = vb->ht.getLockedBucket(key, &bucket_num);
2027     StoredValue *v = vb->ht.unlocked_find(key, bucket_num, true,
2028                                           trackReferenced);
2029
2030     if (v) {
2031         stats.numOpsGetMeta++;
2032         if (v->isTempInitialItem()) { // Need bg meta fetch.
2033             bgFetch(key, vbucket, cookie, true);
2034             return ENGINE_EWOULDBLOCK;
2035         } else if (v->isTempNonExistentItem()) {
2036             metadata.cas = v->getCas();
2037             return ENGINE_KEY_ENOENT;
2038         } else {
2039             if (v->isTempDeletedItem() || v->isDeleted() ||
2040                 v->isExpired(ep_real_time())) {
2041                 deleted |= GET_META_ITEM_DELETED_FLAG;
2042             }
2043
2044             if (v->isLocked(ep_current_time())) {
2045                 metadata.cas = static_cast<uint64_t>(-1);
2046             } else {
2047                 metadata.cas = v->getCas();
2048             }
2049             metadata.flags = v->getFlags();
2050             metadata.exptime = v->getExptime();
2051             metadata.revSeqno = v->getRevSeqno();
2052             return ENGINE_SUCCESS;
2053         }
2054     } else {
2055         // The key wasn't found. However, this may be because it was previously
2056         // deleted or evicted with the full eviction strategy.
2057         // So, add a temporary item corresponding to the key to the hash table
2058         // and schedule a background fetch for its metadata from the persistent
2059         // store. The item's state will be updated after the fetch completes.
2060         //
2061         // Schedule this bgFetch only if the key is predicted to be may-be
2062         // existent on disk by the bloomfilter.
2063
2064         if (vb->maybeKeyExistsInFilter(key)) {
2065             return addTempItemForBgFetch(lh, bucket_num, key, vb, cookie, true);
2066         } else {
2067             stats.numOpsGetMeta++;
2068             return ENGINE_KEY_ENOENT;
2069         }
2070     }
2071 }
2072
2073 ENGINE_ERROR_CODE EventuallyPersistentStore::setWithMeta(
2074                                                      const Item &itm,
2075                                                      uint64_t cas,
2076                                                      uint64_t *seqno,
2077                                                      const void *cookie,
2078                                                      bool force,
2079                                                      bool allowExisting,
2080                                                      uint8_t nru,
2081                                                      GenerateBySeqno genBySeqno,
2082                                                      GenerateCas genCas,
2083                                                      ExtendedMetaData *emd,
2084                                                      bool isReplication)
2085 {
2086     RCPtr<VBucket> vb = getVBucket(itm.getVBucketId());
2087     if (!vb) {
2088         ++stats.numNotMyVBuckets;
2089         return ENGINE_NOT_MY_VBUCKET;
2090     }
2091
2092     ReaderLockHolder rlh(vb->getStateLock());
2093     if (vb->getState() == vbucket_state_dead) {
2094         ++stats.numNotMyVBuckets;
2095         return ENGINE_NOT_MY_VBUCKET;
2096     } else if (vb->getState() == vbucket_state_replica && !force) {
2097         ++stats.numNotMyVBuckets;
2098         return ENGINE_NOT_MY_VBUCKET;
2099     } else if (vb->getState() == vbucket_state_pending && !force) {
2100         if (vb->addPendingOp(cookie)) {
2101             return ENGINE_EWOULDBLOCK;
2102         }
2103     } else if (vb->isTakeoverBackedUp()) {
2104         LOG(EXTENSION_LOG_DEBUG, "(vb %u) Returned TMPFAIL to a setWithMeta op"
2105                 ", becuase takeover is lagging", vb->getId());
2106         return ENGINE_TMPFAIL;
2107     }
2108
2109     //check for the incoming item's CAS validity
2110     if (!Item::isValidCas(itm.getCas())) {
2111         return ENGINE_KEY_EEXISTS;
2112     }
2113
2114     int bucket_num(0);
2115     LockHolder lh = vb->ht.getLockedBucket(itm.getKey(), &bucket_num);
2116     StoredValue *v = vb->ht.unlocked_find(itm.getKey(), bucket_num, true,
2117                                           false);
2118
2119     bool maybeKeyExists = true;
2120     if (!force) {
2121         if (v)  {
2122             if (v->isTempInitialItem()) {
2123                 bgFetch(itm.getKey(), itm.getVBucketId(), cookie, true);
2124                 return ENGINE_EWOULDBLOCK;
2125             }
2126
2127             if (!conflictResolver->resolve(*v, itm.getMetaData(), false)) {
2128                 ++stats.numOpsSetMetaResolutionFailed;
2129                 return ENGINE_KEY_EEXISTS;
2130             }
2131         } else {
2132             if (vb->maybeKeyExistsInFilter(itm.getKey())) {
2133                 return addTempItemForBgFetch(lh, bucket_num, itm.getKey(), vb,
2134                                              cookie, true, isReplication);
2135             } else {
2136                 maybeKeyExists = false;
2137             }
2138         }
2139     } else {
2140         if (eviction_policy == FULL_EVICTION) {
2141             // Check Bloomfilter's prediction
2142             if (!vb->maybeKeyExistsInFilter(itm.getKey())) {
2143                 maybeKeyExists = false;
2144             }
2145         }
2146     }
2147
2148     if (v && v->isLocked(ep_current_time()) &&
2149         (vb->getState() == vbucket_state_replica ||
2150          vb->getState() == vbucket_state_pending)) {
2151         v->unlock();
2152     }
2153
2154     mutation_type_t mtype = vb->ht.unlocked_set(v, itm, cas, allowExisting,
2155                                                 true, eviction_policy, nru,
2156                                                 maybeKeyExists, isReplication);
2157
2158     ENGINE_ERROR_CODE ret = ENGINE_SUCCESS;
2159     switch (mtype) {
2160     case NOMEM:
2161         ret = ENGINE_ENOMEM;
2162         break;
2163     case INVALID_CAS:
2164     case IS_LOCKED:
2165         ret = ENGINE_KEY_EEXISTS;
2166         break;
2167     case INVALID_VBUCKET:
2168         ret = ENGINE_NOT_MY_VBUCKET;
2169         break;
2170     case WAS_DIRTY:
2171     case WAS_CLEAN:
2172         vb->setMaxCasAndTrackDrift(v->getCas());
2173         queueDirty(vb, v, &lh, seqno, genBySeqno, genCas);
2174         break;
2175     case NOT_FOUND:
2176         ret = ENGINE_KEY_ENOENT;
2177         break;
2178     case NEED_BG_FETCH:
2179         {            // CAS operation with non-resident item + full eviction.
2180             if (v) { // temp item is already created. Simply schedule a
2181                 lh.unlock(); // bg fetch job.
2182                 bgFetch(itm.getKey(), vb->getId(), cookie, true);
2183                 return ENGINE_EWOULDBLOCK;
2184             }
2185
2186             ret = addTempItemForBgFetch(lh, bucket_num, itm.getKey(), vb,
2187                                         cookie, true, isReplication);
2188         }
2189     }
2190
2191     return ret;
2192 }
2193
2194 GetValue EventuallyPersistentStore::getAndUpdateTtl(const std::string &key,
2195                                                     uint16_t vbucket,
2196                                                     const void *cookie,
2197                                                     time_t exptime)
2198 {
2199     RCPtr<VBucket> vb = getVBucket(vbucket);
2200     if (!vb) {
2201         ++stats.numNotMyVBuckets;
2202         return GetValue(NULL, ENGINE_NOT_MY_VBUCKET);
2203     }
2204
2205     ReaderLockHolder rlh(vb->getStateLock());
2206     if (vb->getState() == vbucket_state_dead) {
2207         ++stats.numNotMyVBuckets;
2208         return GetValue(NULL, ENGINE_NOT_MY_VBUCKET);
2209     } else if (vb->getState() == vbucket_state_replica) {
2210         ++stats.numNotMyVBuckets;
2211         return GetValue(NULL, ENGINE_NOT_MY_VBUCKET);
2212     } else if (vb->getState() == vbucket_state_pending) {
2213         if (vb->addPendingOp(cookie)) {
2214             return GetValue(NULL, ENGINE_EWOULDBLOCK);
2215         }
2216     }
2217
2218     int bucket_num(0);
2219     LockHolder lh = vb->ht.getLockedBucket(key, &bucket_num);
2220     StoredValue *v = fetchValidValue(vb, key, bucket_num, true);
2221
2222     if (v) {
2223         if (v->isDeleted() || v->isTempDeletedItem() ||
2224             v->isTempNonExistentItem()) {
2225             GetValue rv;
2226             return rv;
2227         }
2228
2229         if (!v->isResident()) {
2230             bgFetch(key, vbucket, cookie);
2231             return GetValue(NULL, ENGINE_EWOULDBLOCK, v->getBySeqno());
2232         }
2233         if (v->isLocked(ep_current_time())) {
2234             GetValue rv(NULL, ENGINE_KEY_EEXISTS, 0);
2235             return rv;
2236         }
2237
2238         const bool exptime_mutated = exptime != v->getExptime();
2239         if (exptime_mutated) {
2240             v->markDirty();
2241             v->setExptime(exptime);
2242             v->setRevSeqno(v->getRevSeqno()+1);
2243         }
2244
2245         GetValue rv(v->toItem(v->isLocked(ep_current_time()), vbucket),
2246                     ENGINE_SUCCESS, v->getBySeqno());
2247
2248         if (exptime_mutated) {
2249             queueDirty(vb, v, &lh, NULL);
2250         }
2251
2252         return rv;
2253     } else {
2254         if (eviction_policy == VALUE_ONLY) {
2255             GetValue rv;
2256             return rv;
2257         } else {
2258             if (vb->maybeKeyExistsInFilter(key)) {
2259                 ENGINE_ERROR_CODE ec = addTempItemForBgFetch(lh, bucket_num,
2260                                                              key, vb, cookie,
2261                                                              false);
2262                 return GetValue(NULL, ec, -1, true);
2263             } else {
2264                 // As bloomfilter predicted that item surely doesn't exist
2265                 // on disk, return ENOENT for getAndUpdateTtl().
2266                 GetValue rv;
2267                 return rv;
2268             }
2269         }
2270     }
2271 }
2272
2273 ENGINE_ERROR_CODE
2274 EventuallyPersistentStore::statsVKey(const std::string &key,
2275                                      uint16_t vbucket,
2276                                      const void *cookie) {
2277     RCPtr<VBucket> vb = getVBucket(vbucket);
2278     if (!vb) {
2279         return ENGINE_NOT_MY_VBUCKET;
2280     }
2281
2282     int bucket_num(0);
2283     LockHolder lh = vb->ht.getLockedBucket(key, &bucket_num);
2284     StoredValue *v = fetchValidValue(vb, key, bucket_num, true);
2285
2286     if (v) {
2287         if (v->isDeleted() || v->isTempDeletedItem() ||
2288             v->isTempNonExistentItem()) {
2289             return ENGINE_KEY_ENOENT;
2290         }
2291         bgFetchQueue++;
2292         ExecutorPool* iom = ExecutorPool::get();
2293         ExTask task = new VKeyStatBGFetchTask(&engine, key, vbucket,
2294                                            v->getBySeqno(), cookie,
2295                                            bgFetchDelay, false);
2296         iom->schedule(task, READER_TASK_IDX);
2297         return ENGINE_EWOULDBLOCK;
2298     } else {
2299         if (eviction_policy == VALUE_ONLY) {
2300             return ENGINE_KEY_ENOENT;
2301         } else {
2302             add_type_t rv = vb->ht.unlocked_addTempItem(bucket_num, key,
2303                                                         eviction_policy);
2304             switch(rv) {
2305             case ADD_NOMEM:
2306                 return ENGINE_ENOMEM;
2307             case ADD_EXISTS:
2308             case ADD_UNDEL:
2309             case ADD_SUCCESS:
2310             case ADD_TMP_AND_BG_FETCH:
2311                 // Since the hashtable bucket is locked, we shouldn't get here
2312                 throw std::logic_error("EventuallyPersistentStore::statsVKey: "
2313                         "Invalid result from unlocked_addTempItem (" +
2314                         std::to_string(rv) + ")");
2315
2316             case ADD_BG_FETCH:
2317                 {
2318                     ++bgFetchQueue;
2319                     ExecutorPool* iom = ExecutorPool::get();
2320                     ExTask task = new VKeyStatBGFetchTask(&engine, key,
2321                                                           vbucket, -1, cookie,
2322                                                           bgFetchDelay, false);
2323                     iom->schedule(task, READER_TASK_IDX);
2324                 }
2325             }
2326             return ENGINE_EWOULDBLOCK;
2327         }
2328     }
2329 }
2330
2331 void EventuallyPersistentStore::completeStatsVKey(const void* cookie,
2332                                                   std::string &key,
2333                                                   uint16_t vbid,
2334                                                   uint64_t bySeqNum) {
2335     RememberingCallback<GetValue> gcb;
2336
2337     getROUnderlying(vbid)->get(key, vbid, gcb);
2338     gcb.waitForValue();
2339
2340     if (eviction_policy == FULL_EVICTION) {
2341         RCPtr<VBucket> vb = getVBucket(vbid);
2342         if (vb) {
2343             int bucket_num(0);
2344             LockHolder hlh = vb->ht.getLockedBucket(key, &bucket_num);
2345             StoredValue *v = fetchValidValue(vb, key, bucket_num, true);
2346             if (v && v->isTempInitialItem()) {
2347                 if (gcb.val.getStatus() == ENGINE_SUCCESS) {
2348                     v->unlocked_restoreValue(gcb.val.getValue(), vb->ht);
2349                     if (!v->isResident()) {
2350                         throw std::logic_error("EPStore::completeStatsVKey: "
2351                             "storedvalue (which has key " + v->getKey() +
2352                             ") should be resident after calling restoreValue()");
2353                     }
2354                 } else if (gcb.val.getStatus() == ENGINE_KEY_ENOENT) {
2355                     v->setNonExistent();
2356                 } else {
2357                     // underlying kvstore couldn't fetch requested data
2358                     // log returned error and notify TMPFAIL to client
2359                     LOG(EXTENSION_LOG_WARNING,
2360                         "Failed background fetch for vb=%d "
2361                         "seq=%" PRId64 " key=%s", vbid, v->getBySeqno(),
2362                         key.c_str());
2363                 }
2364             }
2365         }
2366     }
2367
2368     if (gcb.val.getStatus() == ENGINE_SUCCESS) {
2369         engine.addLookupResult(cookie, gcb.val.getValue());
2370     } else {
2371         engine.addLookupResult(cookie, NULL);
2372     }
2373
2374     bgFetchQueue--;
2375     engine.notifyIOComplete(cookie, ENGINE_SUCCESS);
2376 }
2377
2378 GetValue EventuallyPersistentStore::getLocked(const std::string &key,
2379                                               uint16_t vbucket,
2380                                               rel_time_t currentTime,
2381                                               uint32_t lockTimeout,
2382                                               const void *cookie) {
2383     RCPtr<VBucket> vb = getVBucket(vbucket);
2384     if (!vb || vb->getState() != vbucket_state_active) {
2385         ++stats.numNotMyVBuckets;
2386         return GetValue(NULL, ENGINE_NOT_MY_VBUCKET);
2387     }
2388
2389     int bucket_num(0);
2390     LockHolder lh = vb->ht.getLockedBucket(key, &bucket_num);
2391     StoredValue *v = fetchValidValue(vb, key, bucket_num, true);
2392
2393     if (v) {
2394         if (v->isDeleted() || v->isTempNonExistentItem() ||
2395             v->isTempDeletedItem()) {
2396             return GetValue(NULL, ENGINE_KEY_ENOENT);
2397         }
2398
2399         // if v is locked return error
2400         if (v->isLocked(currentTime)) {
2401             return GetValue(NULL, ENGINE_TMPFAIL);
2402         }
2403
2404         // If the value is not resident, wait for it...
2405         if (!v->isResident()) {
2406             if (cookie) {
2407                 bgFetch(key, vbucket, cookie);
2408             }
2409             return GetValue(NULL, ENGINE_EWOULDBLOCK, -1, true);
2410         }
2411
2412         // acquire lock and increment cas value
2413         v->lock(currentTime + lockTimeout);
2414
2415         Item *it = v->toItem(false, vbucket);
2416         it->setCas(vb->nextHLCCas());
2417         v->setCas(it->getCas());
2418
2419         return GetValue(it);
2420
2421     } else {
2422         // No value found in the hashtable.
2423         switch (eviction_policy) {
2424         case VALUE_ONLY:
2425             return GetValue(NULL, ENGINE_KEY_ENOENT);
2426
2427         case FULL_EVICTION:
2428             if (vb->maybeKeyExistsInFilter(key)) {
2429                 ENGINE_ERROR_CODE ec = addTempItemForBgFetch(lh, bucket_num,
2430                                                              key, vb, cookie,
2431                                                              false);
2432                 return GetValue(NULL, ec, -1, true);
2433             } else {
2434                 // As bloomfilter predicted that item surely doesn't exist
2435                 // on disk, return ENOENT for getLocked().
2436                 return GetValue(NULL, ENGINE_KEY_ENOENT);
2437             }
2438         default:
2439             throw std::logic_error("Unknown eviction policy");
2440         }
2441     }
2442 }
2443
2444 ENGINE_ERROR_CODE
2445 EventuallyPersistentStore::unlockKey(const std::string &key,
2446                                      uint16_t vbucket,
2447                                      uint64_t cas,
2448                                      rel_time_t currentTime)
2449 {
2450
2451     RCPtr<VBucket> vb = getVBucket(vbucket);
2452     if (!vb || vb->getState() != vbucket_state_active) {
2453         ++stats.numNotMyVBuckets;
2454         return ENGINE_NOT_MY_VBUCKET;
2455     }
2456
2457     int bucket_num(0);
2458     LockHolder lh = vb->ht.getLockedBucket(key, &bucket_num);
2459     StoredValue *v = fetchValidValue(vb, key, bucket_num, true);
2460
2461     if (v) {
2462         if (v->isDeleted() || v->isTempNonExistentItem() ||
2463             v->isTempDeletedItem()) {
2464             return ENGINE_KEY_ENOENT;
2465         }
2466         if (v->isLocked(currentTime)) {
2467             if (v->getCas() == cas) {
2468                 v->unlock();
2469                 return ENGINE_SUCCESS;
2470             }
2471         }
2472         return ENGINE_TMPFAIL;
2473     } else {
2474         if (eviction_policy == VALUE_ONLY) {
2475             return ENGINE_KEY_ENOENT;
2476         } else {
2477             // With the full eviction, an item's lock is automatically
2478             // released when the item is evicted from memory. Therefore,
2479             // we simply return ENGINE_TMPFAIL when we receive unlockKey
2480             // for an item that is not in memocy cache. Note that we don't
2481             // spawn any bg fetch job to figure out if an item actually
2482             // exists in disk or not.
2483             return ENGINE_TMPFAIL;
2484         }
2485     }
2486 }
2487
2488
2489 ENGINE_ERROR_CODE EventuallyPersistentStore::getKeyStats(
2490                                             const std::string &key,
2491                                             uint16_t vbucket,
2492                                             const void *cookie,
2493                                             struct key_stats &kstats,
2494                                             bool bgfetch,
2495                                             bool wantsDeleted)
2496 {
2497     RCPtr<VBucket> vb = getVBucket(vbucket);
2498     if (!vb) {
2499         return ENGINE_NOT_MY_VBUCKET;
2500     }
2501
2502     int bucket_num(0);
2503     LockHolder lh = vb->ht.getLockedBucket(key, &bucket_num);
2504     StoredValue *v = fetchValidValue(vb, key, bucket_num, true);
2505
2506     if (v) {
2507         if ((v->isDeleted() && !wantsDeleted) ||
2508             v->isTempNonExistentItem() || v->isTempDeletedItem()) {
2509             return ENGINE_KEY_ENOENT;
2510         }
2511         if (eviction_policy == FULL_EVICTION &&
2512             v->isTempInitialItem() && bgfetch) {
2513             lh.unlock();
2514             bgFetch(key, vbucket, cookie, true);
2515             return ENGINE_EWOULDBLOCK;
2516         }
2517         kstats.logically_deleted = v->isDeleted();
2518         kstats.dirty = v->isDirty();
2519         kstats.exptime = v->getExptime();
2520         kstats.flags = v->getFlags();
2521         kstats.cas = v->getCas();
2522         kstats.vb_state = vb->getState();
2523         return ENGINE_SUCCESS;
2524     } else {
2525         if (eviction_policy == VALUE_ONLY) {
2526             return ENGINE_KEY_ENOENT;
2527         } else {
2528             if (bgfetch && vb->maybeKeyExistsInFilter(key)) {
2529                 return addTempItemForBgFetch(lh, bucket_num, key, vb,
2530                                              cookie, true);
2531             } else {
2532                 // If bgFetch were false, or bloomfilter predicted that
2533                 // item surely doesn't exist on disk, return ENOENT for
2534                 // getKeyStats().
2535                 return ENGINE_KEY_ENOENT;
2536             }
2537         }
2538     }
2539 }
2540
2541 std::string EventuallyPersistentStore::validateKey(const std::string &key,
2542                                                    uint16_t vbucket,
2543                                                    Item &diskItem) {
2544     int bucket_num(0);
2545     RCPtr<VBucket> vb = getVBucket(vbucket);
2546     LockHolder lh = vb->ht.getLockedBucket(key, &bucket_num);
2547     StoredValue *v = fetchValidValue(vb, key, bucket_num, true,
2548                                      false, true);
2549
2550     if (v) {
2551         if (v->isDeleted() || v->isTempNonExistentItem() ||
2552             v->isTempDeletedItem()) {
2553             return "item_deleted";
2554         }
2555
2556         if (diskItem.getFlags() != v->getFlags()) {
2557             return "flags_mismatch";
2558         } else if (v->isResident() && memcmp(diskItem.getData(),
2559                                              v->getValue()->getData(),
2560                                              diskItem.getNBytes())) {
2561             return "data_mismatch";
2562         } else {
2563             return "valid";
2564         }
2565     } else {
2566         return "item_deleted";
2567     }
2568
2569 }
2570
2571 ENGINE_ERROR_CODE EventuallyPersistentStore::deleteItem(const std::string &key,
2572                                                         uint64_t *cas,
2573                                                         uint16_t vbucket,
2574                                                         const void *cookie,
2575                                                         bool force,
2576                                                         ItemMetaData *itemMeta,
2577                                                         mutation_descr_t
2578                                                         *mutInfo)
2579 {
2580     RCPtr<VBucket> vb = getVBucket(vbucket);
2581     if (!vb || (vb->getState() == vbucket_state_dead && !force)) {
2582         ++stats.numNotMyVBuckets;
2583         return ENGINE_NOT_MY_VBUCKET;
2584     } else if (vb->getState() == vbucket_state_replica && !force) {
2585         ++stats.numNotMyVBuckets;
2586         return ENGINE_NOT_MY_VBUCKET;
2587     } else if (vb->getState() == vbucket_state_pending && !force) {
2588         if (vb->addPendingOp(cookie)) {
2589             return ENGINE_EWOULDBLOCK;
2590         }
2591     } else if (vb->isTakeoverBackedUp()) {
2592         LOG(EXTENSION_LOG_DEBUG, "(vb %u) Returned TMPFAIL to a delete op"
2593                 ", becuase takeover is lagging", vb->getId());
2594         return ENGINE_TMPFAIL;
2595     }
2596
2597     int bucket_num(0);
2598     LockHolder lh = vb->ht.getLockedBucket(key, &bucket_num);
2599     StoredValue *v = vb->ht.unlocked_find(key, bucket_num, true, false);
2600     if (!v || v->isDeleted() || v->isTempItem()) {
2601         if (eviction_policy == VALUE_ONLY) {
2602             return ENGINE_KEY_ENOENT;
2603         } else { // Full eviction.
2604             if (!force) {
2605                 if (!v) { // Item might be evicted from cache.
2606                     if (vb->maybeKeyExistsInFilter(key)) {
2607                         return addTempItemForBgFetch(lh, bucket_num, key, vb,
2608                                                      cookie, true);
2609                     } else {
2610                         // As bloomfilter predicted that item surely doesn't
2611                         // exist on disk, return ENOENT for deleteItem().
2612                         return ENGINE_KEY_ENOENT;
2613                     }
2614                 } else if (v->isTempInitialItem()) {
2615                     lh.unlock();
2616                     bgFetch(key, vbucket, cookie, true);
2617                     return ENGINE_EWOULDBLOCK;
2618                 } else { // Non-existent or deleted key.
2619                     if (v->isTempNonExistentItem() || v->isTempDeletedItem()) {
2620                         // Delete a temp non-existent item to ensure that
2621                         // if a delete were issued over an item that doesn't
2622                         // exist, then we don't preserve a temp item.
2623                         vb->ht.unlocked_del(key, bucket_num);
2624                     }
2625                     return ENGINE_KEY_ENOENT;
2626                 }
2627             } else {
2628                 if (!v) { // Item might be evicted from cache.
2629                     // Create a temp item and delete it below as it is a
2630                     // force deletion, only if bloomfilter predicts that
2631                     // item may exist on disk.
2632                     if (vb->maybeKeyExistsInFilter(key)) {
2633                         add_type_t rv = vb->ht.unlocked_addTempItem(
2634                                                                bucket_num,
2635                                                                key,
2636                                                                eviction_policy);
2637                         if (rv == ADD_NOMEM) {
2638                             return ENGINE_ENOMEM;
2639                         }
2640                         v = vb->ht.unlocked_find(key, bucket_num, true, false);
2641                         v->setDeleted();
2642                     } else {
2643                         return ENGINE_KEY_ENOENT;
2644                     }
2645                 } else if (v->isTempInitialItem()) {
2646                     v->setDeleted();
2647                 } else { // Non-existent or deleted key.
2648                     if (v->isTempNonExistentItem() || v->isTempDeletedItem()) {
2649                         // Delete a temp non-existent item to ensure that
2650                         // if a delete were issued over an item that doesn't
2651                         // exist, then we don't preserve a temp item.
2652                         vb->ht.unlocked_del(key, bucket_num);
2653                     }
2654                     return ENGINE_KEY_ENOENT;
2655                 }
2656             }
2657         }
2658     }
2659
2660     if (v && v->isLocked(ep_current_time()) &&
2661         (vb->getState() == vbucket_state_replica ||
2662          vb->getState() == vbucket_state_pending)) {
2663         v->unlock();
2664     }
2665     mutation_type_t delrv;
2666     delrv = vb->ht.unlocked_softDelete(v, *cas, eviction_policy);
2667     if (v && (delrv == NOT_FOUND || delrv == WAS_DIRTY || delrv == WAS_CLEAN)) {
2668         if (itemMeta != nullptr) {
2669             itemMeta->revSeqno = v->getRevSeqno();
2670             itemMeta->cas = v->getCas();
2671             itemMeta->flags = v->getFlags();
2672             itemMeta->exptime = v->getExptime();
2673         }
2674     }
2675
2676     uint64_t seqno = 0;
2677     ENGINE_ERROR_CODE ret = ENGINE_SUCCESS;
2678     switch (delrv) {
2679     case NOMEM:
2680         ret = ENGINE_ENOMEM;
2681         break;
2682     case INVALID_VBUCKET:
2683         ret = ENGINE_NOT_MY_VBUCKET;
2684         break;
2685     case INVALID_CAS:
2686         ret = ENGINE_KEY_EEXISTS;
2687         break;
2688     case IS_LOCKED:
2689         ret = ENGINE_TMPFAIL;
2690         break;
2691     case NOT_FOUND:
2692         ret = ENGINE_KEY_ENOENT;
2693     case WAS_CLEAN:
2694     case WAS_DIRTY:
2695         if (v) {
2696             // Keep lh as we need to do v->getCas
2697             queueDirty(vb, v, nullptr, &seqno);
2698             *cas = v->getCas();
2699         }
2700
2701         if (delrv != NOT_FOUND) {
2702             mutInfo->seqno = seqno;
2703             mutInfo->vbucket_uuid = vb->failovers->getLatestUUID();
2704             if (itemMeta != nullptr) {
2705                 itemMeta->cas = v->getCas();
2706             }
2707         }
2708         break;
2709     case NEED_BG_FETCH:
2710         // We already figured out if a bg fetch is requred for a full-evicted
2711         // item above.
2712         throw std::logic_error("EventuallyPersistentStore::deleteItem: "
2713                 "Unexpected NEEDS_BG_FETCH from unlocked_softDelete");
2714     }
2715     return ret;
2716 }
2717
2718 ENGINE_ERROR_CODE EventuallyPersistentStore::deleteWithMeta(
2719                                                      const std::string &key,
2720                                                      uint64_t *cas,
2721                                                      uint64_t *seqno,
2722                                                      uint16_t vbucket,
2723                                                      const void *cookie,
2724                                                      bool force,
2725                                                      ItemMetaData *itemMeta,
2726                                                      bool tapBackfill,
2727                                                      GenerateBySeqno genBySeqno,
2728                                                      GenerateCas generateCas,
2729                                                      uint64_t bySeqno,
2730                                                      ExtendedMetaData *emd,
2731                                                      bool isReplication)
2732 {
2733     RCPtr<VBucket> vb = getVBucket(vbucket);
2734
2735     if (!vb) {
2736         ++stats.numNotMyVBuckets;
2737         return ENGINE_NOT_MY_VBUCKET;
2738     }
2739
2740     ReaderLockHolder rlh(vb->getStateLock());
2741     if (vb->getState() == vbucket_state_dead) {
2742         ++stats.numNotMyVBuckets;
2743         return ENGINE_NOT_MY_VBUCKET;
2744     } else if (vb->getState() == vbucket_state_replica && !force) {
2745         ++stats.numNotMyVBuckets;
2746         return ENGINE_NOT_MY_VBUCKET;
2747     } else if (vb->getState() == vbucket_state_pending && !force) {
2748         if (vb->addPendingOp(cookie)) {
2749             return ENGINE_EWOULDBLOCK;
2750         }
2751     } else if (vb->isTakeoverBackedUp()) {
2752         LOG(EXTENSION_LOG_DEBUG, "(vb %u) Returned TMPFAIL to a deleteWithMeta "
2753                 "op, because takeover is lagging", vb->getId());
2754         return ENGINE_TMPFAIL;
2755     }
2756
2757     //check for the incoming item's CAS validity
2758     if (!Item::isValidCas(itemMeta->cas)) {
2759         return ENGINE_KEY_EEXISTS;
2760     }
2761
2762     int bucket_num(0);
2763     LockHolder lh = vb->ht.getLockedBucket(key, &bucket_num);
2764     StoredValue *v = vb->ht.unlocked_find(key, bucket_num, true, false);
2765     if (!force) { // Need conflict resolution.
2766         if (v)  {
2767             if (v->isTempInitialItem()) {
2768                 bgFetch(key, vbucket, cookie, true);
2769                 return ENGINE_EWOULDBLOCK;
2770             }
2771
2772             if (!conflictResolver->resolve(*v, *itemMeta, true)) {
2773                 ++stats.numOpsDelMetaResolutionFailed;
2774                 return ENGINE_KEY_EEXISTS;
2775             }
2776         } else {
2777             // Item is 1) deleted or not existent in the value eviction case OR
2778             // 2) deleted or evicted in the full eviction.
2779             if (vb->maybeKeyExistsInFilter(key)) {
2780                 return addTempItemForBgFetch(lh, bucket_num, key, vb,
2781                                              cookie, true, isReplication);
2782             } else {
2783                 // Even though bloomfilter predicted that item doesn't exist
2784                 // on disk, we must put this delete on disk if the cas is valid.
2785                 add_type_t rv = vb->ht.unlocked_addTempItem(bucket_num, key,
2786                                                             eviction_policy,
2787                                                             isReplication);
2788                 if (rv == ADD_NOMEM) {
2789                     return ENGINE_ENOMEM;
2790                 }
2791                 v = vb->ht.unlocked_find(key, bucket_num, true, false);
2792                 v->setDeleted();
2793             }
2794         }
2795     } else {
2796         if (!v) {
2797             // We should always try to persist a delete here.
2798             add_type_t rv = vb->ht.unlocked_addTempItem(bucket_num, key,
2799                                                         eviction_policy,
2800                                                         isReplication);
2801             if (rv == ADD_NOMEM) {
2802                 return ENGINE_ENOMEM;
2803             }
2804             v = vb->ht.unlocked_find(key, bucket_num, true, false);
2805             v->setDeleted();
2806             v->setCas(*cas);
2807         } else if (v->isTempInitialItem()) {
2808             v->setDeleted();
2809             v->setCas(*cas);
2810         }
2811     }
2812
2813     if (v && v->isLocked(ep_current_time()) &&
2814         (vb->getState() == vbucket_state_replica ||
2815          vb->getState() == vbucket_state_pending)) {
2816         v->unlock();
2817     }
2818     mutation_type_t delrv;
2819     delrv = vb->ht.unlocked_softDelete(v, *cas, *itemMeta,
2820                                        eviction_policy, true);
2821     *cas = v ? v->getCas() : 0;
2822
2823     ENGINE_ERROR_CODE ret = ENGINE_SUCCESS;
2824     switch (delrv) {
2825     case NOMEM:
2826         ret = ENGINE_ENOMEM;
2827         break;
2828     case INVALID_VBUCKET:
2829         ret = ENGINE_NOT_MY_VBUCKET;
2830         break;
2831     case INVALID_CAS:
2832         ret = ENGINE_KEY_EEXISTS;
2833         break;
2834     case IS_LOCKED:
2835         ret = ENGINE_TMPFAIL;
2836         break;
2837     case NOT_FOUND:
2838         ret = ENGINE_KEY_ENOENT;
2839         break;
2840     case WAS_DIRTY:
2841     case WAS_CLEAN:
2842         if (genBySeqno == GenerateBySeqno::No) {
2843             v->setBySeqno(bySeqno);
2844         }
2845
2846         vb->setMaxCasAndTrackDrift(v->getCas());
2847
2848         if (tapBackfill) {
2849             tapQueueDirty(*vb, v, lh, seqno, genBySeqno);
2850         } else {
2851             queueDirty(vb, v, &lh, seqno, genBySeqno, generateCas);
2852         }
2853         break;
2854     case NEED_BG_FETCH:
2855         lh.unlock();
2856         bgFetch(key, vbucket, cookie, true);
2857         ret = ENGINE_EWOULDBLOCK;
2858     }
2859
2860     return ret;
2861 }
2862
2863 void EventuallyPersistentStore::reset() {
2864     auto buckets = vbMap.getBuckets();
2865     for (auto vbid : buckets) {
2866         RCPtr<VBucket> vb = getVBucket(vbid);
2867         if (vb) {
2868             LockHolder lh(vb_mutexes[vb->getId()]);
2869             vb->ht.clear();
2870             vb->checkpointManager.clear(vb->getState());
2871             vb->resetStats();
2872             vb->setPersistedSnapshot(0, 0);
2873         }
2874     }
2875
2876     ++stats.diskQueueSize;
2877     bool inverse = true;
2878     flushAllTaskCtx.delayFlushAll.compare_exchange_strong(inverse, false);
2879     // Waking up (notifying) one flusher is good enough for diskFlushAll
2880     vbMap.shards[EP_PRIMARY_SHARD]->getFlusher()->notifyFlushEvent();
2881 }
2882
2883 /**
2884  * Callback invoked after persisting an item from memory to disk.
2885  *
2886  * This class exists to create a closure around a few variables within
2887  * EventuallyPersistentStore::flushOne so that an object can be
2888  * requeued in case of failure to store in the underlying layer.
2889  */
2890 class PersistenceCallback : public Callback<mutation_result>,
2891                             public Callback<int> {
2892 public:
2893
2894     PersistenceCallback(const queued_item &qi, RCPtr<VBucket> &vb,
2895                         EventuallyPersistentStore& st, EPStats& s, uint64_t c)
2896         : queuedItem(qi), vbucket(vb), store(st), stats(s), cas(c) {
2897         if (!vb) {
2898             throw std::invalid_argument("PersistenceCallback(): vb is NULL");
2899         }
2900     }
2901
2902     // This callback is invoked for set only.
2903     void callback(mutation_result &value) {
2904         if (value.first == 1) {
2905             int bucket_num(0);
2906             LockHolder lh = vbucket->ht.getLockedBucket(queuedItem->getKey(),
2907                                                         &bucket_num);
2908             StoredValue *v = store.fetchValidValue(vbucket,
2909                                                    queuedItem->getKey(),
2910                                                    bucket_num, true, false);
2911             if (v) {
2912                 if (v->getCas() == cas) {
2913                     // mark this item clean only if current and stored cas
2914                     // value match
2915                     v->markClean();
2916                 }
2917                 if (v->isNewCacheItem()) {
2918                     if (value.second) {
2919                         // Insert in value-only or full eviction mode.
2920                         ++vbucket->opsCreate;
2921                         vbucket->incrMetaDataDisk(*queuedItem);
2922                     } else { // Update in full eviction mode.
2923                         vbucket->ht.decrNumTotalItems();
2924                         ++vbucket->opsUpdate;
2925                     }
2926                     v->setNewCacheItem(false);
2927                 } else { // Update in value-only or full eviction mode.
2928                     ++vbucket->opsUpdate;
2929                 }
2930             }
2931
2932             vbucket->doStatsForFlushing(*queuedItem, queuedItem->size());
2933             stats.decrDiskQueueSize(1);
2934             stats.totalPersisted++;
2935         } else {
2936             // If the return was 0 here, we're in a bad state because
2937             // we do not know the rowid of this object.
2938             if (value.first == 0) {
2939                 int bucket_num(0);
2940                 LockHolder lh = vbucket->ht.getLockedBucket(
2941                                            queuedItem->getKey(), &bucket_num);
2942                 StoredValue *v = store.fetchValidValue(vbucket,
2943                                                        queuedItem->getKey(),
2944                                                        bucket_num, true,
2945                                                        false);
2946                 if (v) {
2947                     std::stringstream ss;
2948                     ss << "Persisting ``" << queuedItem->getKey() << "'' on vb"
2949                        << queuedItem->getVBucketId() << " (rowid="
2950                        << v->getBySeqno() << ") returned 0 updates\n";
2951                     LOG(EXTENSION_LOG_WARNING, "%s", ss.str().c_str());
2952                 } else {
2953                     LOG(EXTENSION_LOG_WARNING,
2954                         "Error persisting now missing ``%s'' from vb%d",
2955                         queuedItem->getKey().c_str(),
2956                         queuedItem->getVBucketId());
2957                 }
2958
2959                 vbucket->doStatsForFlushing(*queuedItem, queuedItem->size());
2960                 stats.decrDiskQueueSize(1);
2961             } else {
2962                 std::stringstream ss;
2963                 ss <<
2964                 "Fatal error in persisting SET ``" <<
2965                 queuedItem->getKey() << "'' on vb "
2966                    << queuedItem->getVBucketId() << "!!! Requeue it...\n";
2967                 LOG(EXTENSION_LOG_WARNING, "%s", ss.str().c_str());
2968                 redirty();
2969             }
2970         }
2971     }
2972
2973     // This callback is invoked for deletions only.
2974     //
    // The integer reports the outcome from the underlying storage:
    // 1 if one row was deleted, 0 if no row existed, -1 on failure.
2977     void callback(int &value) {
2978         // > 1 would be bad.  We were only trying to delete one row.
2979         if (value > 1) {
2980             throw std::logic_error("PersistenceCallback::callback: value "
2981                     "(which is " + std::to_string(value) +
2982                     ") should be <= 1 for deletions");
2983         }
2984         // -1 means fail
2985         // 1 means we deleted one row
2986         // 0 means we did not delete a row, but did not fail (did not exist)
2987         if (value >= 0) {
2988             // We have successfully removed an item from the disk, we
2989             // may now remove it from the hash table.
2990             int bucket_num(0);
2991             LockHolder lh = vbucket->ht.getLockedBucket(queuedItem->getKey(),
2992                                                         &bucket_num);
2993             StoredValue *v = store.fetchValidValue(vbucket,
2994                                                    queuedItem->getKey(),
2995                                                    bucket_num, true, false);
2996             // Delete the item in the hash table iff:
2997             //  1. Item is existent in hashtable, and deleted flag is true
2998             //  2. rev seqno of queued item matches rev seqno of hash table item
2999             if (v && v->isDeleted() &&
3000                 (queuedItem->getRevSeqno() == v->getRevSeqno())) {
3001                 bool newCacheItem = v->isNewCacheItem();
3002                 bool deleted = vbucket->ht.unlocked_del(queuedItem->getKey(),
3003                                                         bucket_num);
3004                 if (!deleted) {
3005                     throw std::logic_error("PersistenceCallback:callback: "
3006                             "Failed to delete key '" + queuedItem->getKey() +
3007                             "' from bucket " + std::to_string(bucket_num));
3008                 }
3009                 if (newCacheItem && value > 0) {
3010                     // Need to decrement the item counter again for an item that
3011                     // exists on DB file, but not in memory (i.e., full eviction),
3012                     // because we created the temp item in memory and incremented
3013                     // the item counter when a deletion is pushed in the queue.
3014                     vbucket->ht.decrNumTotalItems();
3015                 }
3016
3017                 /**
3018                  * Deleted items are to be added to the bloomfilter,
3019                  * in either eviction policy.
3020                  */
3021                 vbucket->addToFilter(queuedItem->getKey());
3022             }
3023
3024             if (value > 0) {
3025                 ++stats.totalPersisted;
3026                 ++vbucket->opsDelete;
3027             }
3028             vbucket->doStatsForFlushing(*queuedItem, queuedItem->size());
3029             stats.decrDiskQueueSize(1);
3030             vbucket->decrMetaDataDisk(*queuedItem);
3031         } else {
3032             std::stringstream ss;
3033             ss << "Fatal error in persisting DELETE ``" <<
3034             queuedItem->getKey() << "'' on vb "
3035                << queuedItem->getVBucketId() << "!!! Requeue it...\n";
3036             LOG(EXTENSION_LOG_WARNING, "%s", ss.str().c_str());
3037             redirty();
3038         }
3039     }
3040
3041 private:
3042
3043     void redirty() {
3044         if (store.vbMap.isBucketDeletion(vbucket->getId())) {
3045             vbucket->doStatsForFlushing(*queuedItem, queuedItem->size());
3046             stats.decrDiskQueueSize(1);
3047             return;
3048         }
3049         ++stats.flushFailed;
3050         store.invokeOnLockedStoredValue(queuedItem->getKey(),
3051                                          queuedItem->getVBucketId(),
3052                                          &StoredValue::reDirty);
3053         vbucket->rejectQueue.push(queuedItem);
3054         ++vbucket->opsReject;
3055     }
3056
3057     const queued_item queuedItem;
3058     RCPtr<VBucket> vbucket;
3059     EventuallyPersistentStore& store;
3060     EPStats& stats;
3061     uint64_t cas;
3062     DISALLOW_COPY_AND_ASSIGN(PersistenceCallback);
3063 };
3064
3065 bool EventuallyPersistentStore::scheduleFlushAllTask(const void* cookie,
3066                                                      time_t when) {
3067     bool inverse = false;
3068     if (diskFlushAll.compare_exchange_strong(inverse, true)) {
3069         flushAllTaskCtx.cookie = cookie;
3070         flushAllTaskCtx.delayFlushAll.compare_exchange_strong(inverse, true);
3071         ExTask task = new FlushAllTask(&engine, static_cast<double>(when));
3072         ExecutorPool::get()->schedule(task, NONIO_TASK_IDX);
3073         return true;
3074     } else {
3075         return false;
3076     }
3077 }
3078
3079 void EventuallyPersistentStore::setFlushAllComplete() {
3080     // Notify memcached about flushAll task completion, and
3081     // set diskFlushall flag to false
3082     if (flushAllTaskCtx.cookie) {
3083         engine.notifyIOComplete(flushAllTaskCtx.cookie, ENGINE_SUCCESS);
3084     }
3085     bool inverse = false;
3086     flushAllTaskCtx.delayFlushAll.compare_exchange_strong(inverse, true);
3087     inverse = true;
3088     diskFlushAll.compare_exchange_strong(inverse, false);
3089 }
3090
3091 void EventuallyPersistentStore::flushOneDeleteAll() {
3092     for (VBucketMap::id_type i = 0; i < vbMap.getSize(); ++i) {
3093         RCPtr<VBucket> vb = getVBucket(i);
3094         // Reset the vBucket if it's non-null and not already in the middle of
3095         // being created / destroyed.
3096         if (vb &&
3097             !(vbMap.isBucketCreation(i) || vbMap.isBucketDeletion(i))) {
3098             LockHolder lh(vb_mutexes[vb->getId()]);
3099             getRWUnderlying(vb->getId())->reset(i);
3100         }
3101     }
3102
3103     stats.decrDiskQueueSize(1);
3104     setFlushAllComplete();
3105 }
3106
3107 int EventuallyPersistentStore::flushVBucket(uint16_t vbid) {
3108     KVShard *shard = vbMap.getShardByVbId(vbid);
3109     if (diskFlushAll && !flushAllTaskCtx.delayFlushAll) {
3110         if (shard->getId() == EP_PRIMARY_SHARD) {
3111             flushOneDeleteAll();
3112         } else {
3113             // disk flush is pending just return
3114             return 0;
3115         }
3116     }
3117
3118     int items_flushed = 0;
3119     const rel_time_t flush_start = ep_current_time();
3120
3121     RCPtr<VBucket> vb = vbMap.getBucket(vbid);
3122     if (vb) {
3123         LockHolder lh(vb_mutexes[vbid], true /*tryLock*/);
3124         if (!lh.islocked()) { // Try another bucket if this one is locked
3125             return RETRY_FLUSH_VBUCKET; // to avoid blocking flusher
3126         }
3127
3128         std::vector<queued_item> items;
3129         KVStore *rwUnderlying = getRWUnderlying(vbid);
3130
3131         while (!vb->rejectQueue.empty()) {
3132             items.push_back(vb->rejectQueue.front());
3133             vb->rejectQueue.pop();
3134         }
3135
3136         // Append any 'backfill' items (mutations added by a TAP stream).
3137         vb->getBackfillItems(items);
3138
3139         // Append all items outstanding for the persistence cursor.
3140         snapshot_range_t range;
3141         range = vb->checkpointManager.getAllItemsForCursor(
3142                 CheckpointManager::pCursorName, items);
3143
3144         if (!items.empty()) {
3145             while (!rwUnderlying->begin()) {
3146                 ++stats.beginFailed;
3147                 LOG(EXTENSION_LOG_WARNING, "Failed to start a transaction!!! "
3148                     "Retry in 1 sec ...");
3149                 sleep(1);
3150             }
3151             rwUnderlying->optimizeWrites(items);
3152
3153             Item *prev = NULL;
3154             auto vbstate = vb->getVBucketState();
3155             uint64_t maxSeqno = 0;
3156             range.start = std::max(range.start, vbstate.lastSnapStart);
3157
3158             bool mustCheckpointVBState = false;
3159             std::list<PersistenceCallback*>& pcbs = rwUnderlying->getPersistenceCbList();
3160
3161             for (const auto& item : items) {
3162
3163                 if (!item->shouldPersist()) {
3164                     continue;
3165                 }
3166
3167                 if (item->getOperation() == queue_op::set_vbucket_state) {
3168                     // No actual item explicitly persisted to (this op exists
3169                     // to ensure a commit occurs with the current vbstate);
3170                     // flag that we must trigger a snapshot even if there are
3171                     // no 'real' items in the checkpoint.
3172                     mustCheckpointVBState = true;
3173
3174                     // Update queuing stats how this item has logically been
3175                     // processed.
3176                     stats.decrDiskQueueSize(1);
3177                     vb->doStatsForFlushing(*item, item->size());
3178
3179                 } else if (!prev || prev->getKey() != item->getKey()) {
3180                     prev = item.get();
3181                     ++items_flushed;
3182                     PersistenceCallback *cb = flushOneDelOrSet(item, vb);
3183                     if (cb) {
3184                         pcbs.push_back(cb);
3185                     }
3186
3187                     maxSeqno = std::max(maxSeqno, (uint64_t)item->getBySeqno());
3188                     vbstate.maxCas = std::max(vbstate.maxCas, item->getCas());
3189                     if (item->isDeleted()) {
3190                         vbstate.maxDeletedSeqno =
3191                                 std::max(vbstate.maxDeletedSeqno,
3192                                          item->getRevSeqno());
3193                     }
3194                     ++stats.flusher_todo;
3195
3196                 } else {
3197                     // Item is the same key as the previous[1] one - don't need
3198                     // to flush to disk.
3199                     // [1] Previous here really means 'next' - optimizeWrites()
3200                     //     above has actually re-ordered items such that items
3201                     //     with the same key are ordered from high->low seqno.
3202                     //     This means we only write the highest (i.e. newest)
3203                     //     item for a given key, and discard any duplicate,
3204                     //     older items.
3205                     stats.decrDiskQueueSize(1);
3206                     vb->doStatsForFlushing(*item, item->size());
3207                 }
3208             }
3209
3210
3211             {
3212                 ReaderLockHolder rlh(vb->getStateLock());
3213                 if (vb->getState() == vbucket_state_active) {
3214                     if (maxSeqno) {
3215                         range.start = maxSeqno;
3216                         range.end = maxSeqno;
3217                     }
3218                 }
3219
3220                 // Update VBstate based on the changes we have just made,
3221                 // then tell the rwUnderlying the 'new' state
3222                 // (which will persisted as part of the commit() below).
3223                 vbstate.lastSnapStart = range.start;
3224                 vbstate.lastSnapEnd = range.end;
3225
3226                 // Do we need to trigger a persist of the state?
3227                 // If there are no "real" items to flush, and we encountered
3228                 // a set_vbucket_state meta-item.
3229                 const bool persist = (items_flushed == 0) && mustCheckpointVBState;
3230
3231                 KVStatsCallback kvcb(this);
3232                 if (rwUnderlying->snapshotVBucket(vb->getId(), vbstate,
3233                                                   &kvcb, persist) != true) {
3234                     return RETRY_FLUSH_VBUCKET;
3235                 }
3236
3237                 if (vbMap.setBucketCreation(vbid, false)) {
3238                     LOG(EXTENSION_LOG_INFO, "VBucket %" PRIu16 " created", vbid);
3239                 }
3240             }
3241
3242             // Commit all mutations to disk if there is a non-zero number
3243             // of items to flush, and the commit interval is zero.
3244             if ((items_flushed > 0) &&
3245                 (decrCommitInterval(shard->getId()) == 0)) {
3246
3247                 commit(shard->getId());
3248
3249                 // Now the commit is complete, vBucket file must exist.
3250                 if (vbMap.setBucketCreation(vbid, false)) {
3251                     LOG(EXTENSION_LOG_INFO, "VBucket %" PRIu16 " created", vbid);
3252                 }
3253             }
3254
3255             hrtime_t end = gethrtime();
3256             uint64_t trans_time = (end - flush_start) / 1000000;
3257
3258             lastTransTimePerItem.store((items_flushed == 0) ? 0 :
3259                                        static_cast<double>(trans_time) /
3260                                        static_cast<double>(items_flushed));
3261             stats.cumulativeFlushTime.fetch_add(ep_current_time()
3262                                                 - flush_start);
3263             stats.flusher_todo.store(0);
3264             stats.totalPersistVBState++;
3265
3266             if (vb->rejectQueue.empty()) {
3267                 vb->setPersistedSnapshot(range.start, range.end);
3268                 uint64_t highSeqno = rwUnderlying->getLastPersistedSeqno(vbid);
3269                 if (highSeqno > 0 &&
3270                     highSeqno != vbMap.getPersistenceSeqno(vbid)) {
3271                     vbMap.setPersistenceSeqno(vbid, highSeqno);
3272                 }
3273             }
3274         }
3275
3276         rwUnderlying->pendingTasks();
3277
3278         if (vb->checkpointManager.getNumCheckpoints() > 1) {
3279             wakeUpCheckpointRemover();
3280         }
3281
3282         if (vb->rejectQueue.empty()) {
3283             vb->checkpointManager.itemsPersisted();
3284             uint64_t seqno = vbMap.getPersistenceSeqno(vbid);
3285             uint64_t chkid = vb->checkpointManager.getPersistenceCursorPreChkId();
3286             vb->notifyOnPersistence(engine, seqno, true);
3287             vb->notifyOnPersistence(engine, chkid, false);
3288             if (chkid > 0 && chkid != vbMap.getPersistenceCheckpointId(vbid)) {
3289                 vbMap.setPersistenceCheckpointId(vbid, chkid);
3290             }
3291         } else {
3292             return RETRY_FLUSH_VBUCKET;
3293         }
3294     }
3295
3296     return items_flushed;
3297 }
3298
3299 void EventuallyPersistentStore::commit(uint16_t shardId) {
3300     KVStore *rwUnderlying = getRWUnderlyingByShard(shardId);
3301     std::list<PersistenceCallback *>& pcbs = rwUnderlying->getPersistenceCbList();
3302     BlockTimer timer(&stats.diskCommitHisto, "disk_commit", stats.timingLog);
3303     hrtime_t commit_start = gethrtime();
3304
3305     KVStatsCallback cb(this);
3306     while (!rwUnderlying->commit(&cb)) {
3307         ++stats.commitFailed;
3308         LOG(EXTENSION_LOG_WARNING, "Flusher commit failed!!! Retry in "
3309             "1 sec...\n");
3310         sleep(1);
3311     }
3312
3313     while (!pcbs.empty()) {
3314          delete pcbs.front();
3315          pcbs.pop_front();
3316     }
3317
3318     ++stats.flusherCommits;
3319     hrtime_t commit_end = gethrtime();
3320     uint64_t commit_time = (commit_end - commit_start) / 1000000;
3321     stats.commit_time.store(commit_time);
3322     stats.cumulativeCommitTime.fetch_add(commit_time);
3323 }
3324
3325 PersistenceCallback*
3326 EventuallyPersistentStore::flushOneDelOrSet(const queued_item &qi,
3327                                             RCPtr<VBucket> &vb) {
3328
3329     if (!vb) {
3330         stats.decrDiskQueueSize(1);
3331         return NULL;
3332     }
3333
3334     int64_t bySeqno = qi->getBySeqno();
3335     bool deleted = qi->isDeleted();
3336     rel_time_t queued(qi->getQueuedTime());
3337
3338     int dirtyAge = ep_current_time() - queued;
3339     stats.dirtyAgeHisto.add(dirtyAge * 1000000);
3340     stats.dirtyAge.store(dirtyAge);
3341     stats.dirtyAgeHighWat.store(std::max(stats.dirtyAge.load(),
3342                                          stats.dirtyAgeHighWat.load()));
3343
3344     // Wait until the VB is deleted before writing
3345     if (vbMap.isBucketDeletion(qi->getVBucketId())) {
3346         vb->rejectQueue.push(qi);
3347         ++vb->opsReject;
3348         return NULL;
3349     }
3350
3351     KVStore *rwUnderlying = getRWUnderlying(qi->getVBucketId());
3352     if (!deleted) {
3353         // TODO: Need to separate disk_insert from disk_update because
3354         // bySeqno doesn't give us that information.
3355         BlockTimer timer(bySeqno == -1 ?
3356                          &stats.diskInsertHisto : &stats.diskUpdateHisto,
3357                          bySeqno == -1 ? "disk_insert" : "disk_update",
3358                          stats.timingLog);