09449aed632d3a9fa2638aafdcdde5cd347f2608
[ep-engine.git] / src / ep.cc
1 /* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2 /*
3  *     Copyright 2010 Couchbase, Inc
4  *
5  *   Licensed under the Apache License, Version 2.0 (the "License");
6  *   you may not use this file except in compliance with the License.
7  *   You may obtain a copy of the License at
8  *
9  *       http://www.apache.org/licenses/LICENSE-2.0
10  *
11  *   Unless required by applicable law or agreed to in writing, software
12  *   distributed under the License is distributed on an "AS IS" BASIS,
13  *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  *   See the License for the specific language governing permissions and
15  *   limitations under the License.
16  */
17
18 #include "config.h"
19
20 #include <string.h>
21 #include <time.h>
22
23 #include <fstream>
24 #include <functional>
25 #include <iostream>
26 #include <map>
27 #include <sstream>
28 #include <string>
29 #include <utility>
30 #include <vector>
31
32 #include "access_scanner.h"
33 #include "checkpoint_remover.h"
34 #include "conflict_resolution.h"
35 #include "defragmenter.h"
36 #include "ep.h"
37 #include "ep_engine.h"
38 #include "failover-table.h"
39 #include "flusher.h"
40 #include "htresizer.h"
41 #include "kvshard.h"
42 #include "kvstore.h"
43 #include "locks.h"
44 #include "mutation_log.h"
45 #include "warmup.h"
46 #include "connmap.h"
47 #include "tapthrottle.h"
48
49 class StatsValueChangeListener : public ValueChangedListener {
50 public:
51     StatsValueChangeListener(EPStats &st, EventuallyPersistentStore &str)
52         : stats(st), store(str) {
53         // EMPTY
54     }
55
56     virtual void sizeValueChanged(const std::string &key, size_t value) {
57         if (key.compare("max_size") == 0) {
58             stats.setMaxDataSize(value);
59             store.getEPEngine().getDcpConnMap(). \
60                                      updateMaxActiveSnoozingBackfills(value);
61             size_t low_wat = static_cast<size_t>
62                              (static_cast<double>(value) * 0.75);
63             size_t high_wat = static_cast<size_t>(
64                               static_cast<double>(value) * 0.85);
65             stats.mem_low_wat.store(low_wat);
66             stats.mem_high_wat.store(high_wat);
67         } else if (key.compare("mem_low_wat") == 0) {
68             stats.mem_low_wat.store(value);
69         } else if (key.compare("mem_high_wat") == 0) {
70             stats.mem_high_wat.store(value);
71         } else if (key.compare("tap_throttle_threshold") == 0) {
72             stats.tapThrottleThreshold.store(
73                                           static_cast<double>(value) / 100.0);
74         } else if (key.compare("warmup_min_memory_threshold") == 0) {
75             stats.warmupMemUsedCap.store(static_cast<double>(value) / 100.0);
76         } else if (key.compare("warmup_min_items_threshold") == 0) {
77             stats.warmupNumReadCap.store(static_cast<double>(value) / 100.0);
78         } else {
79             LOG(EXTENSION_LOG_WARNING,
80                 "Failed to change value for unknown variable, %s\n",
81                 key.c_str());
82         }
83     }
84
85 private:
86     EPStats &stats;
87     EventuallyPersistentStore &store;
88 };
89
90 /**
91  * A configuration value changed listener that responds to ep-engine
92  * parameter changes by invoking engine-specific methods on
93  * configuration change events.
94  */
95 class EPStoreValueChangeListener : public ValueChangedListener {
96 public:
97     EPStoreValueChangeListener(EventuallyPersistentStore &st) : store(st) {
98     }
99
100     virtual void sizeValueChanged(const std::string &key, size_t value) {
101         if (key.compare("bg_fetch_delay") == 0) {
102             store.setBGFetchDelay(static_cast<uint32_t>(value));
103         } else if (key.compare("compaction_write_queue_cap") == 0) {
104             store.setCompactionWriteQueueCap(value);
105         } else if (key.compare("exp_pager_stime") == 0) {
106             store.setExpiryPagerSleeptime(value);
107         } else if (key.compare("alog_sleep_time") == 0) {
108             store.setAccessScannerSleeptime(value);
109         } else if (key.compare("alog_task_time") == 0) {
110             store.resetAccessScannerStartTime();
111         } else if (key.compare("mutation_mem_threshold") == 0) {
112             double mem_threshold = static_cast<double>(value) / 100;
113             StoredValue::setMutationMemoryThreshold(mem_threshold);
114         } else if (key.compare("backfill_mem_threshold") == 0) {
115             double backfill_threshold = static_cast<double>(value) / 100;
116             store.setBackfillMemoryThreshold(backfill_threshold);
117         } else if (key.compare("compaction_exp_mem_threshold") == 0) {
118             store.setCompactionExpMemThreshold(value);
119         } else if (key.compare("tap_throttle_queue_cap") == 0) {
120             store.getEPEngine().getTapThrottle().setQueueCap(value);
121         } else if (key.compare("tap_throttle_cap_pcnt") == 0) {
122             store.getEPEngine().getTapThrottle().setCapPercent(value);
123         } else {
124             LOG(EXTENSION_LOG_WARNING,
125                 "Failed to change value for unknown variable, %s\n",
126                 key.c_str());
127         }
128     }
129
130     virtual void booleanValueChanged(const std::string &key, bool value) {
131         if (key.compare("access_scanner_enabled") == 0) {
132             if (value) {
133                 store.enableAccessScannerTask();
134             } else {
135                 store.disableAccessScannerTask();
136             }
137         } else if (key.compare("bfilter_enabled") == 0) {
138             store.setAllBloomFilters(value);
139         }
140     }
141
142     virtual void floatValueChanged(const std::string &key, float value) {
143         if (key.compare("bfilter_residency_threshold") == 0) {
144             store.setBfiltersResidencyThreshold(value);
145         }
146     }
147
148 private:
149     EventuallyPersistentStore &store;
150 };
151
152 class VBucketMemoryDeletionTask : public GlobalTask {
153 public:
154     VBucketMemoryDeletionTask(EventuallyPersistentEngine &eng,
155                               RCPtr<VBucket> &vb, double delay) :
156                               GlobalTask(&eng,
157                               TaskId::VBucketMemoryDeletionTask, delay, true),
158                               e(eng), vbucket(vb), vbid(vb->getId()) { }
159
160     std::string getDescription() {
161         std::stringstream ss;
162         ss << "Removing (dead) vbucket " << vbid << " from memory";
163         return ss.str();
164     }
165
166     bool run(void) {
167         vbucket->notifyAllPendingConnsFailed(e);
168         vbucket->ht.clear();
169         vbucket.reset();
170         return false;
171     }
172
173 private:
174     EventuallyPersistentEngine &e;
175     RCPtr<VBucket> vbucket;
176     uint16_t vbid;
177 };
178
179 class PendingOpsNotification : public GlobalTask {
180 public:
181     PendingOpsNotification(EventuallyPersistentEngine &e, RCPtr<VBucket> &vb) :
182         GlobalTask(&e, TaskId::PendingOpsNotification, 0, false),
183         engine(e), vbucket(vb) { }
184
185     std::string getDescription() {
186         std::stringstream ss;
187         ss << "Notify pending operations for vbucket " << vbucket->getId();
188         return ss.str();
189     }
190
191     bool run(void) {
192         vbucket->fireAllOps(engine);
193         return false;
194     }
195
196 private:
197     EventuallyPersistentEngine &engine;
198     RCPtr<VBucket> vbucket;
199 };
200
201 EventuallyPersistentStore::EventuallyPersistentStore(
202     EventuallyPersistentEngine &theEngine) :
203     engine(theEngine), stats(engine.getEpStats()),
204     vbMap(theEngine.getConfiguration(), *this),
205     defragmenterTask(NULL),
206     bgFetchQueue(0),
207     diskFlushAll(false), bgFetchDelay(0),
208     backfillMemoryThreshold(0.95),
209     statsSnapshotTaskId(0), lastTransTimePerItem(0)
210 {
211     cachedResidentRatio.activeRatio.store(0);
212     cachedResidentRatio.replicaRatio.store(0);
213
214     Configuration &config = engine.getConfiguration();
215     MutationLog *shardlog;
216     for (uint16_t i = 0; i < config.getMaxNumShards(); i++) {
217         std::stringstream s;
218         s << i;
219         shardlog = new MutationLog(engine.getConfiguration().getAlogPath() +
220                                  "." + s.str(),
221                                  engine.getConfiguration().getAlogBlockSize());
222         accessLog.push_back(shardlog);
223     }
224
225     storageProperties = new StorageProperties(true, true, true, true);
226
227     stats.schedulingHisto = new Histogram<hrtime_t>[GlobalTask::allTaskIds.size()];
228     stats.taskRuntimeHisto = new Histogram<hrtime_t>[GlobalTask::allTaskIds.size()];
229
230     for (size_t i = 0; i < GlobalTask::allTaskIds.size(); i++) {
231         stats.schedulingHisto[i].reset();
232         stats.taskRuntimeHisto[i].reset();
233     }
234
235     ExecutorPool::get()->registerBucket(ObjectRegistry::getCurrentEngine());
236
237     size_t num_vbs = config.getMaxVbuckets();
238     vb_mutexes = new Mutex[num_vbs];
239     schedule_vbstate_persist = new AtomicValue<bool>[num_vbs];
240     for (size_t i = 0; i < num_vbs; ++i) {
241         schedule_vbstate_persist[i] = false;
242     }
243
244     stats.memOverhead = sizeof(EventuallyPersistentStore);
245
246     if (config.getConflictResolutionType().compare("seqno") == 0) {
247         conflictResolver = new ConflictResolution();
248     }
249
250     stats.setMaxDataSize(config.getMaxSize());
251     config.addValueChangedListener("max_size",
252                                    new StatsValueChangeListener(stats, *this));
253     getEPEngine().getDcpConnMap().updateMaxActiveSnoozingBackfills(config.getMaxSize());
254
255     stats.mem_low_wat.store(config.getMemLowWat());
256     config.addValueChangedListener("mem_low_wat",
257                                    new StatsValueChangeListener(stats, *this));
258
259     stats.mem_high_wat.store(config.getMemHighWat());
260     config.addValueChangedListener("mem_high_wat",
261                                    new StatsValueChangeListener(stats, *this));
262
263     stats.tapThrottleThreshold.store(static_cast<double>
264                                     (config.getTapThrottleThreshold())
265                                      / 100.0);
266     config.addValueChangedListener("tap_throttle_threshold",
267                                    new StatsValueChangeListener(stats, *this));
268
269     stats.tapThrottleWriteQueueCap.store(config.getTapThrottleQueueCap());
270     config.addValueChangedListener("tap_throttle_queue_cap",
271                                    new EPStoreValueChangeListener(*this));
272     config.addValueChangedListener("tap_throttle_cap_pcnt",
273                                    new EPStoreValueChangeListener(*this));
274
275     setBGFetchDelay(config.getBgFetchDelay());
276     config.addValueChangedListener("bg_fetch_delay",
277                                    new EPStoreValueChangeListener(*this));
278
279     stats.warmupMemUsedCap.store(static_cast<double>
280                                (config.getWarmupMinMemoryThreshold()) / 100.0);
281     config.addValueChangedListener("warmup_min_memory_threshold",
282                                    new StatsValueChangeListener(stats, *this));
283     stats.warmupNumReadCap.store(static_cast<double>
284                                 (config.getWarmupMinItemsThreshold()) / 100.0);
285     config.addValueChangedListener("warmup_min_items_threshold",
286                                    new StatsValueChangeListener(stats, *this));
287
288     double mem_threshold = static_cast<double>
289                                       (config.getMutationMemThreshold()) / 100;
290     StoredValue::setMutationMemoryThreshold(mem_threshold);
291     config.addValueChangedListener("mutation_mem_threshold",
292                                    new EPStoreValueChangeListener(*this));
293
294     double backfill_threshold = static_cast<double>
295                                       (config.getBackfillMemThreshold()) / 100;
296     setBackfillMemoryThreshold(backfill_threshold);
297     config.addValueChangedListener("backfill_mem_threshold",
298                                    new EPStoreValueChangeListener(*this));
299
300     config.addValueChangedListener("bfilter_enabled",
301                                    new EPStoreValueChangeListener(*this));
302
303     bfilterResidencyThreshold = config.getBfilterResidencyThreshold();
304     config.addValueChangedListener("bfilter_residency_threshold",
305                                    new EPStoreValueChangeListener(*this));
306
307     compactionExpMemThreshold = config.getCompactionExpMemThreshold();
308     config.addValueChangedListener("compaction_exp_mem_threshold",
309                                    new EPStoreValueChangeListener(*this));
310
311     compactionWriteQueueCap = config.getCompactionWriteQueueCap();
312     config.addValueChangedListener("compaction_write_queue_cap",
313                                    new EPStoreValueChangeListener(*this));
314
315     const std::string &policy = config.getItemEvictionPolicy();
316     if (policy.compare("value_only") == 0) {
317         eviction_policy = VALUE_ONLY;
318     } else {
319         eviction_policy = FULL_EVICTION;
320     }
321
322     warmupTask = new Warmup(this);
323 }
324
325 bool EventuallyPersistentStore::initialize() {
326     // We should nuke everything unless we want warmup
327     Configuration &config = engine.getConfiguration();
328     if (!config.isWarmup()) {
329         reset();
330     }
331
332     if (!startFlusher()) {
333         LOG(EXTENSION_LOG_WARNING,
334             "FATAL: Failed to create and start flushers");
335         return false;
336     }
337     if (!startBgFetcher()) {
338         LOG(EXTENSION_LOG_WARNING,
339            "FATAL: Failed to create and start bgfetchers");
340         return false;
341     }
342
343     warmupTask->start();
344
345     if (config.isFailpartialwarmup() && stats.warmOOM > 0) {
346         LOG(EXTENSION_LOG_WARNING,
347             "Warmup failed to load %d records due to OOM, exiting.\n",
348             static_cast<unsigned int>(stats.warmOOM));
349         return false;
350     }
351
352     itmpTask = new ItemPager(&engine, stats);
353     ExecutorPool::get()->schedule(itmpTask, NONIO_TASK_IDX);
354
355     size_t expiryPagerSleeptime = config.getExpPagerStime();
356     setExpiryPagerSleeptime(expiryPagerSleeptime);
357     config.addValueChangedListener("exp_pager_stime",
358                                    new EPStoreValueChangeListener(*this));
359
360     ExTask htrTask = new HashtableResizerTask(this, 10);
361     ExecutorPool::get()->schedule(htrTask, NONIO_TASK_IDX);
362
363     size_t checkpointRemoverInterval = config.getChkRemoverStime();
364     chkTask = new ClosedUnrefCheckpointRemoverTask(&engine, stats,
365                                                    checkpointRemoverInterval);
366     ExecutorPool::get()->schedule(chkTask, NONIO_TASK_IDX);
367
368     ExTask vbSnapshotTask = new DaemonVBSnapshotTask(&engine);
369     ExecutorPool::get()->schedule(vbSnapshotTask, WRITER_TASK_IDX);
370
371     ExTask workloadMonitorTask = new WorkLoadMonitor(&engine, false);
372     ExecutorPool::get()->schedule(workloadMonitorTask, NONIO_TASK_IDX);
373
374 #if HAVE_JEMALLOC
375     /* Only create the defragmenter task if we have an underlying memory
376      * allocator which can facilitate defragmenting memory.
377      */
378     defragmenterTask = new DefragmenterTask(&engine, stats);
379     ExecutorPool::get()->schedule(defragmenterTask, NONIO_TASK_IDX);
380 #endif
381
382     return true;
383 }
384
385 EventuallyPersistentStore::~EventuallyPersistentStore() {
386     stopWarmup();
387     stopBgFetcher();
388     ExecutorPool::get()->stopTaskGroup(&engine, NONIO_TASK_IDX,
389                                        stats.forceShutdown);
390
391     ExecutorPool::get()->cancel(statsSnapshotTaskId);
392     LockHolder lh(accessScanner.mutex);
393     ExecutorPool::get()->cancel(accessScanner.task);
394     lh.unlock();
395
396     stopFlusher();
397     ExecutorPool::get()->unregisterBucket(ObjectRegistry::getCurrentEngine(),
398                                           stats.forceShutdown);
399
400     delete [] vb_mutexes;
401     delete [] schedule_vbstate_persist;
402     delete [] stats.schedulingHisto;
403     delete [] stats.taskRuntimeHisto;
404     delete conflictResolver;
405     delete warmupTask;
406     delete storageProperties;
407     defragmenterTask.reset();
408
409     std::vector<MutationLog*>::iterator it;
410     for (it = accessLog.begin(); it != accessLog.end(); it++) {
411         delete *it;
412     }
413 }
414
415 const Flusher* EventuallyPersistentStore::getFlusher(uint16_t shardId) {
416     return vbMap.getShard(shardId)->getFlusher();
417 }
418
419 Warmup* EventuallyPersistentStore::getWarmup(void) const {
420     return warmupTask;
421 }
422
423 bool EventuallyPersistentStore::startFlusher() {
424     for (uint16_t i = 0; i < vbMap.numShards; ++i) {
425         Flusher *flusher = vbMap.shards[i]->getFlusher();
426         flusher->start();
427     }
428     return true;
429 }
430
431 void EventuallyPersistentStore::stopFlusher() {
432     for (uint16_t i = 0; i < vbMap.numShards; i++) {
433         Flusher *flusher = vbMap.shards[i]->getFlusher();
434         LOG(EXTENSION_LOG_WARNING, "Attempting to stop the flusher for "
435             "shard:%" PRIu16, i);
436         bool rv = flusher->stop(stats.forceShutdown);
437         if (rv && !stats.forceShutdown) {
438             flusher->wait();
439         }
440     }
441 }
442
443 bool EventuallyPersistentStore::pauseFlusher() {
444     bool rv = true;
445     for (uint16_t i = 0; i < vbMap.numShards; i++) {
446         Flusher *flusher = vbMap.shards[i]->getFlusher();
447         if (!flusher->pause()) {
448             LOG(EXTENSION_LOG_WARNING, "Attempted to pause flusher in state "
449                 "[%s], shard = %d", flusher->stateName(), i);
450             rv = false;
451         }
452     }
453     return rv;
454 }
455
456 bool EventuallyPersistentStore::resumeFlusher() {
457     bool rv = true;
458     for (uint16_t i = 0; i < vbMap.numShards; i++) {
459         Flusher *flusher = vbMap.shards[i]->getFlusher();
460         if (!flusher->resume()) {
461             LOG(EXTENSION_LOG_WARNING,
462                     "Warning: attempted to resume flusher in state [%s], "
463                     "shard = %d", flusher->stateName(), i);
464             rv = false;
465         }
466     }
467     return rv;
468 }
469
470 void EventuallyPersistentStore::wakeUpFlusher() {
471     if (stats.diskQueueSize.load() == 0) {
472         for (uint16_t i = 0; i < vbMap.numShards; i++) {
473             Flusher *flusher = vbMap.shards[i]->getFlusher();
474             flusher->wake();
475         }
476     }
477 }
478
479 bool EventuallyPersistentStore::startBgFetcher() {
480     for (uint16_t i = 0; i < vbMap.numShards; i++) {
481         BgFetcher *bgfetcher = vbMap.shards[i]->getBgFetcher();
482         if (bgfetcher == NULL) {
483             LOG(EXTENSION_LOG_WARNING,
484                 "Failed to start bg fetcher for shard %d", i);
485             return false;
486         }
487         bgfetcher->start();
488     }
489     return true;
490 }
491
492 void EventuallyPersistentStore::stopBgFetcher() {
493     for (uint16_t i = 0; i < vbMap.numShards; i++) {
494         BgFetcher *bgfetcher = vbMap.shards[i]->getBgFetcher();
495         if (multiBGFetchEnabled() && bgfetcher->pendingJob()) {
496             LOG(EXTENSION_LOG_WARNING,
497                 "Shutting down engine while there are still pending data "
498                 "read for shard %d from database storage", i);
499         }
500         LOG(EXTENSION_LOG_WARNING, "Stopping bg fetcher for shard:%" PRIu16, i);
501         bgfetcher->stop();
502     }
503 }
504
505 void
506 EventuallyPersistentStore::deleteExpiredItem(uint16_t vbid, std::string &key,
507                                              time_t startTime,
508                                              uint64_t revSeqno) {
509     RCPtr<VBucket> vb = getVBucket(vbid);
510     if (vb) {
511         // Obtain reader access to the VB state change lock so that
512         // the VB can't switch state whilst we're processing
513         ReaderLockHolder rlh(vb->getStateLock());
514         if (vb->getState() == vbucket_state_active) {
515             int bucket_num(0);
516             incExpirationStat(vb);
517             LockHolder lh = vb->ht.getLockedBucket(key, &bucket_num);
518             StoredValue *v = vb->ht.unlocked_find(key, bucket_num, true, false);
519             if (v) {
520                 if (v->isTempNonExistentItem() || v->isTempDeletedItem()) {
521                     // This is a temporary item whose background fetch for metadata
522                     // has completed.
523                     bool deleted = vb->ht.unlocked_del(key, bucket_num);
524                     cb_assert(deleted);
525                 } else if (v->isExpired(startTime) && !v->isDeleted()) {
526                     vb->ht.unlocked_softDelete(v, 0, getItemEvictionPolicy());
527                     queueDirty(vb, v, &lh, NULL, false);
528                 }
529             } else {
530                 if (eviction_policy == FULL_EVICTION) {
531                     // Create a temp item and delete and push it
532                     // into the checkpoint queue, only if the bloomfilter
533                     // predicts that the item may exist on disk.
534                     if (vb->maybeKeyExistsInFilter(key)) {
535                         add_type_t rv = vb->ht.unlocked_addTempItem(bucket_num, key,
536                                                                     eviction_policy);
537                         if (rv == ADD_NOMEM) {
538                             return;
539                         }
540                         v = vb->ht.unlocked_find(key, bucket_num, true, false);
541                         v->setStoredValueState(StoredValue::state_deleted_key);
542                         v->setRevSeqno(revSeqno);
543                         vb->ht.unlocked_softDelete(v, 0, eviction_policy);
544                         queueDirty(vb, v, &lh, NULL, false);
545                     }
546                 }
547             }
548         }
549     }
550 }
551
552 void
553 EventuallyPersistentStore::deleteExpiredItems(std::list<std::pair<uint16_t,
554                                                         std::string> > &keys) {
555     std::list<std::pair<uint16_t, std::string> >::iterator it;
556     time_t startTime = ep_real_time();
557     for (it = keys.begin(); it != keys.end(); it++) {
558         deleteExpiredItem(it->first, it->second, startTime, 0);
559     }
560 }
561
562 StoredValue *EventuallyPersistentStore::fetchValidValue(RCPtr<VBucket> &vb,
563                                                         const std::string &key,
564                                                         int bucket_num,
565                                                         bool wantDeleted,
566                                                         bool trackReference,
567                                                         bool queueExpired) {
568     StoredValue *v = vb->ht.unlocked_find(key, bucket_num, wantDeleted,
569                                           trackReference);
570     if (v && !v->isDeleted() && !v->isTempItem()) {
571         // In the deleted case, we ignore expiration time.
572         if (v->isExpired(ep_real_time())) {
573             if (vb->getState() != vbucket_state_active) {
574                 return wantDeleted ? v : NULL;
575             }
576
577             // queueDirty only allowed on active VB
578             if (queueExpired && vb->getState() == vbucket_state_active) {
579                 incExpirationStat(vb, false);
580                 vb->ht.unlocked_softDelete(v, 0, eviction_policy);
581                 queueDirty(vb, v, NULL, NULL, false, true);
582             }
583             return wantDeleted ? v : NULL;
584         }
585     }
586     return v;
587 }
588
589 bool EventuallyPersistentStore::isMetaDataResident(RCPtr<VBucket> &vb,
590                                                    const std::string &key) {
591
592     cb_assert(vb);
593     int bucket_num(0);
594     LockHolder lh = vb->ht.getLockedBucket(key, &bucket_num);
595     StoredValue *v = vb->ht.unlocked_find(key, bucket_num, false, false);
596
597     if (v && !v->isTempItem()) {
598         return true;
599     } else {
600         return false;
601     }
602 }
603
604 protocol_binary_response_status EventuallyPersistentStore::evictKey(
605                                                         const std::string &key,
606                                                         uint16_t vbucket,
607                                                         const char **msg,
608                                                         size_t *msg_size,
609                                                         bool force) {
610     RCPtr<VBucket> vb = getVBucket(vbucket);
611     if (!vb || (vb->getState() != vbucket_state_active && !force)) {
612         return PROTOCOL_BINARY_RESPONSE_NOT_MY_VBUCKET;
613     }
614
615     int bucket_num(0);
616     LockHolder lh = vb->ht.getLockedBucket(key, &bucket_num);
617     StoredValue *v = fetchValidValue(vb, key, bucket_num, force, false);
618
619     protocol_binary_response_status rv(PROTOCOL_BINARY_RESPONSE_SUCCESS);
620
621     *msg_size = 0;
622     if (v) {
623         if (force)  {
624             v->markClean();
625         }
626         if (v->isResident()) {
627             if (vb->ht.unlocked_ejectItem(v, eviction_policy)) {
628                 *msg = "Ejected.";
629
630                 // Add key to bloom filter incase of full eviction mode
631                 if (getItemEvictionPolicy() == FULL_EVICTION) {
632                     vb->addToFilter(key);
633                 }
634             } else {
635                 *msg = "Can't eject: Dirty object.";
636                 rv = PROTOCOL_BINARY_RESPONSE_KEY_EEXISTS;
637             }
638         } else {
639             *msg = "Already ejected.";
640         }
641     } else {
642         if (eviction_policy == VALUE_ONLY) {
643             *msg = "Not found.";
644             rv = PROTOCOL_BINARY_RESPONSE_KEY_ENOENT;
645         } else {
646             *msg = "Already ejected.";
647         }
648     }
649
650     return rv;
651 }
652
653 ENGINE_ERROR_CODE EventuallyPersistentStore::addTempItemForBgFetch(
654                                                         LockHolder &lock,
655                                                         int bucket_num,
656                                                         const std::string &key,
657                                                         RCPtr<VBucket> &vb,
658                                                         const void *cookie,
659                                                         bool metadataOnly,
660                                                         bool isReplication) {
661
662     add_type_t rv = vb->ht.unlocked_addTempItem(bucket_num, key,
663                                                 eviction_policy,
664                                                 isReplication);
665     switch(rv) {
666         case ADD_NOMEM:
667             return ENGINE_ENOMEM;
668         case ADD_EXISTS:
669         case ADD_UNDEL:
670         case ADD_SUCCESS:
671         case ADD_TMP_AND_BG_FETCH:
672             // Since the hashtable bucket is locked, we shouldn't get here
673             abort();
674         case ADD_BG_FETCH:
675             lock.unlock();
676             bgFetch(key, vb->getId(), cookie, metadataOnly);
677     }
678     return ENGINE_EWOULDBLOCK;
679 }
680
681 ENGINE_ERROR_CODE EventuallyPersistentStore::set(const Item &itm,
682                                                  const void *cookie,
683                                                  bool force,
684                                                  uint8_t nru) {
685
686     RCPtr<VBucket> vb = getVBucket(itm.getVBucketId());
687     if (!vb) {
688         ++stats.numNotMyVBuckets;
689         return ENGINE_NOT_MY_VBUCKET;
690     }
691
692     // Obtain read-lock on VB state to ensure VB state changes are interlocked
693     // with this set
694     ReaderLockHolder rlh(vb->getStateLock());
695     if (vb->getState() == vbucket_state_dead) {
696         ++stats.numNotMyVBuckets;
697         return ENGINE_NOT_MY_VBUCKET;
698     } else if (vb->getState() == vbucket_state_replica && !force) {
699         ++stats.numNotMyVBuckets;
700         return ENGINE_NOT_MY_VBUCKET;
701     } else if (vb->getState() == vbucket_state_pending && !force) {
702         if (vb->addPendingOp(cookie)) {
703             return ENGINE_EWOULDBLOCK;
704         }
705     }
706
707     bool cas_op = (itm.getCas() != 0);
708     int bucket_num(0);
709     LockHolder lh = vb->ht.getLockedBucket(itm.getKey(), &bucket_num);
710     StoredValue *v = vb->ht.unlocked_find(itm.getKey(), bucket_num, true,
711                                           false);
712     if (v && v->isLocked(ep_current_time()) &&
713         (vb->getState() == vbucket_state_replica ||
714          vb->getState() == vbucket_state_pending)) {
715         v->unlock();
716     }
717
718     bool maybeKeyExists = true;
719     // Check Bloomfilter's prediction if in full eviction policy
720     // and for a CAS operation only.
721     if (eviction_policy == FULL_EVICTION && itm.getCas() != 0) {
722         // Check Bloomfilter's prediction
723         if (!vb->maybeKeyExistsInFilter(itm.getKey())) {
724             maybeKeyExists = false;
725         }
726     }
727
728     mutation_type_t mtype = vb->ht.unlocked_set(v, itm, itm.getCas(), true, false,
729                                                 eviction_policy, nru,
730                                                 maybeKeyExists);
731
732     Item& it = const_cast<Item&>(itm);
733     uint64_t seqno = 0;
734     ENGINE_ERROR_CODE ret = ENGINE_SUCCESS;
735     switch (mtype) {
736     case NOMEM:
737         ret = ENGINE_ENOMEM;
738         break;
739     case INVALID_CAS:
740     case IS_LOCKED:
741         ret = ENGINE_KEY_EEXISTS;
742         break;
743     case NOT_FOUND:
744         if (cas_op) {
745             ret = ENGINE_KEY_ENOENT;
746             break;
747         }
748         // FALLTHROUGH
749     case WAS_DIRTY:
750         // Even if the item was dirty, push it into the vbucket's open
751         // checkpoint.
752     case WAS_CLEAN:
753         it.setCas(vb->nextHLCCas());
754         v->setCas(it.getCas());
755         queueDirty(vb, v, &lh, &seqno);
756         it.setBySeqno(seqno);
757         break;
758     case NEED_BG_FETCH:
759     {   // CAS operation with non-resident item + full eviction.
760         if (v) {
761             // temp item is already created. Simply schedule a bg fetch job
762             lh.unlock();
763             bgFetch(itm.getKey(), vb->getId(), cookie, true);
764             return ENGINE_EWOULDBLOCK;
765         }
766         ret = addTempItemForBgFetch(lh, bucket_num, itm.getKey(), vb,
767                                     cookie, true);
768         break;
769     }
770     case INVALID_VBUCKET:
771         ret = ENGINE_NOT_MY_VBUCKET;
772         break;
773     }
774
775     return ret;
776 }
777
/**
 * Store an item only if its key is not already live in the vbucket
 * (memcached "add" semantics).
 *
 * Requests for dead/replica vbuckets are rejected. For pending vbuckets the
 * request returns ENGINE_EWOULDBLOCK if addPendingOp() accepts the cookie.
 * A non-zero CAS is refused. Under full eviction the vbucket's bloom filter
 * is consulted, and the result is passed to the hash table so it can decide
 * whether the key must first be fetched from disk.
 *
 * @param itm    item to add. On success its CAS and by-seqno are updated in
 *               place (via the const_cast below).
 * @param cookie connection cookie, used for pending-op parking and bg fetches
 * @return ENGINE_SUCCESS on success; ENGINE_NOT_STORED if the key exists or a
 *         CAS was supplied; ENGINE_NOT_MY_VBUCKET; ENGINE_ENOMEM;
 *         ENGINE_EWOULDBLOCK when a pending op or bg fetch was scheduled; or
 *         whatever addTempItemForBgFetch() returns.
 */
ENGINE_ERROR_CODE EventuallyPersistentStore::add(const Item &itm,
                                                 const void *cookie)
{
    RCPtr<VBucket> vb = getVBucket(itm.getVBucketId());
    if (!vb) {
        ++stats.numNotMyVBuckets;
        return ENGINE_NOT_MY_VBUCKET;
    }

    // Obtain read-lock on VB state to ensure VB state changes are interlocked
    // with this add
    ReaderLockHolder rlh(vb->getStateLock());
    if (vb->getState() == vbucket_state_dead ||
        vb->getState() == vbucket_state_replica) {
        ++stats.numNotMyVBuckets;
        return ENGINE_NOT_MY_VBUCKET;
    } else if(vb->getState() == vbucket_state_pending) {
        // Park the request until the vbucket leaves the pending state.
        if (vb->addPendingOp(cookie)) {
            return ENGINE_EWOULDBLOCK;
        }
    }

    if (itm.getCas() != 0) {
        // Adding with a cas value doesn't make sense..
        return ENGINE_NOT_STORED;
    }

    // Lock the hash bucket for this key; all hash-table access below is the
    // "unlocked_" variant and relies on this lock.
    int bucket_num(0);
    LockHolder lh = vb->ht.getLockedBucket(itm.getKey(), &bucket_num);
    StoredValue *v = vb->ht.unlocked_find(itm.getKey(), bucket_num, true,
                                          false);

    bool maybeKeyExists = true;
    if (eviction_policy == FULL_EVICTION) {
        // Check bloomfilter's prediction
        if (!vb->maybeKeyExistsInFilter(itm.getKey())) {
            maybeKeyExists = false;
        }
    }

    // NOTE(review): `v` is used in the ADD_SUCCESS/ADD_UNDEL branch below even
    // when unlocked_find() returned NULL, so unlocked_add() presumably updates
    // `v` through a reference when it inserts. TODO confirm in HashTable.
    add_type_t atype = vb->ht.unlocked_add(bucket_num, v, itm,
                                           eviction_policy,
                                           true, true,
                                           maybeKeyExists);

    Item& it = const_cast<Item&>(itm);
    uint64_t seqno = 0;
    switch (atype) {
    case ADD_NOMEM:
        return ENGINE_ENOMEM;
    case ADD_EXISTS:
        return ENGINE_NOT_STORED;
    case ADD_TMP_AND_BG_FETCH:
        // Key not in memory but may exist on disk: create a temp item and
        // schedule a bg fetch. The bucket lock is handed to the helper.
        return addTempItemForBgFetch(lh, bucket_num, it.getKey(), vb,
                                     cookie, true);
    case ADD_BG_FETCH:
        // A temp item already exists; release the bucket lock before
        // scheduling the background fetch.
        lh.unlock();
        bgFetch(it.getKey(), vb->getId(), cookie, true);
        return ENGINE_EWOULDBLOCK;
    case ADD_SUCCESS:
    case ADD_UNDEL:
        // Assign a fresh hybrid-logical-clock CAS to both the caller's item
        // and the stored value, then queue the mutation for persistence and
        // replication and report the assigned seqno back on the item.
        it.setCas(vb->nextHLCCas());
        v->setCas(it.getCas());
        queueDirty(vb, v, &lh, &seqno);
        it.setBySeqno(seqno);
        break;
    }

    return ENGINE_SUCCESS;
}
848
/**
 * Overwrite an item only if its key currently exists (memcached "replace").
 *
 * Requests for dead/replica vbuckets are rejected. For pending vbuckets the
 * request returns ENGINE_EWOULDBLOCK if addPendingOp() accepts the cookie.
 * If the key is not in memory:
 * - under value-only eviction it does not exist (ENOENT);
 * - under full eviction the bloom filter decides whether to bg-fetch the
 *   metadata first or to return ENOENT immediately.
 *
 * @param itm    replacement item. On success its CAS and by-seqno are
 *               updated in place (via the const_cast below).
 * @param cookie connection cookie, used for pending-op parking and bg fetches
 * @return ENGINE_SUCCESS, ENGINE_KEY_ENOENT, ENGINE_NOT_STORED,
 *         ENGINE_KEY_EEXISTS (item locked), ENGINE_ENOMEM,
 *         ENGINE_NOT_MY_VBUCKET, ENGINE_EWOULDBLOCK, or whatever
 *         addTempItemForBgFetch() returns.
 */
ENGINE_ERROR_CODE EventuallyPersistentStore::replace(const Item &itm,
                                                     const void *cookie) {
    RCPtr<VBucket> vb = getVBucket(itm.getVBucketId());
    if (!vb) {
        ++stats.numNotMyVBuckets;
        return ENGINE_NOT_MY_VBUCKET;
    }

    // Obtain read-lock on VB state to ensure VB state changes are interlocked
    // with this replace
    ReaderLockHolder rlh(vb->getStateLock());
    if (vb->getState() == vbucket_state_dead ||
        vb->getState() == vbucket_state_replica) {
        ++stats.numNotMyVBuckets;
        return ENGINE_NOT_MY_VBUCKET;
    } else if (vb->getState() == vbucket_state_pending) {
        // Park the request until the vbucket leaves the pending state.
        if (vb->addPendingOp(cookie)) {
            return ENGINE_EWOULDBLOCK;
        }
    }

    int bucket_num(0);
    LockHolder lh = vb->ht.getLockedBucket(itm.getKey(), &bucket_num);
    StoredValue *v = vb->ht.unlocked_find(itm.getKey(), bucket_num, true,
                                          false);
    if (v) {
        // Deleted or known-absent entries count as "key does not exist".
        if (v->isDeleted() || v->isTempDeletedItem() ||
            v->isTempNonExistentItem()) {
            return ENGINE_KEY_ENOENT;
        }

        // A temp "initial" item under full eviction means the metadata is
        // still being fetched from disk, so the set cannot be decided yet.
        mutation_type_t mtype;
        if (eviction_policy == FULL_EVICTION && v->isTempInitialItem()) {
            mtype = NEED_BG_FETCH;
        } else {
            mtype = vb->ht.unlocked_set(v, itm, 0, true, false, eviction_policy,
                                        0xff);
        }

        Item& it = const_cast<Item&>(itm);
        uint64_t seqno = 0;
        ENGINE_ERROR_CODE ret = ENGINE_SUCCESS;
        switch (mtype) {
            case NOMEM:
                ret = ENGINE_ENOMEM;
                break;
            case IS_LOCKED:
                ret = ENGINE_KEY_EEXISTS;
                break;
            case INVALID_CAS:
            case NOT_FOUND:
                ret = ENGINE_NOT_STORED;
                break;
            case WAS_DIRTY:
                // Even if the item was dirty, push it into the vbucket's open
                // checkpoint.
                // FALLTHROUGH
            case WAS_CLEAN:
                it.setCas(vb->nextHLCCas());
                v->setCas(it.getCas());
                queueDirty(vb, v, &lh, &seqno);
                it.setBySeqno(seqno);
                break;
            case NEED_BG_FETCH:
            {
                // temp item is already created. Simply schedule a bg fetch job
                lh.unlock();
                bgFetch(it.getKey(), vb->getId(), cookie, true);
                ret = ENGINE_EWOULDBLOCK;
                break;
            }
            case INVALID_VBUCKET:
                ret = ENGINE_NOT_MY_VBUCKET;
                break;
        }

        return ret;
    } else {
        // Under value-only eviction every key lives in memory, so a miss
        // here is authoritative.
        if (eviction_policy == VALUE_ONLY) {
            return ENGINE_KEY_ENOENT;
        }

        if (vb->maybeKeyExistsInFilter(itm.getKey())) {
            return addTempItemForBgFetch(lh, bucket_num, itm.getKey(), vb,
                                         cookie, false);
        } else {
            // As bloomfilter predicted that item surely doesn't exist
            // on disk, return ENOENT for replace().
            return ENGINE_KEY_ENOENT;
        }
    }
}
941
/**
 * Apply an item received from a replication (TAP/backfill) stream to a
 * replica or pending vbucket.
 *
 * Unlike a client set, the incoming CAS is kept as-is (only validated). Any
 * lock on the existing item is cleared, and the vbucket's max CAS is advanced
 * to the stored value's CAS. On success, when extended metadata is supplied,
 * the vbucket's drift counter is updated from it.
 *
 * @param itm        the replicated item
 * @param nru        NRU value to store with the item
 * @param genBySeqno forwarded to queueDirty(); presumably whether a new
 *                   by-seqno should be generated or the item's own kept
 *                   (TODO confirm)
 * @param emd        optional extended metadata (conflict-resolution mode and
 *                   adjusted time); may be NULL
 * @return ENGINE_SUCCESS, ENGINE_NOT_MY_VBUCKET (vbucket missing, dead or
 *         active), ENGINE_KEY_EEXISTS (invalid CAS), or ENGINE_ENOMEM.
 *         Aborts the process if the hash table reports NEED_BG_FETCH.
 */
ENGINE_ERROR_CODE EventuallyPersistentStore::addTAPBackfillItem(
                                                        const Item &itm,
                                                        uint8_t nru,
                                                        bool genBySeqno,
                                                        ExtendedMetaData *emd) {

    RCPtr<VBucket> vb = getVBucket(itm.getVBucketId());
    if (!vb) {
        ++stats.numNotMyVBuckets;
        return ENGINE_NOT_MY_VBUCKET;
    }

    // Obtain read-lock on VB state to ensure VB state changes are interlocked
    // with this add-tapbackfill
    ReaderLockHolder rlh(vb->getStateLock());
    if (vb->getState() == vbucket_state_dead ||
        vb->getState() == vbucket_state_active) {
        ++stats.numNotMyVBuckets;
        return ENGINE_NOT_MY_VBUCKET;
    }

    //check for the incoming item's CAS validity
    if (!Item::isValidCas(itm.getCas())) {
        return ENGINE_KEY_EEXISTS;
    }

    int bucket_num(0);
    LockHolder lh = vb->ht.getLockedBucket(itm.getKey(), &bucket_num);
    StoredValue *v = vb->ht.unlocked_find(itm.getKey(), bucket_num, true,
                                          false);

    // Note that this function is only called on replica or pending vbuckets.
    // Client-side locks have no meaning here, so drop any lock so that the
    // replicated value can be applied.
    if (v && v->isLocked(ep_current_time())) {
        v->unlock();
    }
    mutation_type_t mtype = vb->ht.unlocked_set(v, itm, 0, true, true,
                                                eviction_policy, nru);

    ENGINE_ERROR_CODE ret = ENGINE_SUCCESS;
    switch (mtype) {
    case NOMEM:
        ret = ENGINE_ENOMEM;
        break;
    case INVALID_CAS:
    case IS_LOCKED:
        ret = ENGINE_KEY_EEXISTS;
        break;
    case WAS_DIRTY:
        // FALLTHROUGH, to ensure the bySeqno for the hashTable item is
        // set correctly, and also the sequence numbers are ordered correctly.
        // (MB-14003)
    case NOT_FOUND:
        // FALLTHROUGH
    case WAS_CLEAN:
        /* set the conflict resolution mode from the extended meta data *
         * Given that the mode is already set, we don't need to set the *
         * conflict resolution mode in queueDirty */
        if (emd) {
            v->setConflictResMode(
                 static_cast<enum conflict_resolution_mode>(
                                      emd->getConflictResMode()));
        }
        // Keep the vbucket's max CAS monotonic with replicated CAS values.
        vb->setMaxCas(v->getCas());
        queueDirty(vb, v, &lh, NULL,true, true, genBySeqno, false);
        break;
    case INVALID_VBUCKET:
        ret = ENGINE_NOT_MY_VBUCKET;
        break;
    case NEED_BG_FETCH:
        // SET on a non-active vbucket should not require a bg_metadata_fetch.
        abort();
    }

    // Update drift counter for vbucket upon a success only
    if (ret == ENGINE_SUCCESS && emd) {
        vb->setDriftCounter(emd->getAdjustedTime());
    }

    return ret;
}
1022
1023 class KVStatsCallback : public Callback<kvstats_ctx> {
1024     public:
1025         KVStatsCallback(EventuallyPersistentStore *store)
1026             : epstore(store) { }
1027
1028         void callback(kvstats_ctx &ctx) {
1029             RCPtr<VBucket> vb = epstore->getVBucket(ctx.vbucket);
1030             if (vb) {
1031                 vb->fileSpaceUsed = ctx.fileSpaceUsed;
1032                 vb->fileSize = ctx.fileSize;
1033             }
1034         }
1035
1036     private:
1037         EventuallyPersistentStore *epstore;
1038 };
1039
/**
 * Persist the vbucket_state of every vbucket that belongs to one shard.
 *
 * The shard's "snapshot scheduled" flag for this priority is cleared first,
 * so that requests arriving while this runs can schedule another task. The
 * states are captured by a visitor and then written one vbucket at a time:
 * - a vbucket whose mutex is busy (tryLock fails) is skipped without
 *   rescheduling;
 * - on the first KVStore write failure the loop stops and a new snapshot task
 *   of the same priority is scheduled;
 * - HIGH priority runs also clear the vbucket's "creation" flag.
 * The elapsed time is recorded in snapshotVbucketHisto only when no write
 * failed.
 *
 * @param prio    priority of the task that triggered this snapshot
 * @param shardId shard whose vbuckets are snapshotted
 */
void EventuallyPersistentStore::snapshotVBuckets(VBSnapshotTask::Priority prio,
                                                 uint16_t shardId) {

    // Collects a vbucket_state for every vbucket in the given shard. Returns
    // false from visitBucket(), presumably so the visitor does not descend
    // into individual items (visit(StoredValue*) asserts it is never called).
    class VBucketStateVisitor : public VBucketVisitor {
    public:
        VBucketStateVisitor(VBucketMap &vb_map, uint16_t sid)
            : vbuckets(vb_map), shardId(sid) { }
        bool visitBucket(RCPtr<VBucket> &vb) {
            if (vbuckets.getShard(vb->getId())->getId() == shardId) {
                snapshot_range_t range;
                vb->getPersistedSnapshot(range);
                std::string failovers = vb->failovers->toJSON();
                uint64_t chkId = vbuckets.getPersistenceCheckpointId(vb->getId());

                vbucket_state vb_state(vb->getState(), chkId, 0,
                                       vb->getHighSeqno(), vb->getPurgeSeqno(),
                                       range.start, range.end, vb->getMaxCas(),
                                       vb->getDriftCounter() ,failovers);
                states.insert(std::pair<uint16_t, vbucket_state>(vb->getId(), vb_state));
            }
            return false;
        }

        void visit(StoredValue*) {
            cb_assert(false); // this does not happen
        }

        std::map<uint16_t, vbucket_state> states;

    private:
        VBucketMap &vbuckets;
        uint16_t shardId;
    };

    // Clear the per-priority "scheduled" flag before doing any work so a
    // concurrent request can queue a follow-up snapshot.
    KVShard *shard = vbMap.shards[shardId];
    if (prio == VBSnapshotTask::Priority::LOW) {
        shard->setLowPriorityVbSnapshotFlag(false);
    } else {
        shard->setHighPriorityVbSnapshotFlag(false);
    }

    KVStatsCallback kvcb(this);
    VBucketStateVisitor v(vbMap, shard->getId());
    visit(v);
    hrtime_t start = gethrtime();

    bool success = true;
    // Walk the collected states from the highest key to the lowest.
    vbucket_map_t::reverse_iterator iter = v.states.rbegin();
    for (; iter != v.states.rend(); ++iter) {
        // Skip (rather than block on) vbuckets that another writer holds.
        LockHolder lh(vb_mutexes[iter->first], true /*tryLock*/);
        if (!lh.islocked()) {
            continue;
        }
        KVStore *rwUnderlying = getRWUnderlying(iter->first);
        if (!rwUnderlying->snapshotVBucket(iter->first, iter->second,
                    &kvcb)) {
            LOG(EXTENSION_LOG_WARNING,
                    "VBucket snapshot task failed!!! Rescheduling");
            success = false;
            break;
        }

        if (prio == VBSnapshotTask::Priority::HIGH) {
            if (vbMap.setBucketCreation(iter->first, false)) {
                LOG(EXTENSION_LOG_INFO, "VBucket %d created", iter->first);
            }
        }
    }

    if (!success) {
        scheduleVBSnapshot(prio, shard->getId());
    } else {
        stats.snapshotVbucketHisto.add((gethrtime() - start) / 1000);
    }
}
1115
1116 bool EventuallyPersistentStore::persistVBState(uint16_t vbid) {
1117     schedule_vbstate_persist[vbid] = false;
1118
1119     RCPtr<VBucket> vb = getVBucket(vbid);
1120     if (!vb) {
1121         LOG(EXTENSION_LOG_WARNING,
1122             "VBucket %d not exist!!! vb_state persistence task failed!!!", vbid);
1123         return false;
1124     }
1125
1126     bool inverse = false;
1127     LockHolder lh(vb_mutexes[vbid], true /*tryLock*/);
1128     if (!lh.islocked()) {
1129         if (schedule_vbstate_persist[vbid].compare_exchange_strong(inverse,
1130                                                                    true)) {
1131             return true;
1132         } else {
1133             return false;
1134         }
1135     }
1136
1137     const hrtime_t start = gethrtime();
1138
1139     KVStatsCallback kvcb(this);
1140     uint64_t chkId = vbMap.getPersistenceCheckpointId(vbid);
1141     std::string failovers = vb->failovers->toJSON();
1142
1143     snapshot_range_t range;
1144     vb->getPersistedSnapshot(range);
1145     vbucket_state vb_state(vb->getState(), chkId, 0, vb->getHighSeqno(),
1146                            vb->getPurgeSeqno(), range.start, range.end,
1147                            vb->getMaxCas(), vb->getDriftCounter(),
1148                            failovers);
1149
1150     KVStore *rwUnderlying = getRWUnderlying(vbid);
1151     if (rwUnderlying->snapshotVBucket(vbid, vb_state, &kvcb)) {
1152         stats.persistVBStateHisto.add((gethrtime() - start) / 1000);
1153         if (vbMap.setBucketCreation(vbid, false)) {
1154             LOG(EXTENSION_LOG_INFO, "VBucket %d created", vbid);
1155         }
1156     } else {
1157         LOG(EXTENSION_LOG_WARNING,
1158             "VBucket %d: vb_state persistence task failed!!! Rescheduling", vbid);
1159
1160         if (schedule_vbstate_persist[vbid].compare_exchange_strong(inverse,
1161                                                                    true)) {
1162             return true;
1163         } else {
1164             return false;
1165         }
1166     }
1167     return false;
1168 }
1169
/**
 * Change the state of a vbucket, creating the vbucket if it does not exist.
 *
 * Existing vbucket (state actually changes):
 * - DCP is notified if notify_dcp is set;
 * - replica -> active resets the checkpoint snapshot range;
 * - becoming active without `transfer` adds a failover-table entry;
 * - pending -> active schedules a task to notify parked operations;
 * - a LOW priority vb_state persist is scheduled.
 *
 * New vbucket: created with a fresh failover table. Active vbuckets start at
 * checkpoint id 2. Persistence ids are zeroed, the bucket is flagged as
 * being created, and a HIGH priority vb_state persist is scheduled.
 *
 * @param vbid       vbucket id
 * @param to         target state
 * @param transfer   if true, no failover entry is created on becoming active
 * @param notify_dcp whether DCP connections are told about the change
 * @return ENGINE_SUCCESS, or ENGINE_ERANGE if addBucket() rejects the id
 */
ENGINE_ERROR_CODE EventuallyPersistentStore::setVBucketState(uint16_t vbid,
                                                           vbucket_state_t to,
                                                           bool transfer,
                                                           bool notify_dcp) {
    // Lock to prevent a race condition between a failed update and add.
    LockHolder lh(vbsetMutex);
    RCPtr<VBucket> vb = vbMap.getBucket(vbid);
    if (vb && to == vb->getState()) {
        return ENGINE_SUCCESS;
    }

    if (vb) {
        vbucket_state_t oldstate = vb->getState();
        // NOTE(review): oldstate != to always holds here because of the early
        // return above; only notify_dcp actually gates the notification.
        if (oldstate != to && notify_dcp) {
            engine.getDcpConnMap().vbucketStateChanged(vbid, to);
        }

        vb->setState(to, engine.getServerApi());

        if (to == vbucket_state_active && oldstate == vbucket_state_replica) {
            /**
             * Update snapshot range when vbucket goes from being a replica
             * to active, to maintain the correct snapshot sequence numbers
             * even in a failover scenario.
             */
            vb->checkpointManager.resetSnapshotRange();
        }

        if (to == vbucket_state_active && !transfer) {
            // Start a new failover entry at the persisted snapshot end if the
            // whole snapshot is persisted, otherwise at its start.
            snapshot_range_t range;
            vb->getPersistedSnapshot(range);
            if (range.end == vbMap.getPersistenceSeqno(vbid)) {
                vb->failovers->createEntry(range.end);
            } else {
                vb->failovers->createEntry(range.start);
            }
        }

        // Release vbsetMutex before scheduling follow-up tasks.
        lh.unlock();
        if (oldstate == vbucket_state_pending &&
            to == vbucket_state_active) {
            ExTask notifyTask = new PendingOpsNotification(engine, vb);
            ExecutorPool::get()->schedule(notifyTask, NONIO_TASK_IDX);
        }
        scheduleVBStatePersist(VBStatePersistTask::Priority::LOW, vbid);
    } else {
        FailoverTable* ft = new FailoverTable(engine.getMaxFailoverEntries());
        KVShard* shard = vbMap.getShard(vbid);
        shared_ptr<Callback<uint16_t> > cb(new NotifyFlusherCB(shard));
        // NOTE(review): ownership of `ft` is presumably taken by VBucket,
        // including on the ENGINE_ERANGE path below. TODO confirm.
        RCPtr<VBucket> newvb(new VBucket(vbid, to, stats,
                                         engine.getCheckpointConfig(),
                                         shard, 0, 0, 0, ft, cb));
        // The first checkpoint for active vbucket should start with id 2.
        uint64_t start_chk_id = (to == vbucket_state_active) ? 2 : 0;
        newvb->checkpointManager.setOpenCheckpointId(start_chk_id);
        if (vbMap.addBucket(newvb) == ENGINE_ERANGE) {
            lh.unlock();
            return ENGINE_ERANGE;
        }
        vbMap.setPersistenceCheckpointId(vbid, 0);
        vbMap.setPersistenceSeqno(vbid, 0);
        vbMap.setBucketCreation(vbid, true);
        lh.unlock();
        scheduleVBStatePersist(VBStatePersistTask::Priority::HIGH, vbid);
    }
    return ENGINE_SUCCESS;
}
1237
1238 bool EventuallyPersistentStore::scheduleVBSnapshot(VBSnapshotTask::Priority prio) {
1239     KVShard *shard = NULL;
1240     if (prio == VBSnapshotTask::Priority::HIGH) {
1241         for (size_t i = 0; i < vbMap.numShards; ++i) {
1242             shard = vbMap.shards[i];
1243             if (shard->setHighPriorityVbSnapshotFlag(true)) {
1244                 ExTask task = new VBSnapshotTaskHigh(&engine, i, true);
1245                 ExecutorPool::get()->schedule(task, WRITER_TASK_IDX);
1246             }
1247         }
1248     } else {
1249         for (size_t i = 0; i < vbMap.numShards; ++i) {
1250             shard = vbMap.shards[i];
1251             if (shard->setLowPriorityVbSnapshotFlag(true)) {
1252                 ExTask task = new VBSnapshotTaskLow(&engine, i, true);
1253                 ExecutorPool::get()->schedule(task, WRITER_TASK_IDX);
1254             }
1255         }
1256     }
1257     if (stats.isShutdown) {
1258         return false;
1259     }
1260     return true;
1261 }
1262
1263 void EventuallyPersistentStore::scheduleVBSnapshot(VBSnapshotTask::Priority prio,
1264                                                    uint16_t shardId,
1265                                                    bool force) {
1266     KVShard *shard = vbMap.shards[shardId];
1267     if (prio == VBSnapshotTask::Priority::HIGH) {
1268         if (force || shard->setHighPriorityVbSnapshotFlag(true)) {
1269             ExTask task = new VBSnapshotTaskHigh(&engine, shardId, true);
1270             ExecutorPool::get()->schedule(task, WRITER_TASK_IDX);
1271         }
1272     } else {
1273         if (force || shard->setLowPriorityVbSnapshotFlag(true)) {
1274             ExTask task = new VBSnapshotTaskLow(&engine, shardId, true);
1275             ExecutorPool::get()->schedule(task, WRITER_TASK_IDX);
1276         }
1277     }
1278 }
1279
1280 void EventuallyPersistentStore::scheduleVBStatePersist(VBStatePersistTask::Priority priority,
1281                                                        uint16_t vbid,
1282                                                        bool force) {
1283     bool inverse = false;
1284     if (force ||
1285         schedule_vbstate_persist[vbid].compare_exchange_strong(inverse, true)) {
1286         if (priority == VBStatePersistTask::Priority::HIGH) {
1287             ExecutorPool::get()->schedule(new VBStatePersistTaskHigh(&engine, vbid, true), WRITER_TASK_IDX);
1288         } else {
1289             ExecutorPool::get()->schedule(new VBStatePersistTaskLow(&engine, vbid, true), WRITER_TASK_IDX);
1290         }
1291     }
1292 }
1293
/**
 * Disk-side completion of a vbucket deletion.
 *
 * The on-disk vbucket is deleted only if one of these holds:
 * - the vbucket is no longer in the map;
 * - it is dead;
 * - it is flagged for deletion.
 * Deletion is done under the per-vbucket mutex, after releasing vbsetMutex.
 * Timing stats are recorded and the cookie (if any) is notified with
 * ENGINE_SUCCESS in every case.
 *
 * @param vbid   vbucket whose files should be removed
 * @param cookie optional connection to notify on completion
 * @return always true
 */
bool EventuallyPersistentStore::completeVBucketDeletion(uint16_t vbid,
                                                        const void* cookie) {
    LockHolder lh(vbsetMutex);

    hrtime_t start_time(gethrtime());
    RCPtr<VBucket> vb = vbMap.getBucket(vbid);
    if (!vb || vb->getState() == vbucket_state_dead ||
         vbMap.isBucketDeletion(vbid)) {
        // Drop vbsetMutex before taking the per-vbucket mutex and doing I/O.
        lh.unlock();
        LockHolder vlh(vb_mutexes[vbid]);
        getRWUnderlying(vbid)->delVBucket(vbid);
        vbMap.setBucketDeletion(vbid, false);
        vbMap.setPersistenceSeqno(vbid, 0);
        ++stats.vbucketDeletions;
    }
    // NOTE(review): when the branch above is not taken, vbsetMutex is still
    // held through the stats updates and notifyIOComplete() below, and the
    // timing stats are recorded even though nothing was deleted.

    hrtime_t spent(gethrtime() - start_time);
    hrtime_t wall_time = spent / 1000;
    BlockTimer::log(spent, "disk_vb_del", stats.timingLog);
    stats.diskVBDelHisto.add(wall_time);
    atomic_setIfBigger(stats.vbucketDelMaxWalltime, wall_time);
    stats.vbucketDelTotWalltime.fetch_add(wall_time);
    if (cookie) {
        engine.notifyIOComplete(cookie, ENGINE_SUCCESS);
    }

    return true;
}
1322
1323 void EventuallyPersistentStore::scheduleVBDeletion(RCPtr<VBucket> &vb,
1324                                                    const void* cookie,
1325                                                    double delay) {
1326     ExTask delTask = new VBucketMemoryDeletionTask(engine, vb, delay);
1327     ExecutorPool::get()->schedule(delTask, NONIO_TASK_IDX);
1328
1329     if (vbMap.setBucketDeletion(vb->getId(), true)) {
1330         ExTask task = new VBDeleteTask(&engine, vb->getId(), cookie);
1331         ExecutorPool::get()->schedule(task, WRITER_TASK_IDX);
1332     }
1333 }
1334
1335 ENGINE_ERROR_CODE EventuallyPersistentStore::deleteVBucket(uint16_t vbid,
1336                                                            const void* c) {
1337     // Lock to prevent a race condition between a failed update and add
1338     // (and delete).
1339     LockHolder lh(vbsetMutex);
1340
1341     RCPtr<VBucket> vb = vbMap.getBucket(vbid);
1342     if (!vb) {
1343         return ENGINE_NOT_MY_VBUCKET;
1344     }
1345
1346     engine.getDcpConnMap().vbucketStateChanged(vbid, vbucket_state_dead);
1347     vbMap.removeBucket(vbid);
1348     lh.unlock();
1349     scheduleVBDeletion(vb, c);
1350     if (c) {
1351         return ENGINE_EWOULDBLOCK;
1352     }
1353     return ENGINE_SUCCESS;
1354 }
1355
1356 ENGINE_ERROR_CODE EventuallyPersistentStore::compactDB(uint16_t vbid,
1357                                                        compaction_ctx c,
1358                                                        const void *cookie) {
1359     RCPtr<VBucket> vb = vbMap.getBucket(vbid);
1360     if (!vb) {
1361         return ENGINE_NOT_MY_VBUCKET;
1362     }
1363
1364     /* Update the compaction ctx with the previous purge seqno */
1365     c.max_purged_seq = vb->getPurgeSeqno();
1366
1367     LockHolder lh(compactionLock);
1368     ExTask task = new CompactVBucketTask(&engine, vbid, c, cookie);
1369     compactionTasks.push_back(std::make_pair(vbid, task));
1370     if (compactionTasks.size() > 1) {
1371         if ((stats.diskQueueSize > compactionWriteQueueCap &&
1372             compactionTasks.size() > (vbMap.getNumShards() / 2)) ||
1373             engine.getWorkLoadPolicy().getWorkLoadPattern() == READ_HEAVY) {
1374             // Snooze a new compaction task.
1375             // We will wake it up when one of the existing compaction tasks is done.
1376             task->snooze(60);
1377         }
1378     }
1379
1380     ExecutorPool::get()->schedule(task, WRITER_TASK_IDX);
1381
1382     LOG(EXTENSION_LOG_DEBUG, "Scheduled compaction task %d on vbucket %d,"
1383         "purge_before_ts = %lld, purge_before_seq = %lld, dropdeletes = %d",
1384         task->getId(), vbid, c.purge_before_ts,
1385         c.purge_before_seq, c.drop_deletes);
1386
1387    return ENGINE_EWOULDBLOCK;
1388 }
1389
1390 class ExpiredItemsCallback : public Callback<compaction_ctx> {
1391     public:
1392         ExpiredItemsCallback(EventuallyPersistentStore *store, uint16_t vbid)
1393             : epstore(store), vbucket(vbid) { }
1394
1395         void callback(compaction_ctx &ctx) {
1396             std::list<expiredItemCtx>::iterator it;
1397             for (it  = ctx.expiredItems.begin();
1398                  it != ctx.expiredItems.end(); it++) {
1399                 if (epstore->compactionCanExpireItems()) {
1400                     epstore->deleteExpiredItem(vbucket, it->keyStr,
1401                                                ctx.curr_time,
1402                                                it->revSeqno);
1403                 }
1404             }
1405         }
1406
1407     private:
1408         EventuallyPersistentStore *epstore;
1409         uint16_t vbucket;
1410 };
1411
/**
 * Run a compaction of one vbucket's on-disk file (body of CompactVBucketTask).
 *
 * With the per-vbucket mutex held (tryLock; if busy, returns true so the task
 * is rescheduled) this:
 * - sizes and initialises a temporary bloom filter when bloom filters are
 *   enabled;
 * - sets the expiry reference time, for active vbuckets only;
 * - runs the KVStore compaction;
 * - swaps in (or clears) the bloom filter depending on the outcome;
 * - records the new purge seqno.
 * Afterwards the task's entry is removed from compactionTasks and snoozed
 * compaction tasks are woken. The cookie is notified and the pending
 * compaction count is decremented.
 *
 * @param vbid   vbucket to compact
 * @param ctx    compaction context; its bfcb and curr_time are set here
 * @param cookie optional connection to notify with the result
 * @return true to request rescheduling (vbucket mutex busy), else false
 */
bool EventuallyPersistentStore::compactVBucket(const uint16_t vbid,
                                               compaction_ctx *ctx,
                                               const void *cookie) {
    ENGINE_ERROR_CODE err = ENGINE_SUCCESS;
    RCPtr<VBucket> vb = vbMap.getBucket(vbid);
    if (vb) {
        LockHolder lh(vb_mutexes[vbid], true /*tryLock*/);
        if (!lh.islocked()) {
            return true; // Schedule a compaction task again.
        }

        Configuration &config = getEPEngine().getConfiguration();
        if (config.isBfilterEnabled()) {
            size_t initial_estimation = config.getBfilterKeyCount();
            size_t estimated_count;
            size_t num_deletes =
                    getROUnderlying(vbid)->getNumPersistedDeletes(vbid);
            if (eviction_policy == VALUE_ONLY) {
                /**
                 * VALUE-ONLY EVICTION POLICY
                 * Obtain number of persisted deletes from underlying kvstore.
                 * Bloomfilter's estimated_key_count = 1.25 * deletes
                 */

                estimated_count = round(1.25 * num_deletes);
                ctx->bfcb = new BfilterCB(this, vbid, false);
            } else {
                /**
                 * FULL EVICTION POLICY
                 * First determine if the resident ratio of vbucket is less than
                 * the threshold from configuration.
                 */

                bool residentRatioAlert = vb->isResidentRatioUnderThreshold(
                                                getBfiltersResidencyThreshold(),
                                                eviction_policy);
                ctx->bfcb = new BfilterCB(this, vbid, residentRatioAlert);

                /**
                 * Based on resident ratio against threshold, estimate count.
                 *
                 * 1. If resident ratio is greater than the threshold:
                 * Obtain number of persisted deletes from underlying kvstore.
                 * Obtain number of non-resident-items for vbucket.
                 * Bloomfilter's estimated_key_count =
                 *                              1.25 * (deletes + non-resident)
                 *
                 * 2. Otherwise:
                 * Obtain number of items for vbucket.
                 * Bloomfilter's estimated_key_count =
                 *                              1.25 * (num_items)
                 */

                if (residentRatioAlert) {
                    estimated_count = round(1.25 *
                                            vb->getNumItems(eviction_policy));
                } else {
                    estimated_count = round(1.25 * (num_deletes +
                                vb->getNumNonResidentItems(eviction_policy)));
                }
            }
            // Never size the filter below the configured key count.
            if (estimated_count < initial_estimation) {
                estimated_count = initial_estimation;
            }
            vb->initTempFilter(estimated_count, config.getBfilterFpProb());
        }

        if (vb->getState() == vbucket_state_active) {
            // Set the current time ONLY for active vbuckets.
            ctx->curr_time = ep_real_time();
        } else {
            ctx->curr_time = 0;
        }
        ExpiredItemsCallback cb(this, vbid);
        KVStatsCallback kvcb(this);
        if (getRWUnderlying(vbid)->compactVBucket(vbid, ctx, cb, kvcb)) {
            // Compaction succeeded: the temp filter built during compaction
            // replaces the live one (or filters are dropped if disabled).
            if (config.isBfilterEnabled()) {
                vb->swapFilter();
            } else {
                vb->clearFilter();
            }
        } else {
            LOG(EXTENSION_LOG_WARNING, "Compaction: Not successful for vb %u, "
                    "clearing bloom filter, if any.", vb->getId());
            vb->clearFilter();
        }
        vb->setPurgeSeqno(ctx->max_purged_seq);
    } else {
        err = ENGINE_NOT_MY_VBUCKET;
        engine.storeEngineSpecific(cookie, NULL);
        //Decrement session counter here, as memcached thread wouldn't
        //visit the engine interface in case of a NOT_MY_VB notification
        engine.decrementSessionCtr();

    }

    // Deregister this vbucket's compaction entries and wake snoozed
    // compactions. The scan stops once at least one entry has been erased
    // and at least one task woken.
    LockHolder lh(compactionLock);
    bool erased = false, woke = false;
    std::list<CompTaskEntry>::iterator it = compactionTasks.begin();
    while (it != compactionTasks.end()) {
        if ((*it).first == vbid) {
            it = compactionTasks.erase(it);
            erased = true;
        } else {
            ExTask &task = (*it).second;
            if (task->getState() == TASK_SNOOZED) {
                ExecutorPool::get()->wake(task->getId());
                woke = true;
            }
            ++it;
        }
        if (erased && woke) {
            break;
        }
    }
    lh.unlock();

    if (cookie) {
        engine.notifyIOComplete(cookie, err);
    }
    --stats.pendingCompactions;
    return false;
}
1535
1536 bool EventuallyPersistentStore::resetVBucket(uint16_t vbid) {
1537     LockHolder lh(vbsetMutex);
1538     bool rv(false);
1539
1540     RCPtr<VBucket> vb = vbMap.getBucket(vbid);
1541     if (vb) {
1542         vbucket_state_t vbstate = vb->getState();
1543
1544         vbMap.removeBucket(vbid);
1545         lh.unlock();
1546
1547         std::list<std::string> tap_cursors = vb->checkpointManager.
1548                                              getCursorNames();
1549         // Delete and recreate the vbucket database file
1550         scheduleVBDeletion(vb, NULL, 0);
1551         setVBucketState(vbid, vbstate, false);
1552
1553         // Copy the all cursors from the old vbucket into the new vbucket
1554         RCPtr<VBucket> newvb = vbMap.getBucket(vbid);
1555         newvb->checkpointManager.resetCursors(tap_cursors);
1556
1557         rv = true;
1558     }
1559     return rv;
1560 }
1561
1562 extern "C" {
1563
1564     typedef struct {
1565         EventuallyPersistentEngine* engine;
1566         std::map<std::string, std::string> smap;
1567     } snapshot_stats_t;
1568
1569     static void add_stat(const char *key, const uint16_t klen,
1570                          const char *val, const uint32_t vlen,
1571                          const void *cookie) {
1572         cb_assert(cookie);
1573         void *ptr = const_cast<void *>(cookie);
1574         snapshot_stats_t* snap = static_cast<snapshot_stats_t*>(ptr);
1575         ObjectRegistry::onSwitchThread(snap->engine);
1576
1577         std::string k(key, klen);
1578         std::string v(val, vlen);
1579         snap->smap.insert(std::pair<std::string, std::string>(k, v));
1580     }
1581 }
1582
1583 void EventuallyPersistentStore::snapshotStats() {
1584     snapshot_stats_t snap;
1585     snap.engine = &engine;
1586     std::map<std::string, std::string>  smap;
1587     bool rv = engine.getStats(&snap, NULL, 0, add_stat) == ENGINE_SUCCESS &&
1588               engine.getStats(&snap, "tap", 3, add_stat) == ENGINE_SUCCESS &&
1589               engine.getStats(&snap, "dcp", 3, add_stat) == ENGINE_SUCCESS;
1590
1591     if (rv && stats.isShutdown) {
1592         snap.smap["ep_force_shutdown"] = stats.forceShutdown ?
1593                                                               "true" : "false";
1594         std::stringstream ss;
1595         ss << ep_real_time();
1596         snap.smap["ep_shutdown_time"] = ss.str();
1597     }
1598     getOneRWUnderlying()->snapshotStats(snap.smap);
1599 }
1600
1601 void EventuallyPersistentStore::updateBGStats(const hrtime_t init,
1602                                               const hrtime_t start,
1603                                               const hrtime_t stop) {
1604     if (stop >= start && start >= init) {
1605         // skip the measurement if the counter wrapped...
1606         ++stats.bgNumOperations;
1607         hrtime_t w = (start - init) / 1000;
1608         BlockTimer::log(start - init, "bgwait", stats.timingLog);
1609         stats.bgWaitHisto.add(w);
1610         stats.bgWait.fetch_add(w);
1611         atomic_setIfLess(stats.bgMinWait, w);
1612         atomic_setIfBigger(stats.bgMaxWait, w);
1613
1614         hrtime_t l = (stop - start) / 1000;
1615         BlockTimer::log(stop - start, "bgload", stats.timingLog);
1616         stats.bgLoadHisto.add(l);
1617         stats.bgLoad.fetch_add(l);
1618         atomic_setIfLess(stats.bgMinLoad, l);
1619         atomic_setIfBigger(stats.bgMaxLoad, l);
1620     }
1621 }
1622
/**
 * Perform a single (non-batched) background fetch of `key` from the
 * read-only KVStore of `vbucket` and reconcile the result with the
 * in-memory hash table.
 *
 * The fetched metadata (isMeta) or value is restored into the matching
 * StoredValue when one is still present. The bg-fetch stats are then
 * updated, bgFetchQueue is decremented, the fetched Item is deleted and
 * `cookie` is notified with the resulting status.
 *
 * @param key     key to fetch
 * @param vbucket vbucket the key belongs to
 * @param cookie  waiting connection; passed to notifyIOComplete() at the end
 * @param init    hrtime at which the fetch was requested (for wait stats)
 * @param isMeta  true to fetch and restore only the item's metadata
 */
void EventuallyPersistentStore::completeBGFetch(const std::string &key,
                                                uint16_t vbucket,
                                                const void *cookie,
                                                hrtime_t init,
                                                bool isMeta) {
    hrtime_t start(gethrtime());
    // Go find the data
    RememberingCallback<GetValue> gcb;
    if (isMeta) {
        // Only metadata is wanted: mark the result as partial.
        gcb.val.setPartial();
        ++stats.bg_meta_fetched;
    } else {
        ++stats.bg_fetched;
    }
    // Issue the read and wait for the KVStore callback to deliver it.
    getROUnderlying(vbucket)->get(key, vbucket, gcb);
    gcb.waitForValue();
    cb_assert(gcb.fired);
    // Status reported to the cookie unless overridden below.
    ENGINE_ERROR_CODE status = gcb.val.getStatus();

    // Lock to prevent a race condition between a fetch for restore and delete
    LockHolder lh(vbsetMutex);

    RCPtr<VBucket> vb = getVBucket(vbucket);
    if (vb) {
        // Hold the vbucket state lock (read) and the key's hash bucket lock
        // while the StoredValue is inspected and updated.
        ReaderLockHolder rlh(vb->getStateLock());
        int bucket_num(0);
        LockHolder hlh = vb->ht.getLockedBucket(key, &bucket_num);
        StoredValue *v = fetchValidValue(vb, key, bucket_num, true);
        if (isMeta) {
            if ((v && v->unlocked_restoreMeta(gcb.val.getValue(),
                                              gcb.val.getStatus(), vb->ht))
                || ENGINE_KEY_ENOENT == status) {
                /* If ENGINE_KEY_ENOENT is the status from storage and the temp
                 key is removed from hash table by the time bgfetch returns
                 (in case multiple bgfetch is scheduled for a key), we still
                 need to return ENGINE_SUCCESS to the memcached worker thread,
                 so that the worker thread can visit the ep-engine and figure
                 out the correct flow */
                status = ENGINE_SUCCESS;
            }
        } else {
            bool restore = false;
            if (v && v->isResident()) {
                // Value is already in memory (e.g. restored by another
                // fetch); report success regardless of the disk result.
                status = ENGINE_SUCCESS;
            } else {
                // Decide whether the fetched value should be written back
                // into the StoredValue, depending on the eviction policy.
                switch (eviction_policy) {
                    case VALUE_ONLY:
                        // Only a non-resident, non-deleted item is restored.
                        if (v && !v->isResident() && !v->isDeleted()) {
                            restore = true;
                        }
                        break;
                    case FULL_EVICTION:
                        // Additionally restore a temp initial item (the
                        // placeholder still awaiting its fetched data).
                        if (v) {
                            if (v->isTempInitialItem() ||
                                (!v->isResident() && !v->isDeleted())) {
                                restore = true;
                            }
                        }
                        break;
                    default:
                        throw std::logic_error("Unknown eviction policy");
                }
            }

            if (restore) {
                if (gcb.val.getStatus() == ENGINE_SUCCESS) {
                    v->unlocked_restoreValue(gcb.val.getValue(), vb->ht);
                    cb_assert(v->isResident());
                    if (vb->getState() == vbucket_state_active &&
                        v->getExptime() != gcb.val.getValue()->getExptime() &&
                        v->getCas() == gcb.val.getValue()->getCas()) {
                        // MB-9306: It is possible that by the time bgfetcher
                        // returns, the item may have been updated and queued
                        // Hence test the CAS value to be the same first.
                        // exptime mutated, schedule it into new checkpoint
                        queueDirty(vb, v, &hlh, NULL);
                    }
                } else if (gcb.val.getStatus() == ENGINE_KEY_ENOENT) {
                    // Disk says the key does not exist: mark the in-memory
                    // entry accordingly.
                    v->setStoredValueState(
                                          StoredValue::state_non_existent_key);
                    if (eviction_policy == FULL_EVICTION) {
                        // For the full eviction, we should notify
                        // ENGINE_SUCCESS to the memcached worker thread, so
                        // that the worker thread can visit the ep-engine and
                        // figure out the correct error code.
                        status = ENGINE_SUCCESS;
                    }
                } else {
                    // underlying kvstore couldn't fetch requested data
                    // log returned error and notify TMPFAIL to client
                    LOG(EXTENSION_LOG_WARNING,
                        "Warning: failed background fetch for vb=%d "
                        "seq=%" PRId64 " key=%s", vbucket, v->getBySeqno(),
                        key.c_str());
                    status = ENGINE_TMPFAIL;
                }
            }
        }
    } else {
        LOG(EXTENSION_LOG_INFO, "VBucket %d's file was deleted in the middle of"
            " a bg fetch for key %s\n", vbucket, key.c_str());
        status = ENGINE_NOT_MY_VBUCKET;
    }

    lh.unlock();

    hrtime_t stop = gethrtime();
    updateBGStats(init, start, stop);
    bgFetchQueue--;

    // NOTE(review): the fetched Item is freed here, which assumes
    // unlocked_restoreValue()/unlocked_restoreMeta() copy from it rather
    // than take ownership. Confirm.
    delete gcb.val.getValue();
    engine.notifyIOComplete(cookie, status);
}
1736
1737 void EventuallyPersistentStore::completeBGFetchMulti(uint16_t vbId,
1738                                  std::vector<bgfetched_item_t> &fetchedItems,
1739                                  hrtime_t startTime)
1740 {
1741     RCPtr<VBucket> vb = getVBucket(vbId);
1742     if (!vb) {
1743         std::vector<bgfetched_item_t>::iterator itemItr = fetchedItems.begin();
1744         for (; itemItr != fetchedItems.end(); ++itemItr) {
1745             engine.notifyIOComplete((*itemItr).second->cookie,
1746                                     ENGINE_NOT_MY_VBUCKET);
1747         }
1748         LOG(EXTENSION_LOG_WARNING,
1749             "EP Store completes %d of batched background fetch for "
1750             "for vBucket = %d that is already deleted\n",
1751             (int)fetchedItems.size(), vbId);
1752         return;
1753     }
1754
1755     std::vector<bgfetched_item_t>::iterator itemItr = fetchedItems.begin();
1756     for (; itemItr != fetchedItems.end(); ++itemItr) {
1757         VBucketBGFetchItem *bgitem = (*itemItr).second;
1758         ENGINE_ERROR_CODE status = bgitem->value.getStatus();
1759         Item *fetchedValue = bgitem->value.getValue();
1760         const std::string &key = (*itemItr).first;
1761         {   // locking scope
1762             ReaderLockHolder rlh(vb->getStateLock());
1763
1764             int bucket = 0;
1765             LockHolder blh = vb->ht.getLockedBucket(key, &bucket);
1766             StoredValue *v = fetchValidValue(vb, key, bucket, true);
1767             if (bgitem->metaDataOnly) {
1768                 if ((v && v->unlocked_restoreMeta(fetchedValue, status, vb->ht))
1769                     || ENGINE_KEY_ENOENT == status) {
1770                     /* If ENGINE_KEY_ENOENT is the status from storage and the temp
1771                      key is removed from hash table by the time bgfetch returns
1772                      (in case multiple bgfetch is scheduled for a key), we still
1773                      need to return ENGINE_SUCCESS to the memcached worker thread,
1774                      so that the worker thread can visit the ep-engine and figure
1775                      out the correct flow */
1776                     status = ENGINE_SUCCESS;
1777                 }
1778             } else {
1779                 bool restore = false;
1780                 if (v && v->isResident()) {
1781                     status = ENGINE_SUCCESS;
1782                 } else {
1783                     switch (eviction_policy) {
1784                         case VALUE_ONLY:
1785                             if (v && !v->isResident() && !v->isDeleted()) {
1786                                 restore = true;
1787                             }
1788                             break;
1789                         case FULL_EVICTION:
1790                             if (v) {
1791                                 if (v->isTempInitialItem() ||
1792                                     (!v->isResident() && !v->isDeleted())) {
1793                                     restore = true;
1794                                 }
1795                             }
1796                             break;
1797                         default:
1798                             throw std::logic_error("Unknown eviction policy");
1799                     }
1800                 }
1801
1802                 if (restore) {
1803                     if (status == ENGINE_SUCCESS) {
1804                         v->unlocked_restoreValue(fetchedValue, vb->ht);
1805                         cb_assert(v->isResident());
1806                         ReaderLockHolder(vb->getStateLock());
1807                         if (vb->getState() == vbucket_state_active &&
1808                             v->getExptime() != fetchedValue->getExptime() &&
1809                             v->getCas() == fetchedValue->getCas()) {
1810                             // MB-9306: It is possible that by the time
1811                             // bgfetcher returns, the item may have been
1812                             // updated and queued
1813                             // Hence test the CAS value to be the same first.
1814                             // exptime mutated, schedule it into new checkpoint
1815                             queueDirty(vb, v, &blh, NULL);
1816                         }
1817                     } else if (status == ENGINE_KEY_ENOENT) {
1818                         v->setStoredValueState(StoredValue::state_non_existent_key);
1819                         if (eviction_policy == FULL_EVICTION) {
1820                             // For the full eviction, we should notify
1821                             // ENGINE_SUCCESS to the memcached worker thread,
1822                             // so that the worker thread can visit the
1823                             // ep-engine and figure out the correct error
1824                             // code.
1825                             status = ENGINE_SUCCESS;
1826                         }
1827                     } else {
1828                         // underlying kvstore couldn't fetch requested data
1829                         // log returned error and notify TMPFAIL to client
1830                         LOG(EXTENSION_LOG_WARNING,
1831                             "Warning: failed background fetch for vb=%d "
1832                             "key=%s", vbId, key.c_str());
1833                         status = ENGINE_TMPFAIL;
1834                     }
1835                 }
1836             }
1837         } // locking scope ends
1838
1839         if (bgitem->metaDataOnly) {
1840             ++stats.bg_meta_fetched;
1841         } else {
1842             ++stats.bg_fetched;
1843         }
1844
1845         hrtime_t endTime = gethrtime();
1846         updateBGStats(bgitem->initTime, startTime, endTime);
1847         engine.notifyIOComplete(bgitem->cookie, status);
1848     }
1849
1850     LOG(EXTENSION_LOG_DEBUG,
1851         "EP Store completes %d of batched background fetch "
1852         "for vBucket = %d endTime = %lld\n",
1853         fetchedItems.size(), vbId, gethrtime()/1000000);
1854 }
1855
1856 void EventuallyPersistentStore::bgFetch(const std::string &key,
1857                                         uint16_t vbucket,
1858                                         const void *cookie,
1859                                         bool isMeta) {
1860     std::stringstream ss;
1861
1862     if (multiBGFetchEnabled()) {
1863         RCPtr<VBucket> vb = getVBucket(vbucket);
1864         cb_assert(vb);
1865         KVShard *myShard = vbMap.getShard(vbucket);
1866
1867         // schedule to the current batch of background fetch of the given
1868         // vbucket
1869         VBucketBGFetchItem * fetchThis = new VBucketBGFetchItem(cookie,
1870                                                                 isMeta);
1871         vb->queueBGFetchItem(key, fetchThis, myShard->getBgFetcher());
1872         myShard->getBgFetcher()->notifyBGEvent();
1873         ss << "Queued a background fetch, now at "
1874            << vb->numPendingBGFetchItems() << std::endl;
1875         LOG(EXTENSION_LOG_DEBUG, "%s", ss.str().c_str());
1876     } else {
1877         bgFetchQueue++;
1878         stats.maxRemainingBgJobs = std::max(stats.maxRemainingBgJobs,
1879                                             bgFetchQueue.load());
1880         ExecutorPool* iom = ExecutorPool::get();
1881         ExTask task = new SingleBGFetcherTask(&engine, key, vbucket, cookie,
1882                                               isMeta, bgFetchDelay, false);
1883         iom->schedule(task, READER_TASK_IDX);
1884         ss << "Queued a background fetch, now at " << bgFetchQueue.load()
1885            << std::endl;
1886         LOG(EXTENSION_LOG_DEBUG, "%s", ss.str().c_str());
1887     }
1888 }
1889
1890 GetValue EventuallyPersistentStore::getInternal(const std::string &key,
1891                                                 uint16_t vbucket,
1892                                                 const void *cookie,
1893                                                 bool queueBG,
1894                                                 bool honorStates,
1895                                                 vbucket_state_t allowedState,
1896                                                 bool trackReference,
1897                                                 bool deleteTempItem,
1898                                                 bool hideLockedCAS) {
1899
1900     vbucket_state_t disallowedState = (allowedState == vbucket_state_active) ?
1901         vbucket_state_replica : vbucket_state_active;
1902     RCPtr<VBucket> vb = getVBucket(vbucket);
1903     if (!vb) {
1904         ++stats.numNotMyVBuckets;
1905         return GetValue(NULL, ENGINE_NOT_MY_VBUCKET);
1906     }
1907
1908     ReaderLockHolder rlh(vb->getStateLock());
1909     if (honorStates && vb->getState() == vbucket_state_dead) {
1910         ++stats.numNotMyVBuckets;
1911         return GetValue(NULL, ENGINE_NOT_MY_VBUCKET);
1912     } else if (honorStates && vb->getState() == disallowedState) {
1913         ++stats.numNotMyVBuckets;
1914         return GetValue(NULL, ENGINE_NOT_MY_VBUCKET);
1915     } else if (honorStates && vb->getState() == vbucket_state_pending) {
1916         if (vb->addPendingOp(cookie)) {
1917             return GetValue(NULL, ENGINE_EWOULDBLOCK);
1918         }
1919     }
1920
1921     int bucket_num(0);
1922     LockHolder lh = vb->ht.getLockedBucket(key, &bucket_num);
1923     StoredValue *v = fetchValidValue(vb, key, bucket_num, true,
1924                                      trackReference);
1925     if (v) {
1926         if (v->isDeleted()) {
1927             GetValue rv;
1928             return rv;
1929         }
1930         if (v->isTempDeletedItem() || v->isTempNonExistentItem()) {
1931             // Delete a temp non-existent item to ensure that
1932             // if the get were issued over an item that doesn't
1933             // exist, then we dont preserve a temp item.
1934             if (deleteTempItem) {
1935                 vb->ht.unlocked_del(key, bucket_num);
1936             }
1937             GetValue rv;
1938             return rv;
1939         }
1940
1941         // If the value is not resident, wait for it...
1942         if (!v->isResident()) {
1943             if (queueBG) {
1944                 bgFetch(key, vbucket, cookie);
1945             }
1946             return GetValue(NULL, ENGINE_EWOULDBLOCK, v->getBySeqno(),
1947                             true, v->getNRUValue());
1948         }
1949
1950         // Should we hide (return -1) for the items' CAS?
1951         const bool hide_cas = hideLockedCAS &&
1952                               v->isLocked(ep_current_time());
1953         GetValue rv(v->toItem(hide_cas, vbucket), ENGINE_SUCCESS,
1954                     v->getBySeqno(), false, v->getNRUValue());
1955         return rv;
1956     } else {
1957         if (eviction_policy == VALUE_ONLY || diskFlushAll) {
1958             GetValue rv;
1959             return rv;
1960         }
1961
1962         if (vb->maybeKeyExistsInFilter(key)) {
1963             ENGINE_ERROR_CODE ec = ENGINE_EWOULDBLOCK;
1964             if (queueBG) { // Full eviction and need a bg fetch.
1965                 ec = addTempItemForBgFetch(lh, bucket_num, key, vb,
1966                                            cookie, false);
1967             }
1968             return GetValue(NULL, ec, -1, true);
1969         } else {
1970             // As bloomfilter predicted that item surely doesn't exist
1971             // on disk, return ENONET, for getInternal().
1972             GetValue rv;
1973             return rv;
1974         }
1975     }
1976 }
1977
1978 GetValue EventuallyPersistentStore::getRandomKey() {
1979     long max = vbMap.getSize();
1980
1981     long start = random() % max;
1982     long curr = start;
1983     Item *itm = NULL;
1984
1985     while (itm == NULL) {
1986         RCPtr<VBucket> vb = getVBucket(curr++);
1987         while (!vb || vb->getState() != vbucket_state_active) {
1988             if (curr == start) {
1989                 return GetValue(NULL, ENGINE_KEY_ENOENT);
1990             }
1991             if (curr == max) {
1992                 curr = 0;
1993             }
1994
1995             vb = getVBucket(curr++);
1996         }
1997
1998         if ((itm = vb->ht.getRandomKey(random())) != NULL) {
1999             GetValue rv(itm, ENGINE_SUCCESS);
2000             return rv;
2001         }
2002
2003         if (curr == max) {
2004             curr = 0;
2005         }
2006
2007         if (curr == start) {
2008             return GetValue(NULL, ENGINE_KEY_ENOENT);
2009         }
2010         // Search next vbucket
2011     }
2012
2013     return GetValue(NULL, ENGINE_KEY_ENOENT);
2014 }
2015
2016
2017 ENGINE_ERROR_CODE EventuallyPersistentStore::getMetaData(
2018                                                         const std::string &key,
2019                                                         uint16_t vbucket,
2020                                                         const void *cookie,
2021                                                         ItemMetaData &metadata,
2022                                                         uint32_t &deleted,
2023                                                         uint8_t &confResMode,
2024                                                         bool trackReferenced)
2025 {
2026     (void) cookie;
2027     RCPtr<VBucket> vb = getVBucket(vbucket);
2028     if (!vb) {
2029         ++stats.numNotMyVBuckets;
2030         return ENGINE_NOT_MY_VBUCKET;
2031     }
2032
2033     ReaderLockHolder rlh(vb->getStateLock());
2034     if (vb->getState() == vbucket_state_dead ||
2035         vb->getState() == vbucket_state_replica) {
2036         ++stats.numNotMyVBuckets;
2037         return ENGINE_NOT_MY_VBUCKET;
2038     }
2039
2040     int bucket_num(0);
2041     deleted = 0;
2042     LockHolder lh = vb->ht.getLockedBucket(key, &bucket_num);
2043     StoredValue *v = vb->ht.unlocked_find(key, bucket_num, true,
2044                                           trackReferenced);
2045
2046     if (v) {
2047         stats.numOpsGetMeta++;
2048
2049         if (v->isTempInitialItem()) { // Need bg meta fetch.
2050             bgFetch(key, vbucket, cookie, true);
2051             return ENGINE_EWOULDBLOCK;
2052         } else if (v->isTempNonExistentItem()) {
2053             metadata.cas = v->getCas();
2054             return ENGINE_KEY_ENOENT;
2055         } else {
2056             if (v->isTempDeletedItem() || v->isDeleted() ||
2057                 v->isExpired(ep_real_time())) {
2058                 deleted |= GET_META_ITEM_DELETED_FLAG;
2059             }
2060
2061             if (v->isLocked(ep_current_time())) {
2062                 metadata.cas = static_cast<uint64_t>(-1);
2063             } else {
2064                 metadata.cas = v->getCas();
2065             }
2066             metadata.flags = v->getFlags();
2067             metadata.exptime = v->getExptime();
2068             metadata.revSeqno = v->getRevSeqno();
2069             confResMode = v->getConflictResMode();
2070             return ENGINE_SUCCESS;
2071         }
2072     } else {
2073         // The key wasn't found. However, this may be because it was previously
2074         // deleted or evicted with the full eviction strategy.
2075         // So, add a temporary item corresponding to the key to the hash table
2076         // and schedule a background fetch for its metadata from the persistent
2077         // store. The item's state will be updated after the fetch completes.
2078         //
2079         // Schedule this bgFetch only if the key is predicted to be may-be
2080         // existent on disk by the bloomfilter.
2081
2082         if (vb->maybeKeyExistsInFilter(key)) {
2083             return addTempItemForBgFetch(lh, bucket_num, key, vb, cookie, true);
2084         } else {
2085             return ENGINE_KEY_ENOENT;
2086         }
2087     }
2088 }
2089
/**
 * Store `itm` using the metadata (CAS, revision, ...) carried by the item
 * itself rather than generating new metadata.
 *
 * @param itm           item to store; its vbucket id selects the vbucket
 * @param cas           CAS passed to HashTable::unlocked_set()
 * @param seqno         out-param forwarded to queueDirty()
 * @param cookie        connection to notify / park when blocking
 * @param force         when true, skip the replica/pending vbucket checks
 *                      and the conflict resolution step
 * @param allowExisting forwarded to unlocked_set()
 * @param nru           NRU value forwarded to unlocked_set()
 * @param genBySeqno    forwarded to queueDirty()
 * @param emd           optional extended metadata: supplies the conflict
 *                      resolution mode and the adjusted time used to update
 *                      the vbucket's drift counter on success
 * @param isReplication forwarded to unlocked_set() / addTempItemForBgFetch()
 * @return ENGINE_SUCCESS; ENGINE_NOT_MY_VBUCKET; ENGINE_KEY_EEXISTS (invalid
 *         CAS, conflict resolution lost, or item locked); ENGINE_ENOMEM;
 *         ENGINE_KEY_ENOENT; or ENGINE_EWOULDBLOCK when a background fetch
 *         is pending or the request was parked on a pending vbucket.
 */
ENGINE_ERROR_CODE EventuallyPersistentStore::setWithMeta(
                                                     const Item &itm,
                                                     uint64_t cas,
                                                     uint64_t *seqno,
                                                     const void *cookie,
                                                     bool force,
                                                     bool allowExisting,
                                                     uint8_t nru,
                                                     bool genBySeqno,
                                                     ExtendedMetaData *emd,
                                                     bool isReplication)
{
    RCPtr<VBucket> vb = getVBucket(itm.getVBucketId());
    if (!vb) {
        ++stats.numNotMyVBuckets;
        return ENGINE_NOT_MY_VBUCKET;
    }

    // Dead vbuckets are always rejected; replica/pending only when !force.
    ReaderLockHolder rlh(vb->getStateLock());
    if (vb->getState() == vbucket_state_dead) {
        ++stats.numNotMyVBuckets;
        return ENGINE_NOT_MY_VBUCKET;
    } else if (vb->getState() == vbucket_state_replica && !force) {
        ++stats.numNotMyVBuckets;
        return ENGINE_NOT_MY_VBUCKET;
    } else if (vb->getState() == vbucket_state_pending && !force) {
        if (vb->addPendingOp(cookie)) {
            return ENGINE_EWOULDBLOCK;
        }
    }

    //check for the incoming item's CAS validity
    if (!Item::isValidCas(itm.getCas())) {
        return ENGINE_KEY_EEXISTS;
    }

    int bucket_num(0);
    LockHolder lh = vb->ht.getLockedBucket(itm.getKey(), &bucket_num);
    StoredValue *v = vb->ht.unlocked_find(itm.getKey(), bucket_num, true,
                                          false);

    // Hint for unlocked_set(): false once the bloom filter has ruled out
    // the key existing on disk.
    bool maybeKeyExists = true;
    if (!force) {
        if (v)  {
            if (v->isTempInitialItem()) {
                // Metadata for this key is still being fetched from disk.
                bgFetch(itm.getKey(), itm.getVBucketId(), cookie, true);
                return ENGINE_EWOULDBLOCK;
            }

            // Conflict resolution mode defaults to revision_seqno unless
            // the extended metadata specifies one.
            enum conflict_resolution_mode confResMode = revision_seqno;
            if (emd) {
                confResMode = static_cast<enum conflict_resolution_mode>(
                                                       emd->getConflictResMode());
            }

            if (!conflictResolver->resolve(vb, v, itm.getMetaData(), false,
                                           confResMode)) {
                ++stats.numOpsSetMetaResolutionFailed;
                return ENGINE_KEY_EEXISTS;
            }
        } else {
            // Not in memory: if it may be on disk, fetch its metadata first
            // so conflict resolution can run once the fetch completes.
            if (vb->maybeKeyExistsInFilter(itm.getKey())) {
                return addTempItemForBgFetch(lh, bucket_num, itm.getKey(), vb,
                                             cookie, true, isReplication);
            } else {
                maybeKeyExists = false;
            }
        }
    } else {
        if (eviction_policy == FULL_EVICTION) {
            // Check Bloomfilter's prediction
            if (!vb->maybeKeyExistsInFilter(itm.getKey())) {
                maybeKeyExists = false;
            }
        }
    }

    // On replica/pending vbuckets an existing lock is cleared before the
    // set is applied.
    if (v && v->isLocked(ep_current_time()) &&
        (vb->getState() == vbucket_state_replica ||
         vb->getState() == vbucket_state_pending)) {
        v->unlock();
    }

    mutation_type_t mtype = vb->ht.unlocked_set(v, itm, cas, allowExisting,
                                                true, eviction_policy, nru,
                                                maybeKeyExists, isReplication);

    // Map the hash-table outcome to an engine status. NOTE(review): any
    // mutation_type_t not listed below leaves ret at ENGINE_SUCCESS.
    ENGINE_ERROR_CODE ret = ENGINE_SUCCESS;
    switch (mtype) {
    case NOMEM:
        ret = ENGINE_ENOMEM;
        break;
    case INVALID_CAS:
    case IS_LOCKED:
        ret = ENGINE_KEY_EEXISTS;
        break;
    case INVALID_VBUCKET:
        ret = ENGINE_NOT_MY_VBUCKET;
        break;
    case WAS_DIRTY:
    case WAS_CLEAN:
        /* set the conflict resolution mode from the extended meta data *
         * Given that the mode is already set, we don't need to set the *
         * conflict resolution mode in queueDirty */
        if (emd) {
            v->setConflictResMode(
                      static_cast<enum conflict_resolution_mode>(
                                            emd->getConflictResMode()));
        }
        vb->setMaxCas(v->getCas());
        queueDirty(vb, v, &lh, seqno, false, true, genBySeqno, false);
        break;
    case NOT_FOUND:
        ret = ENGINE_KEY_ENOENT;
        break;
    case NEED_BG_FETCH:
        {            // CAS operation with non-resident item + full eviction.
            if (v) { // temp item is already created. Simply schedule a
                lh.unlock(); // bg fetch job.
                bgFetch(itm.getKey(), vb->getId(), cookie, true);
                return ENGINE_EWOULDBLOCK;
            }

            ret = addTempItemForBgFetch(lh, bucket_num, itm.getKey(), vb,
                                        cookie, true, isReplication);
        }
    }

    // Update drift counter for vbucket upon a success only
    if (ret == ENGINE_SUCCESS && emd) {
        vb->setDriftCounter(emd->getAdjustedTime());
    }

    return ret;
}
2225
2226 GetValue EventuallyPersistentStore::getAndUpdateTtl(const std::string &key,
2227                                                     uint16_t vbucket,
2228                                                     const void *cookie,
2229                                                     time_t exptime)
2230 {
2231     RCPtr<VBucket> vb = getVBucket(vbucket);
2232     if (!vb) {
2233         ++stats.numNotMyVBuckets;
2234         return GetValue(NULL, ENGINE_NOT_MY_VBUCKET);
2235     }
2236
2237     ReaderLockHolder rlh(vb->getStateLock());
2238     if (vb->getState() == vbucket_state_dead) {
2239         ++stats.numNotMyVBuckets;
2240         return GetValue(NULL, ENGINE_NOT_MY_VBUCKET);
2241     } else if (vb->getState() == vbucket_state_replica) {
2242         ++stats.numNotMyVBuckets;
2243         return GetValue(NULL, ENGINE_NOT_MY_VBUCKET);
2244     } else if (vb->getState() == vbucket_state_pending) {
2245         if (vb->addPendingOp(cookie)) {
2246             return GetValue(NULL, ENGINE_EWOULDBLOCK);
2247         }
2248     }
2249
2250     int bucket_num(0);
2251     LockHolder lh = vb->ht.getLockedBucket(key, &bucket_num);
2252     StoredValue *v = fetchValidValue(vb, key, bucket_num, true);
2253
2254     if (v) {
2255         if (v->isDeleted() || v->isTempDeletedItem() ||
2256             v->isTempNonExistentItem()) {
2257             GetValue rv;
2258             return rv;
2259         }
2260
2261         if (!v->isResident()) {
2262             bgFetch(key, vbucket, cookie);
2263             return GetValue(NULL, ENGINE_EWOULDBLOCK, v->getBySeqno());
2264         }
2265         if (v->isLocked(ep_current_time())) {
2266             GetValue rv(NULL, ENGINE_KEY_EEXISTS, 0);
2267             return rv;
2268         }
2269
2270         bool exptime_mutated = exptime != v->getExptime() ? true : false;
2271         if (exptime_mutated) {
2272            v->markDirty();
2273            v->setExptime(exptime);
2274         }
2275
2276         GetValue rv(v->toItem(v->isLocked(ep_current_time()), vbucket),
2277                     ENGINE_SUCCESS, v->getBySeqno());
2278
2279         if (exptime_mutated) {
2280             if (vb->getState() == vbucket_state_active) {
2281                 // persist the item in the underlying storage for
2282                 // mutated exptime but only if VB is active.
2283                 queueDirty(vb, v, &lh, NULL);
2284             }
2285         }
2286         return rv;
2287     } else {
2288         if (eviction_policy == VALUE_ONLY) {
2289             GetValue rv;
2290             return rv;
2291         } else {
2292             if (vb->maybeKeyExistsInFilter(key)) {
2293                 ENGINE_ERROR_CODE ec = addTempItemForBgFetch(lh, bucket_num,
2294                                                              key, vb, cookie,
2295                                                              false);
2296                 return GetValue(NULL, ec, -1, true);
2297             } else {
2298                 // As bloomfilter predicted that item surely doesn't exist
2299                 // on disk, return ENOENT for getAndUpdateTtl().
2300                 GetValue rv;
2301                 return rv;
2302             }
2303         }
2304     }
2305 }
2306
2307 ENGINE_ERROR_CODE
2308 EventuallyPersistentStore::statsVKey(const std::string &key,
2309                                      uint16_t vbucket,
2310                                      const void *cookie) {
2311     RCPtr<VBucket> vb = getVBucket(vbucket);
2312     if (!vb) {
2313         return ENGINE_NOT_MY_VBUCKET;
2314     }
2315
2316     int bucket_num(0);
2317     LockHolder lh = vb->ht.getLockedBucket(key, &bucket_num);
2318     StoredValue *v = fetchValidValue(vb, key, bucket_num, true);
2319
2320     if (v) {
2321         if (v->isDeleted() || v->isTempDeletedItem() ||
2322             v->isTempNonExistentItem()) {
2323             return ENGINE_KEY_ENOENT;
2324         }
2325         bgFetchQueue++;
2326         cb_assert(bgFetchQueue > 0);
2327         ExecutorPool* iom = ExecutorPool::get();
2328         ExTask task = new VKeyStatBGFetchTask(&engine, key, vbucket,
2329                                            v->getBySeqno(), cookie,
2330                                            bgFetchDelay, false);
2331         iom->schedule(task, READER_TASK_IDX);
2332         return ENGINE_EWOULDBLOCK;
2333     } else {
2334         if (eviction_policy == VALUE_ONLY) {
2335             return ENGINE_KEY_ENOENT;
2336         } else {
2337             add_type_t rv = vb->ht.unlocked_addTempItem(bucket_num, key,
2338                                                         eviction_policy);
2339             switch(rv) {
2340             case ADD_NOMEM:
2341                 return ENGINE_ENOMEM;
2342             case ADD_EXISTS:
2343             case ADD_UNDEL:
2344             case ADD_SUCCESS:
2345             case ADD_TMP_AND_BG_FETCH:
2346                 // Since the hashtable bucket is locked, we shouldn't get here
2347                 abort();
2348             case ADD_BG_FETCH:
2349                 {
2350                     ++bgFetchQueue;
2351                     cb_assert(bgFetchQueue > 0);
2352                     ExecutorPool* iom = ExecutorPool::get();
2353                     ExTask task = new VKeyStatBGFetchTask(&engine, key,
2354                                                           vbucket, -1, cookie,
2355                                                           bgFetchDelay, false);
2356                     iom->schedule(task, READER_TASK_IDX);
2357                 }
2358             }
2359             return ENGINE_EWOULDBLOCK;
2360         }
2361     }
2362 }
2363
2364 void EventuallyPersistentStore::completeStatsVKey(const void* cookie,
2365                                                   std::string &key,
2366                                                   uint16_t vbid,
2367                                                   uint64_t bySeqNum) {
2368     RememberingCallback<GetValue> gcb;
2369
2370     getROUnderlying(vbid)->get(key, vbid, gcb);
2371     gcb.waitForValue();
2372     cb_assert(gcb.fired);
2373
2374     if (eviction_policy == FULL_EVICTION) {
2375         RCPtr<VBucket> vb = getVBucket(vbid);
2376         if (vb) {
2377             int bucket_num(0);
2378             LockHolder hlh = vb->ht.getLockedBucket(key, &bucket_num);
2379             StoredValue *v = fetchValidValue(vb, key, bucket_num, true);
2380             if (v && v->isTempInitialItem()) {
2381                 if (gcb.val.getStatus() == ENGINE_SUCCESS) {
2382                     v->unlocked_restoreValue(gcb.val.getValue(), vb->ht);
2383                     cb_assert(v->isResident());
2384                 } else if (gcb.val.getStatus() == ENGINE_KEY_ENOENT) {
2385                     v->setStoredValueState(
2386                                           StoredValue::state_non_existent_key);
2387                 } else {
2388                     // underlying kvstore couldn't fetch requested data
2389                     // log returned error and notify TMPFAIL to client
2390                     LOG(EXTENSION_LOG_WARNING,
2391                         "Warning: failed background fetch for vb=%d "
2392                         "seq=%" PRId64 " key=%s", vbid, v->getBySeqno(),
2393                         key.c_str());
2394                 }
2395             }
2396         }
2397     }
2398
2399     if (gcb.val.getStatus() == ENGINE_SUCCESS) {
2400         engine.addLookupResult(cookie, gcb.val.getValue());
2401     } else {
2402         engine.addLookupResult(cookie, NULL);
2403     }
2404
2405     bgFetchQueue--;
2406     engine.notifyIOComplete(cookie, ENGINE_SUCCESS);
2407 }
2408
2409 bool EventuallyPersistentStore::getLocked(const std::string &key,
2410                                           uint16_t vbucket,
2411                                           Callback<GetValue> &cb,
2412                                           rel_time_t currentTime,
2413                                           uint32_t lockTimeout,
2414                                           const void *cookie) {
2415     RCPtr<VBucket> vb = getVBucket(vbucket);
2416     if (!vb || vb->getState() != vbucket_state_active) {
2417         ++stats.numNotMyVBuckets;
2418         GetValue rv(NULL, ENGINE_NOT_MY_VBUCKET);
2419         cb.callback(rv);
2420         return false;
2421     }
2422
2423     int bucket_num(0);
2424     LockHolder lh = vb->ht.getLockedBucket(key, &bucket_num);
2425     StoredValue *v = fetchValidValue(vb, key, bucket_num, true);
2426
2427     if (v) {
2428         if (v->isDeleted() || v->isTempNonExistentItem() ||
2429             v->isTempDeletedItem()) {
2430             GetValue rv;
2431             cb.callback(rv);
2432             return true;
2433         }
2434
2435         // if v is locked return error
2436         if (v->isLocked(currentTime)) {
2437             GetValue rv;
2438             cb.callback(rv);
2439             return false;
2440         }
2441
2442         // If the value is not resident, wait for it...
2443         if (!v->isResident()) {
2444             if (cookie) {
2445                 bgFetch(key, vbucket, cookie);
2446             }
2447             GetValue rv(NULL, ENGINE_EWOULDBLOCK, -1, true);
2448             cb.callback(rv);
2449             return false;
2450         }
2451
2452         // acquire lock and increment cas value
2453         v->lock(currentTime + lockTimeout);
2454
2455         Item *it = v->toItem(false, vbucket);
2456         it->setCas(vb->nextHLCCas());
2457         v->setCas(it->getCas());
2458
2459         GetValue rv(it);
2460         cb.callback(rv);
2461         return true;
2462     } else {
2463         if (eviction_policy == VALUE_ONLY) {
2464             GetValue rv;
2465             cb.callback(rv);
2466             return true;
2467         } else {
2468             if (vb->maybeKeyExistsInFilter(key)) {
2469                 ENGINE_ERROR_CODE ec = addTempItemForBgFetch(lh, bucket_num,
2470                                                              key, vb, cookie,
2471                                                              false);
2472                 GetValue rv(NULL, ec, -1, true);
2473                 cb.callback(rv);
2474                 return false;
2475             } else {
2476                 // As bloomfilter predicted that item surely doesn't exist
2477                 // on disk, return ENOENT for getLocked().
2478                 GetValue rv;
2479                 cb.callback(rv);
2480                 return true;
2481             }
2482         }
2483     }
2484 }
2485
2486 ENGINE_ERROR_CODE
2487 EventuallyPersistentStore::unlockKey(const std::string &key,
2488                                      uint16_t vbucket,
2489                                      uint64_t cas,
2490                                      rel_time_t currentTime)
2491 {
2492
2493     RCPtr<VBucket> vb = getVBucket(vbucket);
2494     if (!vb || vb->getState() != vbucket_state_active) {
2495         ++stats.numNotMyVBuckets;
2496         return ENGINE_NOT_MY_VBUCKET;
2497     }
2498
2499     int bucket_num(0);
2500     LockHolder lh = vb->ht.getLockedBucket(key, &bucket_num);
2501     StoredValue *v = fetchValidValue(vb, key, bucket_num, true);
2502
2503     if (v) {
2504         if (v->isDeleted() || v->isTempNonExistentItem() ||
2505             v->isTempDeletedItem()) {
2506             return ENGINE_KEY_ENOENT;
2507         }
2508         if (v->isLocked(currentTime)) {
2509             if (v->getCas() == cas) {
2510                 v->unlock();
2511                 return ENGINE_SUCCESS;
2512             }
2513         }
2514         return ENGINE_TMPFAIL;
2515     } else {
2516         if (eviction_policy == VALUE_ONLY) {
2517             return ENGINE_KEY_ENOENT;
2518         } else {
2519             // With the full eviction, an item's lock is automatically
2520             // released when the item is evicted from memory. Therefore,
2521             // we simply return ENGINE_TMPFAIL when we receive unlockKey
2522             // for an item that is not in memocy cache. Note that we don't
2523             // spawn any bg fetch job to figure out if an item actually
2524             // exists in disk or not.
2525             return ENGINE_TMPFAIL;
2526         }
2527     }
2528 }
2529
2530
2531 ENGINE_ERROR_CODE EventuallyPersistentStore::getKeyStats(
2532                                             const std::string &key,
2533                                             uint16_t vbucket,
2534                                             const void *cookie,
2535                                             struct key_stats &kstats,
2536                                             bool bgfetch,
2537                                             bool wantsDeleted)
2538 {
2539     RCPtr<VBucket> vb = getVBucket(vbucket);
2540     if (!vb) {
2541         return ENGINE_NOT_MY_VBUCKET;
2542     }
2543
2544     int bucket_num(0);
2545     LockHolder lh = vb->ht.getLockedBucket(key, &bucket_num);
2546     StoredValue *v = fetchValidValue(vb, key, bucket_num, true);
2547
2548     if (v) {
2549         if ((v->isDeleted() && !wantsDeleted) ||
2550             v->isTempNonExistentItem() || v->isTempDeletedItem()) {
2551             return ENGINE_KEY_ENOENT;
2552         }
2553         if (eviction_policy == FULL_EVICTION &&
2554             v->isTempInitialItem() && bgfetch) {
2555             lh.unlock();
2556             bgFetch(key, vbucket, cookie, true);
2557             return ENGINE_EWOULDBLOCK;
2558         }
2559         kstats.logically_deleted = v->isDeleted();
2560         kstats.dirty = v->isDirty();
2561         kstats.exptime = v->getExptime();
2562         kstats.flags = v->getFlags();
2563         kstats.cas = v->getCas();
2564         kstats.vb_state = vb->getState();
2565         return ENGINE_SUCCESS;
2566     } else {
2567         if (eviction_policy == VALUE_ONLY) {
2568             return ENGINE_KEY_ENOENT;
2569         } else {
2570             if (bgfetch && vb->maybeKeyExistsInFilter(key)) {
2571                 return addTempItemForBgFetch(lh, bucket_num, key, vb,
2572                                              cookie, true);
2573             } else {
2574                 // If bgFetch were false, or bloomfilter predicted that
2575                 // item surely doesn't exist on disk, return ENOENT for
2576                 // getKeyStats().
2577                 return ENGINE_KEY_ENOENT;
2578             }
2579         }
2580     }
2581 }
2582
2583 std::string EventuallyPersistentStore::validateKey(const std::string &key,
2584                                                    uint16_t vbucket,
2585                                                    Item &diskItem) {
2586     int bucket_num(0);
2587     RCPtr<VBucket> vb = getVBucket(vbucket);
2588     LockHolder lh = vb->ht.getLockedBucket(key, &bucket_num);
2589     StoredValue *v = fetchValidValue(vb, key, bucket_num, true,
2590                                      false, true);
2591
2592     if (v) {
2593         if (v->isDeleted() || v->isTempNonExistentItem() ||
2594             v->isTempDeletedItem()) {
2595             return "item_deleted";
2596         }
2597
2598         if (diskItem.getFlags() != v->getFlags()) {
2599             return "flags_mismatch";
2600         } else if (v->isResident() && memcmp(diskItem.getData(),
2601                                              v->getValue()->getData(),
2602                                              diskItem.getNBytes())) {
2603             return "data_mismatch";
2604         } else {
2605             return "valid";
2606         }
2607     } else {
2608         return "item_deleted";
2609     }
2610
2611 }
2612
2613 ENGINE_ERROR_CODE EventuallyPersistentStore::deleteItem(const std::string &key,
2614                                                         uint64_t *cas,
2615                                                         uint16_t vbucket,
2616                                                         const void *cookie,
2617                                                         bool force,
2618                                                         ItemMetaData *itemMeta,
2619                                                         mutation_descr_t *mutInfo,
2620                                                         bool tapBackfill)
2621 {
2622     RCPtr<VBucket> vb = getVBucket(vbucket);
2623     if (!vb || (vb->getState() == vbucket_state_dead && !force)) {
2624         ++stats.numNotMyVBuckets;
2625         return ENGINE_NOT_MY_VBUCKET;
2626     } else if(vb->getState() == vbucket_state_replica && !force) {
2627         ++stats.numNotMyVBuckets;
2628         return ENGINE_NOT_MY_VBUCKET;
2629     } else if(vb->getState() == vbucket_state_pending && !force) {
2630         if (vb->addPendingOp(cookie)) {
2631             return ENGINE_EWOULDBLOCK;
2632         }
2633     }
2634
2635     int bucket_num(0);
2636     LockHolder lh = vb->ht.getLockedBucket(key, &bucket_num);
2637     StoredValue *v = vb->ht.unlocked_find(key, bucket_num, true, false);
2638     if (!v || v->isDeleted() || v->isTempItem()) {
2639         if (eviction_policy == VALUE_ONLY) {
2640             return ENGINE_KEY_ENOENT;
2641         } else { // Full eviction.
2642             if (!force) {
2643                 if (!v) { // Item might be evicted from cache.
2644                     if (vb->maybeKeyExistsInFilter(key)) {
2645                         return addTempItemForBgFetch(lh, bucket_num, key, vb,
2646                                                      cookie, true);
2647                     } else {
2648                         // As bloomfilter predicted that item surely doesn't
2649                         // exist on disk, return ENOENT for deleteItem().
2650                         return ENGINE_KEY_ENOENT;
2651                     }
2652                 } else if (v->isTempInitialItem()) {
2653                     lh.unlock();
2654                     bgFetch(key, vbucket, cookie, true);
2655                     return ENGINE_EWOULDBLOCK;
2656                 } else { // Non-existent or deleted key.
2657                     if (v->isTempNonExistentItem() || v->isTempDeletedItem()) {
2658                         // Delete a temp non-existent item to ensure that
2659                         // if a delete were issued over an item that doesn't
2660                         // exist, then we don't preserve a temp item.
2661                         vb->ht.unlocked_del(key, bucket_num);
2662                     }
2663                     return ENGINE_KEY_ENOENT;
2664                 }
2665             } else {
2666                 if (!v) { // Item might be evicted from cache.
2667                     // Create a temp item and delete it below as it is a
2668                     // force deletion, only if bloomfilter predicts that
2669                     // item may exist on disk.
2670                     if (vb->maybeKeyExistsInFilter(key)) {
2671                         add_type_t rv = vb->ht.unlocked_addTempItem(
2672                                                                bucket_num,
2673                                                                key,
2674                                                                eviction_policy);
2675                         if (rv == ADD_NOMEM) {
2676                             return ENGINE_ENOMEM;
2677                         }
2678                         v = vb->ht.unlocked_find(key, bucket_num, true, false);
2679                         v->setStoredValueState(StoredValue::state_deleted_key);
2680                     } else {
2681                         return ENGINE_KEY_ENOENT;
2682                     }
2683                 } else if (v->isTempInitialItem()) {
2684                     v->setStoredValueState(StoredValue::state_deleted_key);
2685                 } else { // Non-existent or deleted key.
2686                     if (v->isTempNonExistentItem() || v->isTempDeletedItem()) {
2687                         // Delete a temp non-existent item to ensure that
2688                         // if a delete were issued over an item that doesn't
2689                         // exist, then we don't preserve a temp item.
2690                         vb->ht.unlocked_del(key, bucket_num);
2691                     }
2692                     return ENGINE_KEY_ENOENT;
2693                 }
2694             }
2695         }
2696     }
2697
2698     if (v && v->isLocked(ep_current_time()) &&
2699         (vb->getState() == vbucket_state_replica ||
2700          vb->getState() == vbucket_state_pending)) {
2701         v->unlock();
2702     }
2703     mutation_type_t delrv;
2704     delrv = vb->ht.unlocked_softDelete(v, *cas, eviction_policy);
2705
2706     if (itemMeta && v) {
2707         itemMeta->revSeqno = v->getRevSeqno();
2708         itemMeta->cas = v->getCas();
2709         itemMeta->flags = v->getFlags();
2710         itemMeta->exptime = v->getExptime();
2711     }
2712     *cas = v ? v->getCas() : 0;
2713
2714     uint64_t seqno = 0;
2715     ENGINE_ERROR_CODE ret = ENGINE_SUCCESS;
2716     switch (delrv) {
2717     case NOMEM:
2718         ret = ENGINE_ENOMEM;
2719         break;
2720     case INVALID_VBUCKET:
2721         ret = ENGINE_NOT_MY_VBUCKET;
2722         break;
2723     case INVALID_CAS:
2724         ret = ENGINE_KEY_EEXISTS;
2725         break;
2726     case IS_LOCKED:
2727         ret = ENGINE_TMPFAIL;
2728         break;
2729     case NOT_FOUND:
2730         ret = ENGINE_KEY_ENOENT;
2731         if (v) {
2732             queueDirty(vb, v, &lh, NULL, tapBackfill);
2733         }
2734         break;
2735     case WAS_DIRTY:
2736     case WAS_CLEAN:
2737         queueDirty(vb, v, &lh, &seqno, tapBackfill);
2738         mutInfo->seqno = seqno;
2739         mutInfo->vbucket_uuid = vb->failovers->getLatestUUID();
2740         break;
2741     case NEED_BG_FETCH:
2742         // We already figured out if a bg fetch is requred for a full-evicted
2743         // item above.
2744         abort();
2745     }
2746     return ret;
2747 }
2748
/**
 * Delete @p key using caller-supplied metadata, running conflict resolution
 * first unless @p force is set.
 *
 * NOTE(review): the isReplication/emd parameters suggest replication or
 * XDCR callers. Confirm against the call sites.
 *
 * @param key           document key
 * @param cas           in: stamped onto a newly created temp item on the
 *                      force path and passed to unlocked_softDelete();
 *                      out: CAS of the stored value, or 0 when there is none
 * @param seqno         forwarded to queueDirty() when the delete is queued
 * @param vbucket       vbucket id
 * @param cookie        connection cookie, used to park the request
 * @param force         skip vbucket state checks and conflict resolution
 * @param itemMeta      required (dereferenced unconditionally); metadata of
 *                      the incoming deletion
 * @param tapBackfill   forwarded to queueDirty()
 * @param genBySeqno    when false, @p bySeqno is assigned to the stored value
 * @param bySeqno       seqno to use when @p genBySeqno is false
 * @param emd           optional; supplies the conflict resolution mode and,
 *                      on success, the adjusted time for the drift counter
 * @param isReplication forwarded to temp-item creation
 * @return ENGINE_SUCCESS or an engine error (NOT_MY_VBUCKET, EWOULDBLOCK,
 *         KEY_EEXISTS, KEY_ENOENT, TMPFAIL, ENOMEM)
 */
ENGINE_ERROR_CODE EventuallyPersistentStore::deleteWithMeta(
                                                     const std::string &key,
                                                     uint64_t *cas,
                                                     uint64_t *seqno,
                                                     uint16_t vbucket,
                                                     const void *cookie,
                                                     bool force,
                                                     ItemMetaData *itemMeta,
                                                     bool tapBackfill,
                                                     bool genBySeqno,
                                                     uint64_t bySeqno,
                                                     ExtendedMetaData *emd,
                                                     bool isReplication)
{
    RCPtr<VBucket> vb = getVBucket(vbucket);
    if (!vb || (vb->getState() == vbucket_state_dead && !force)) {
        ++stats.numNotMyVBuckets;
        return ENGINE_NOT_MY_VBUCKET;
    } else if(vb->getState() == vbucket_state_replica && !force) {
        ++stats.numNotMyVBuckets;
        return ENGINE_NOT_MY_VBUCKET;
    } else if(vb->getState() == vbucket_state_pending && !force) {
        // Park the request until the vbucket leaves the pending state.
        if (vb->addPendingOp(cookie)) {
            return ENGINE_EWOULDBLOCK;
        }
    }

    // Reject the request if the incoming item's CAS is not valid.
    if (!Item::isValidCas(itemMeta->cas)) {
        return ENGINE_KEY_EEXISTS;
    }

    int bucket_num(0);
    LockHolder lh = vb->ht.getLockedBucket(key, &bucket_num);
    StoredValue *v = vb->ht.unlocked_find(key, bucket_num, true, false);
    if (!force) { // Need conflict resolution.
        if (v)  {
            if (v->isTempInitialItem()) {
                // NOTE(review): unlike the NEED_BG_FETCH case below (and
                // deleteItem()), the hash bucket lock is still held when
                // bgFetch() is called here. Confirm this is intended.
                bgFetch(key, vbucket, cookie, true);
                return ENGINE_EWOULDBLOCK;
            }

            // Use revision-seqno resolution unless the extended metadata
            // names a different mode.
            enum conflict_resolution_mode confResMode = revision_seqno;
            if (emd) {
                confResMode = static_cast<enum conflict_resolution_mode>(
                                                       emd->getConflictResMode());
            }

            if (!conflictResolver->resolve(vb, v, *itemMeta, true, confResMode)) {
                ++stats.numOpsDelMetaResolutionFailed;
                return ENGINE_KEY_EEXISTS;
            }
        } else {
            // Item is 1) deleted or not existent in the value eviction case OR
            // 2) deleted or evicted in the full eviction.
            if (vb->maybeKeyExistsInFilter(key)) {
                return addTempItemForBgFetch(lh, bucket_num, key, vb,
                                             cookie, true, isReplication);
            } else {
                // Even though bloomfilter predicted that item doesn't exist
                // on disk, we must put this delete on disk if the cas is valid.
                add_type_t rv = vb->ht.unlocked_addTempItem(bucket_num, key,
                                                            eviction_policy,
                                                            isReplication);
                if (rv == ADD_NOMEM) {
                    return ENGINE_ENOMEM;
                }
                v = vb->ht.unlocked_find(key, bucket_num, true, false);
                v->setStoredValueState(StoredValue::state_deleted_key);
            }
        }
    } else {
        // Forced delete: make sure there is an entry to mark deleted, and
        // stamp it with the caller's CAS.
        if (!v) {
            // We should always try to persist a delete here.
            add_type_t rv = vb->ht.unlocked_addTempItem(bucket_num, key,
                                                        eviction_policy,
                                                        isReplication);
            if (rv == ADD_NOMEM) {
                return ENGINE_ENOMEM;
            }
            v = vb->ht.unlocked_find(key, bucket_num, true, false);
            v->setStoredValueState(StoredValue::state_deleted_key);
            v->setCas(*cas);
        } else if (v->isTempInitialItem()) {
            v->setStoredValueState(StoredValue::state_deleted_key);
            v->setCas(*cas);
        }
    }

    // Locks on replica/pending vbuckets are released before deleting.
    if (v && v->isLocked(ep_current_time()) &&
        (vb->getState() == vbucket_state_replica ||
         vb->getState() == vbucket_state_pending)) {
        v->unlock();
    }
    mutation_type_t delrv;
    delrv = vb->ht.unlocked_softDelete(v, *cas, *itemMeta,
                                       eviction_policy, true);
    *cas = v ? v->getCas() : 0;

    ENGINE_ERROR_CODE ret = ENGINE_SUCCESS;
    switch (delrv) {
    case NOMEM:
        ret = ENGINE_ENOMEM;
        break;
    case INVALID_VBUCKET:
        ret = ENGINE_NOT_MY_VBUCKET;
        break;
    case INVALID_CAS:
        ret = ENGINE_KEY_EEXISTS;
        break;
    case IS_LOCKED:
        ret = ENGINE_TMPFAIL;
        break;
    case NOT_FOUND:
        ret = ENGINE_KEY_ENOENT;
        break;
    case WAS_DIRTY:
    case WAS_CLEAN:
        // Keep the caller-supplied seqno unless one should be generated.
        if (!genBySeqno) {
            v->setBySeqno(bySeqno);
        }

        /* set the conflict resolution mode from the extended meta data *
         * Given that the mode is already set, we don't need to set the *
         * conflict resolution mode in queueDirty */
        if (emd) {
            v->setConflictResMode(
               static_cast<enum conflict_resolution_mode>(
                                         emd->getConflictResMode()));
        }
        vb->setMaxCas(v->getCas());
        queueDirty(vb, v, &lh, seqno, tapBackfill, true, genBySeqno, false);
        break;
    case NEED_BG_FETCH:
        // unlocked_softDelete() asked for a background fetch: release the
        // bucket lock before scheduling it.
        lh.unlock();
        bgFetch(key, vbucket, cookie, true);
        ret = ENGINE_EWOULDBLOCK;
    }

    // Update drift counter for vbucket upon a success only
    if (ret == ENGINE_SUCCESS && emd) {
        vb->setDriftCounter(emd->getAdjustedTime());
    }

    return ret;
}
2895
2896 void EventuallyPersistentStore::reset() {
2897     std::vector<int> buckets = vbMap.getBuckets();
2898     std::vector<int>::iterator it;
2899     for (it = buckets.begin(); it != buckets.end(); ++it) {
2900         RCPtr<VBucket> vb = getVBucket(*it);
2901         if (vb) {
2902             LockHolder lh(vb_mutexes[vb->getId()]);
2903             vb->ht.clear();
2904             vb->checkpointManager.clear(vb->getState());
2905             vb->resetStats();
2906             vb->setPersistedSnapshot(0, 0);
2907         }
2908     }
2909
2910     ++stats.diskQueueSize;
2911     bool inverse = true;
2912     flushAllTaskCtx.delayFlushAll.compare_exchange_strong(inverse, false);
2913     // Waking up (notifying) one flusher is good enough for diskFlushAll
2914     vbMap.shards[EP_PRIMARY_SHARD]->getFlusher()->notifyFlushEvent();
2915 }
2916
2917 /**
2918  * Callback invoked after persisting an item from memory to disk.
2919  *
2920  * This class exists to create a closure around a few variables within
2921  * EventuallyPersistentStore::flushOne so that an object can be
2922  * requeued in case of failure to store in the underlying layer.
2923  */
2924 class PersistenceCallback : public Callback<mutation_result>,
2925                             public Callback<int> {
2926 public:
2927
2928     PersistenceCallback(const queued_item &qi, RCPtr<VBucket> &vb,
2929                         EventuallyPersistentStore *st, EPStats *s, uint64_t c)
2930         : queuedItem(qi), vbucket(vb), store(st), stats(s), cas(c) {
2931         cb_assert(vb);
2932         cb_assert(s);
2933     }
2934
2935     // This callback is invoked for set only.
2936     void callback(mutation_result &value) {
2937         if (value.first == 1) {
2938             int bucket_num(0);
2939             LockHolder lh = vbucket->ht.getLockedBucket(queuedItem->getKey(),
2940                                                         &bucket_num);
2941             StoredValue *v = store->fetchValidValue(vbucket,
2942                                                     queuedItem->getKey(),
2943                                                     bucket_num, true, false);
2944             if (v) {
2945                 if (v->getCas() == cas) {
2946                     // mark this item clean only if current and stored cas
2947                     // value match
2948                     v->markClean();
2949                 }
2950                 if (v->isNewCacheItem()) {
2951                     if (value.second) {
2952                         // Insert in value-only or full eviction mode.
2953                         ++vbucket->opsCreate;
2954                         vbucket->incrMetaDataDisk(*queuedItem);
2955                     } else { // Update in full eviction mode.
2956                         vbucket->ht.decrNumTotalItems();
2957                         ++vbucket->opsUpdate;
2958                     }
2959                     v->setNewCacheItem(false);
2960                 } else { // Update in value-only or full eviction mode.
2961                     ++vbucket->opsUpdate;
2962                 }
2963             }
2964
2965             vbucket->doStatsForFlushing(*queuedItem, queuedItem->size());
2966             stats->decrDiskQueueSize(1);
2967             stats->totalPersisted++;
2968         } else {
2969             // If the return was 0 here, we're in a bad state because
2970             // we do not know the rowid of this object.
2971             if (value.first == 0) {
2972                 int bucket_num(0);
2973                 LockHolder lh = vbucket->ht.getLockedBucket(
2974                                            queuedItem->getKey(), &bucket_num);
2975                 StoredValue *v = store->fetchValidValue(vbucket,
2976                                                         queuedItem->getKey(),
2977                                                         bucket_num, true,
2978                                                         false);
2979                 if (v) {
2980                     std::stringstream ss;
2981                     ss << "Persisting ``" << queuedItem->getKey() << "'' on vb"
2982                        << queuedItem->getVBucketId() << " (rowid="
2983                        << v->getBySeqno() << ") returned 0 updates\n";
2984                     LOG(EXTENSION_LOG_WARNING, "%s", ss.str().c_str());
2985                 } else {
2986                     LOG(EXTENSION_LOG_WARNING,
2987                         "Error persisting now missing ``%s'' from vb%d",
2988                         queuedItem->getKey().c_str(),
2989                         queuedItem->getVBucketId());
2990                 }
2991
2992                 vbucket->doStatsForFlushing(*queuedItem, queuedItem->size());
2993                 stats->decrDiskQueueSize(1);
2994             } else {
2995                 std::stringstream ss;
2996                 ss <<
2997                 "Fatal error in persisting SET ``" <<
2998                 queuedItem->getKey() << "'' on vb "
2999                    << queuedItem->getVBucketId() << "!!! Requeue it...\n";
3000                 LOG(EXTENSION_LOG_WARNING, "%s", ss.str().c_str());
3001                 redirty();
3002             }
3003         }
3004     }
3005
    // This callback is invoked for deletions only.
    //
    // `value` is the row count reported by the underlying storage for the
    // delete of queuedItem:
    //   < 0 : the delete failed; the item is re-queued via redirty().
    //     0 : no row was deleted, but nothing failed (row did not exist).
    //     1 : exactly one row was deleted.
    // Values above 1 are treated as a fatal inconsistency (asserted below).
    void callback(int &value) {
        // > 1 would be bad.  We were only trying to delete one row.
        cb_assert(value < 2);
        // -1 means fail
        // 1 means we deleted one row
        // 0 means we did not delete a row, but did not fail (did not exist)
        if (value >= 0) {
            // We have successfully removed an item from the disk, we
            // may now remove it from the hash table.
            // The hash-bucket lock (lh) is held for the rest of this branch,
            // so the lookup and unlocked_del() below see a consistent bucket.
            int bucket_num(0);
            LockHolder lh = vbucket->ht.getLockedBucket(queuedItem->getKey(),
                                                        &bucket_num);
            // NOTE(review): the trailing (true, false) flags presumably ask for
            // deleted values to be returned and for no reference tracking;
            // confirm against fetchValidValue's declaration.
            StoredValue *v = store->fetchValidValue(vbucket,
                                                    queuedItem->getKey(),
                                                    bucket_num, true, false);
            // Only drop the in-memory entry if it is still a deleted value;
            // a newer live value for the key must be left alone.
            if (v && v->isDeleted()) {
                bool newCacheItem = v->isNewCacheItem();
                bool deleted = vbucket->ht.unlocked_del(queuedItem->getKey(),
                                                        bucket_num);
                cb_assert(deleted);
                if (newCacheItem && value > 0) {
                    // Need to decrement the item counter again for an item that
                    // exists on DB file, but not in memory (i.e., full eviction),
                    // because we created the temp item in memory and incremented
                    // the item counter when a deletion is pushed in the queue.
                    vbucket->ht.decrNumTotalItems();
                }
            }

            /**
             * Deleted items are to be added to the bloomfilter,
             * in either eviction policy.
             */
            vbucket->addToFilter(queuedItem->getKey());

            // Only count the delete as persisted if a row was actually removed.
            if (value > 0) {
                ++stats->totalPersisted;
                ++vbucket->opsDelete;
            }
            vbucket->doStatsForFlushing(*queuedItem, queuedItem->size());
            stats->decrDiskQueueSize(1);
            vbucket->decrMetaDataDisk(*queuedItem);
        } else {
            std::stringstream ss;
            ss << "Fatal error in persisting DELETE ``" <<
            queuedItem->getKey() << "'' on vb "
               << queuedItem->getVBucketId() << "!!! Requeue it...\n";
            LOG(EXTENSION_LOG_WARNING, "%s", ss.str().c_str());
            redirty();
        }
    }
3061
3062 private:
3063
3064     void redirty() {
3065         if (store->vbMap.isBucketDeletion(vbucket->getId())) {
3066             vbucket->doStatsForFlushing(*queuedItem, queuedItem->size());
3067             stats->decrDiskQueueSize(1);
3068             return;
3069         }
3070         ++stats->flushFailed;
3071         store->invokeOnLockedStoredValue(queuedItem->getKey(),
3072                                          queuedItem->getVBucketId(),
3073                                          &StoredValue::reDirty);
3074         vbucket->rejectQueue.push(queuedItem);
3075     }
3076
3077     const queued_item queuedItem;
3078     RCPtr<VBucket> &vbucket;
3079     EventuallyPersistentStore *store;
3080     EPStats *stats;
3081     uint64_t cas;
3082     DISALLOW_COPY_AND_ASSIGN(PersistenceCallback);
3083 };
3084
3085 bool EventuallyPersistentStore::scheduleFlushAllTask(const void* cookie,
3086                                                      time_t when) {
3087     bool inverse = false;
3088     if (diskFlushAll.compare_exchange_strong(inverse, true)) {
3089         flushAllTaskCtx.cookie = cookie;
3090         flushAllTaskCtx.delayFlushAll.compare_exchange_strong(inverse, true);
3091         ExTask task = new FlushAllTask(&engine, static_cast<double>(when));
3092         ExecutorPool::get()->schedule(task, NONIO_TASK_IDX);
3093         return true;
3094     } else {
3095         return false;
3096     }
3097 }
3098
3099 void EventuallyPersistentStore::setFlushAllComplete() {
3100     // Notify memcached about flushAll task completion, and
3101     // set diskFlushall flag to false
3102     if (flushAllTaskCtx.cookie) {
3103         engine.notifyIOComplete(flushAllTaskCtx.cookie, ENGINE_SUCCESS);
3104     }
3105     bool inverse = false;
3106     flushAllTaskCtx.delayFlushAll.compare_exchange_strong(inverse, true);
3107     inverse = true;
3108     diskFlushAll.compare_exchange_strong(inverse, false);
3109 }
3110
3111 void EventuallyPersistentStore::flushOneDeleteAll() {
3112     for (size_t i = 0; i < vbMap.getSize(); ++i) {
3113         RCPtr<VBucket> vb = getVBucket(i);
3114         if (vb) {
3115             LockHolder lh(vb_mutexes[vb->getId()]);
3116             getRWUnderlying(vb->getId())->reset(i);
3117         }
3118     }
3119
3120     stats.decrDiskQueueSize(1);
3121     setFlushAllComplete();
3122 }
3123
3124 int EventuallyPersistentStore::flushVBucket(uint16_t vbid) {
3125     KVShard *shard = vbMap.getShard(vbid);
3126     if (diskFlushAll && !flushAllTaskCtx.delayFlushAll) {
3127         if (shard->getId() == EP_PRIMARY_SHARD) {
3128             flushOneDeleteAll();
3129         } else {
3130             // disk flush is pending just return
3131             return 0;
3132         }
3133     }
3134
3135     if (vbMap.isBucketCreation(vbid)) {
3136         return RETRY_FLUSH_VBUCKET;
3137     }
3138
3139     int items_flushed = 0;
3140     rel_time_t flush_start = ep_current_time();
3141
3142     RCPtr<VBucket> vb = vbMap.getBucket(vbid);
3143     if (vb) {
3144         LockHolder lh(vb_mutexes[vbid], true /*tryLock*/);
3145         if (!lh.islocked()) { // Try another bucket if this one is locked
3146             return RETRY_FLUSH_VBUCKET; // to avoid blocking flusher
3147         }
3148
3149         KVStatsCallback cb(this);
3150         std::vector<queued_item> items;
3151         KVStore *rwUnderlying = getRWUnderlying(vbid);
3152
3153         while (!vb->rejectQueue.empty()) {
3154             items.push_back(vb->rejectQueue.front());
3155             vb->rejectQueue.pop();
3156         }
3157
3158         const std::string cursor(CheckpointManager::pCursorName);
3159         vb->getBackfillItems(items);
3160
3161         snapshot_range_t range;
3162         range = vb->checkpointManager.getAllItemsForCursor(cursor, items);
3163
3164         if (!items.empty()) {
3165             while (!rwUnderlying->begin()) {
3166                 ++stats.beginFailed;
3167                 LOG(EXTENSION_LOG_WARNING, "Failed to start a transaction!!! "
3168                     "Retry in 1 sec ...");
3169                 sleep(1);
3170             }
3171             rwUnderlying->optimizeWrites(items);
3172
3173             Item *prev = NULL;
3174             uint64_t maxSeqno = 0;
3175             uint64_t maxCas = 0;
3176             std::list<PersistenceCallback*> pcbs;
3177             std::vector<queued_item>::iterator it = items.begin();
3178             for(; it != items.end(); ++it) {
3179                 if ((*it)->getOperation() != queue_op_set &&
3180                     (*it)->getOperation() != queue_op_del) {
3181                     continue;
3182                 } else if (!prev || prev->getKey() != (*it)->getKey()) {
3183                     prev = (*it).get();
3184                     ++items_flushed;
3185                     PersistenceCallback *cb = flushOneDelOrSet(*it, vb);
3186                     if (cb) {
3187                         pcbs.push_back(cb);
3188                     }
3189
3190                     maxSeqno = std::max(maxSeqno, (uint64_t)(*it)->getBySeqno());
3191                     maxCas = std::max(maxCas, (uint64_t)(*it)->getCas());
3192                     ++stats.flusher_todo;
3193                 } else {
3194                     stats.decrDiskQueueSize(1);
3195                     vb->doStatsForFlushing(*(*it), (*it)->size());
3196                 }
3197             }
3198
3199             BlockTimer timer(&stats.diskCommitHisto, "disk_commit",
3200                              stats.timingLog);
3201             hrtime_t start = gethrtime();
3202
3203             if (vb->getState() == vbucket_state_active) {
3204                 range.start = maxSeqno;
3205                 range.end = maxSeqno;
3206             }
3207
3208             while (!rwUnderlying->commit(&cb, range.start, range.end, maxCas,
3209                                          vb->getDriftCounter())) {
3210                 ++stats.commitFailed;
3211                 LOG(EXTENSION_LOG_WARNING, "Flusher commit failed!!! Retry in "
3212                     "1 sec...\n");
3213                 sleep(1);
3214
3215             }
3216
3217             if (vb->rejectQueue.empty()) {
3218                 vb->setPersistedSnapshot(range.start, range.end);
3219                 uint64_t highSeqno = rwUnderlying->getLastPersistedSeqno(vbid);
3220                 if (highSeqno > 0 &&
3221                     highSeqno != vbMap.getPersistenceSeqno(vbid)) {
3222                     vbMap.setPersistenceSeqno(vbid, highSeqno);
3223                     vb->notifySeqnoPersisted(highSeqno);
3224                 }
3225             }
3226
3227             while (!pcbs.empty()) {
3228                 delete pcbs.front();
3229                 pcbs.pop_front();
3230             }
3231
3232             ++stats.flusherCommits;
3233             hrtime_t end = gethrtime();
3234             uint64_t commit_time = (end - start) / 1000000;
3235             uint64_t trans_time = (end - flush_start) / 1000000;
3236
3237             lastTransTimePerItem.store((items_flushed == 0) ? 0 :
3238                                        static_cast<double>(trans_time) /
3239                                        static_cast<double>(items_flushed));
3240             stats.commit_time.store(commit_time);
3241             stats.cumulativeCommitTime.fetch_add(commit_time);
3242             stats.cumulativeFlushTime.fetch_add(ep_current_time()
3243                                                 - flush_start);
3244             stats.flusher_todo.store(0);
3245         }
3246
3247         rwUnderlying->pendingTasks();
3248
3249         if (vb->checkpointManager.getNumCheckpoints() > 1) {
3250             wakeUpCheckpointRemover();
3251         }
3252
3253         if (vb->rejectQueue.empty()) {
3254             vb->checkpointManager.itemsPersisted();
3255             uint64_t seqno = vbMap.getPersistenceSeqno(vbid);
3256             uint64_t chkid = vb->checkpointManager.getPersistenceCursorPreChkId();
3257             vb->notifyOnPersistence(engine, seqno, true);
3258             vb->notifyOnPersistence(engine, chkid, false);
3259             if (chkid > 0 && chkid != vbMap.getPersistenceCheckpointId(vbid)) {
3260                 vbMap.setPersistenceCheckpointId(vbid, chkid);
3261             }
3262         } else {
3263             return RETRY_FLUSH_VBUCKET;
3264         }
3265     }
3266
3267     return items_flushed;
3268 }
3269
3270 PersistenceCallback*
3271 EventuallyPersistentStore::flushOneDelOrSet(const queued_item &qi,
3272                                             RCPtr<VBucket> &vb) {
3273
3274     if (!vb) {
3275         stats.decrDiskQueueSize(1);
3276         return NULL;
3277     }
3278
3279     int64_t bySeqno = qi->getBySeqno();
3280     bool deleted = qi->isDeleted();
3281     rel_time_t queued(qi->getQueuedTime());
3282
3283     int dirtyAge = ep_current_time() - queued;
3284     stats.dirtyAgeHisto.add(dirtyAge * 1000000);
3285     stats.dirtyAge.store(dirtyAge);
3286     stats.dirtyAgeHighWat.store(std::max(stats.dirtyAge.load(),
3287                                          stats.dirtyAgeHighWat.load()));
3288
3289     // Wait until the vbucket database is created by the vbucket state
3290     // snapshot task.
3291     if (vbMap.isBucketCreation(qi->getVBucketId()) ||
3292         vbMap.isBucketDeletion(qi->getVBucketId())) {
3293         vb->rejectQueue.push(qi);
3294         ++vb->opsReject;
3295         return NULL;
3296     }
3297
3298     KVStore *rwUnderlying = getRWUnderlying(qi->getVBucketId());
3299     if (!deleted) {
3300         // TODO: Need to separate disk_insert from disk_update because
3301         // bySeqno doesn't give us that information.
3302         BlockTimer timer(bySeqno == -1 ?
3303                          &stats.diskInsertHisto : &stats.diskUpdateHisto,
3304                          bySeqno == -1 ? "disk_insert" : "disk_update",
3305                          stats.timingLog);
3306         PersistenceCallback *cb =
3307             new PersistenceCallback(qi, vb, this, &stats, qi->getCas());
3308         rwUnderlying->set(*qi, *cb);
3309         return cb;
3310     } else {
3311         BlockTimer timer(&stats.diskDelHisto, "disk_delete",
3312                          stats.timingLog);
3313         PersistenceCallback *cb =
3314             new PersistenceCallback(qi, vb, this, &stats, 0);
3315         rwUnderlying->del(*qi, *cb);
3316         return cb;
3317     }
3318 }
3319
3320 void EventuallyPersistentStore::queueDirty(RCPtr<VBucket> &vb,
3321                                            StoredValue* v,
3322                                            LockHolder *plh,
3323                                            uint64_t *seqno,
3324                                            bool tapBackfill,
3325                                            bool notifyReplicator,
3326                                            bool genBySeqno,
3327                                            bool setConflictMode) {
3328     if (vb) {
3329         if (setConflictMode && (v->getConflictResMode() != last_write_wins) &&
3330             vb->isTimeSyncEnabled()) {
3331             v->setConflictResMode(last_write_wins);
3332         }
3333
3334         queued_item qi(v->toItem(false, vb->getId()));
3335
3336         bool rv = tapBackfill ? vb->queueBackfillItem(qi, genBySeqno) :
3337                                 vb->checkpointManager.queueDirty(vb, qi,
3338                                                                  genBySeqno);
3339         v->setBySeqno(qi->getBySeqno());
3340
3341         if (seqno) {
3342             *seqno = v->getBySeqno();
3343         }
3344
3345         if (plh) {
3346             plh->unlock();
3347         }
3348
3349         if (rv) {
3350             KVShard* shard = vbMap.getShard(vb->getId());
3351             shard->getFlusher()->notifyFlushEvent();
3352
3353         }
3354         if (!tapBackfill && notifyReplicator) {
3355             engine.getTapConnMap().notifyVBConnections(vb->getId());
3356             engine.getDcpConnMap().notifyVBConnections(vb->getId(),
3357                                                        qi->getBySeqno());
3358         }
3359     }
3360 }
3361
3362 std::vector<vbucket_state *> EventuallyPersistentStore::loadVBucketState()
3363 {
3364     return getOneROUnderlying()->listPersistedVbuckets();
3365 }
3366
3367 void EventuallyPersistentStore::warmupCompleted() {
3368     // Run the vbucket state snapshot job once after the warmup
3369     scheduleVBSnapshot(VBSnapshotTask::Priority::HIGH);
3370
3371     if (engine.getConfiguration().getAlogPath().length() > 0) {
3372
3373         if (engine.getConfiguration().isAccessScannerEnabled()) {
3374             LockHolder lh(accessScanner.mutex);
3375             accessScanner.enabled = true;
3376             lh.unlock();
3377             LOG(EXTENSION_LOG_WARNING, "Access Scanner task enabled");
3378             size_t smin = engine.getConfiguration().getAlogSleepTime();
3379             setAccessScannerSleeptime(smin);
3380         } else {
3381             LockHolder lh(accessScanner.mutex);
3382             accessScanner.enabled = false;
3383             LOG(EXTENSION_LOG_WARNING, "Access Scanner task disabled");
3384         }
3385
3386         Configuration &config = engine.getConfiguration();
3387         config.addValueChangedListener("access_scanner_enabled",
3388                                        new EPStoreValueChangeListener(*this));
3389         config.addValueChangedListener("alog_sleep_time",
3390                                        new EPStoreValueChangeListener(*this));
3391         config.addValueChangedListener("alog_task_time",
3392                                        new EPStoreValueChangeListener(*this));
3393     }
3394
3395     // "0" sleep_time means that the first snapshot task will be executed
3396     // right after warmup. Subsequent snapshot tasks will be scheduled every
3397     // 60 sec by default.
3398     ExecutorPool *iom = ExecutorPool::get();
3399     ExTask task = new StatSnap(&engine, 0, false);
3400     statsSnapshotTaskId = iom->schedule(task, WRITER_TASK_IDX);
3401 }
3402
3403 bool EventuallyPersistentStore::maybeEnableTraffic()
3404 {
3405     // @todo rename.. skal vaere isTrafficDisabled elns
3406     double memoryUsed = static_cast<double>(stats.getTotalMemoryUsed());
3407     double maxSize = static_cast<double>(stats.getMaxDataSize());
3408
3409     if (memoryUsed  >= stats.mem_low_wat) {
3410         LOG(EXTENSION_LOG_WARNING,
3411             "Total memory use reached to the low water mark, stop warmup");
3412         return true;
3413     } else if (memoryUsed > (maxSize * stats.warmupMemUsedCap)) {
3414         LOG(EXTENSION_LOG_WARNING,
3415                 "Enough MB of data loaded to enable traffic");
3416         return true;
3417     } else if (eviction_policy == VALUE_ONLY &&
3418                stats.warmedUpValues >=
3419                                (stats.warmedUpKeys * stats.warmupNumReadCap)) {
3420         // Let ep-engine think we're done with the warmup phase
3421         // (we should refactor this into "enableTraffic")
3422         LOG(EXTENSION_LOG_WARNING,
3423             "Enough number of items loaded to enable traffic");
3424         return true;
3425     } else if (eviction_policy == FULL_EVICTION &&
3426                stats.warmedUpValues >=
3427                             (warmupTask->getEstimatedItemCount() *
3428                              stats.warmupNumReadCap)) {
3429         // In case of FULL EVICTION, warmed up keys always matches the number
3430         // of warmed up values, therefore for honoring the min_item threshold
3431         // in this scenario, we can consider warmup's estimated item count.
3432         LOG(EXTENSION_LOG_WARNING,
3433             "Enough number of items loaded to enable traffic");
3434         return true;
3435     }
3436     return false;
3437 }
3438
3439 bool EventuallyPersistentStore::isWarmingUp() {
3440     return !warmupTask->isComplete();
3441 }
3442
3443 void EventuallyPersistentStore::stopWarmup(void)
3444 {
3445     // forcefully stop current warmup task
3446     if (isWarmingUp()) {
3447         LOG(EXTENSION_LOG_WARNING, "Stopping warmup while engine is loading "
3448             "data from underlying storage, shutdown = %s\n",
3449             stats.isShutdown ? "yes" : "no");
3450         warmupTask->stop();
3451     }
3452 }
3453
3454 bool EventuallyPersistentStore::isMemoryUsageTooHigh() {
3455     double memoryUsed = static_cast<double>(stats.getTotalMemoryUsed());
3456     double maxSize = static_cast<double>(stats.getMaxDataSize());
3457     return memoryUsed > (maxSize * backfillMemoryThreshold);
3458 }
3459
3460 void EventuallyPersistentStore::setBackfillMemoryThreshold(
3461                                                 double threshold) {
3462     backfillMemoryThreshold = threshold;
3463 }
3464
3465 void EventuallyPersistentStore::setExpiryPagerSleeptime(size_t val) {
3466     LockHolder lh(expiryPager.mutex);
3467
3468     if (expiryPager.sleeptime != 0) {
3469         ExecutorPool::get()->cancel(expiryPager.task);
3470     }
3471
3472     expiryPager.sleeptime = val;
3473     if (val != 0) {
3474         ExTask expTask = new ExpiredItemPager(&engine, stats,
3475                                                 expiryPager.sleeptime);
3476         expiryPager.task = ExecutorPool::get()->schedule(expTask,
3477                                                         NONIO_TASK_IDX);
3478     }
3479 }
3480
3481 void EventuallyPersistentStore::enableAccessScannerTask() {
3482     LockHolder lh(accessScanner.mutex);
3483     if (!accessScanner.enabled) {
3484         accessScanner.enabled = true;
3485
3486         if (accessScanner.sleeptime != 0) {
3487             ExecutorPool::get()->cancel(accessScanner.task);
3488         }
3489
3490         size_t alogSleepTime = engine.getConfiguration().getAlogSleepTime();
3491         accessScanner.sleeptime = alogSleepTime * 60;
3492         if (accessScanner.sleeptime != 0) {
3493             ExTask task = new AccessScanner(*this, stats,
3494                                             accessScanner.sleeptime);
3495             accessScanner.task = ExecutorPool::get()->schedule(task,
3496                                                                AUXIO_TASK_IDX);
3497             struct timeval tv;
3498             gettimeofday(&tv, NULL);
3499             advance_tv(tv, accessScanner.sleeptime);
3500             stats.alogTime.store(tv.tv_sec);
3501         } else {
3502             LOG(EXTENSION_LOG_WARNING, "Did not enable access scanner task, "
3503                                        "as alog_sleep_time is set to zero!");
3504         }
3505     } else {
3506         LOG(EXTENSION_LOG_DEBUG, "Access scanner already enabled!");
3507     }
3508 }
3509
3510 void EventuallyPersistentStore::disableAccessScannerTask() {
3511     LockHolder lh(accessScanner.mutex);
3512     if (accessScanner.enabled) {
3513         ExecutorPool::get()->cancel(accessScanner.task);
3514         accessScanner.sleeptime = 0;
3515         accessScanner.enabled = false;
3516     } else {
3517         LOG(EXTENSION_LOG_DEBUG, "Access scanner already disabled!");
3518     }
3519 }
3520
3521 void EventuallyPersistentStore::setAccessScannerSleeptime(size_t val) {
3522     LockHolder lh(accessScanner.mutex);
3523
3524     if (accessScanner.enabled) {
3525         if (accessScanner.sleeptime != 0) {
3526             ExecutorPool::get()->cancel(accessScanner.task);
3527         }
3528
3529         // store sleeptime in seconds
3530         accessScanner.sleeptime = val * 60;
3531         if (accessScanner.sleeptime != 0) {
3532             ExTask task = new AccessScanner(*this, stats,
3533                                             accessScanner.sleeptime);
3534             accessScanner.task = ExecutorPool::get()->schedule(task,
3535                                                                AUXIO_TASK_IDX);
3536
3537             struct timeval tv;
3538             gettimeofday(&tv, NULL);
3539             advance_tv(tv, accessScanner.sleeptime);
3540             stats.alogTime.store(tv.tv_sec);
3541         }
3542     }
3543 }
3544
3545 void EventuallyPersistentStore::resetAccessScannerStartTime() {
3546     LockHolder lh(accessScanner.mutex);
3547
3548     if (accessScanner.enabled) {
3549         if (accessScanner.sleeptime != 0) {
3550             ExecutorPool::get()->cancel(accessScanner.task);
3551             // re-schedule task according to the new task start hour
3552             ExTask task = new AccessScanner(*this, stats,
3553                                             accessScanner.sleeptime);
3554             accessScanner.task = ExecutorPool::get()->schedule(task,
3555                                                                AUXIO_TASK_IDX);
3556
3557             struct timeval tv;
3558             gettimeofday(&tv, NULL);
3559             advance_tv(tv, accessScanner.sleeptime);
3560             stats.alogTime.store(tv.tv_sec);
3561         }
3562     }
3563 }
3564
3565 void EventuallyPersistentStore::setAllBloomFilters(bool to) {
3566     size_t maxSize = vbMap.getSize();
3567     cb_assert(maxSize <= std::numeric_limits<uint16_t>::max());
3568     for (size_t i = 0; i < maxSize; i++) {
3569         uint16_t vbid = static_cast<uint16_t>(i);
3570         RCPtr<VBucket> vb = vbMap.getBucket(vbid);
3571         if (vb) {
3572             if (to) {
3573                 vb->setFilterStatus(BFILTER_ENABLED);
3574             } else {
3575                 vb->setFilterStatus(BFILTER_DISABLED);
3576             }
3577         }
3578     }
3579 }
3580
3581 void EventuallyPersistentStore::visit(VBucketVisitor &visitor)
3582 {
3583     size_t maxSize = vbMap.getSize();
3584     cb_assert(maxSize <= std::numeric_limits<uint16_t>::max());
3585     for (size_t i = 0; i < maxSize; ++i) {
3586         uint16_t vbid = static_cast<uint16_t>(i);
3587         RCPtr<VBucket> vb = vbMap.getBucket(vbid);
3588         if (vb) {
3589             bool wantData = visitor.visitBucket(vb);
3590             // We could've lost this along the way.
3591             if (wantData) {
3592                 vb->ht.visit(visitor);
3593             }
3594         }
3595     }
3596     visitor.complete();
3597 }
3598
3599 EventuallyPersistentStore::Position
3600 EventuallyPersistentStore::pauseResumeVisit(PauseResumeEPStoreVisitor& visitor,
3601                                             Position& start_pos)
3602 {
3603     const size_t maxSize = vbMap.getSize();
3604
3605     uint16_t vbid = start_pos.vbucket_id;
3606     for (; vbid < maxSize; ++vbid) {
3607         RCPtr<VBucket> vb = vbMap.getBucket(vbid);
3608         if (vb) {
3609             bool paused = !visitor.visit(vbid, vb->ht);
3610             if (paused) {
3611                 break;
3612             }
3613         }
3614     }
3615
3616     return EventuallyPersistentStore::Position(vbid);
3617 }
3618
3619 EventuallyPersistentStore::Position
3620 EventuallyPersistentStore::startPosition() const
3621 {
3622     return EventuallyPersistentStore::Position(0);
3623 }
3624
3625 EventuallyPersistentStore::Position
3626 EventuallyPersistentStore::endPosition() const
3627 {
3628     return EventuallyPersistentStore::Position(vbMap.getSize());
3629 }
3630
3631 VBCBAdaptor::VBCBAdaptor(EventuallyPersistentStore *s, TaskId id,
3632                          shared_ptr<VBucketVisitor> v,
3633                          const char *l, double sleep) :
3634     GlobalTask(&s->getEPEngine(), id, 0, false), store(s),
3635     visitor(v), label(l), sleepTime(sleep), currentvb(0)
3636 {
3637     const VBucketFilter &vbFilter = visitor->getVBucketFilter();
3638     size_t maxSize = store->vbMap.getSize();
3639     cb_assert(maxSize <= std::numeric_limits<uint16_t>::max());
3640     for (size_t i = 0; i < maxSize; ++i) {
3641         uint16_t vbid = static_cast<uint16_t>(i);
3642         RCPtr<VBucket> vb = store->vbMap.getBucket(vbid);
3643         if (vb && vbFilter(vbid)) {
3644             vbList.push(vbid);
3645         }
3646     }
3647 }
3648
3649 bool VBCBAdaptor::run(void) {
3650     if (!vbList.empty()) {
3651         currentvb.store(vbList.front());
3652         RCPtr<VBucket> vb = store->vbMap.getBucket(currentvb);
3653         if (vb) {
3654             if (visitor->pauseVisitor()) {
3655                 snooze(sleepTime);
3656                 return true;
3657             }
3658             if (visitor->visitBucket(vb)) {
3659                 vb->ht.visit(*visitor);
3660             }
3661         }
3662         vbList.pop();
3663     }
3664
3665     bool isdone = vbList.empty();
3666     if (isdone) {
3667         visitor->complete();
3668     }
3669     return !isdone;
3670 }
3671
3672 VBucketVisitorTask::VBucketVisitorTask(EventuallyPersistentStore *s,
3673                                        shared_ptr<VBucketVisitor> v,
3674                                        uint16_t sh, const char *l,
3675                                        double sleep, bool shutdown)
3676     : GlobalTask(&(s->getEPEngine()), TaskId::VBucketVisitorTask, 0, shutdown),
3677       store(s), visitor(v), label(l), sleepTime(sleep), currentvb(0),
3678       shardID(sh) {
3679     const VBucketFilter &vbFilter = visitor->getVBucketFilter();
3680     std::vector<int> vbs = store->vbMap.getShard(shardID)->getVBuckets();
3681     cb_assert(vbs.size() <= std::numeric_limits<uint16_t>::max());
3682     std::vector<int>::iterator it;
3683     for (it = vbs.begin(); it != vbs.end(); ++it) {
3684         uint16_t vbid = static_cast<uint16_t>(*it);
3685         RCPtr<VBucket> vb = store->vbMap.getBucket(vbid);
3686         if (vb && vbFilter(vbid)) {
3687             vbList.push(vbid);
3688         }
3689     }
3690 }
3691
3692 bool VBucketVisitorTask::run() {
3693     if (!vbList.empty()) {
3694         currentvb = vbList.front();
3695         RCPtr<VBucket> vb = store->vbMap.getBucket(currentvb);
3696         if (vb) {
3697             if (visitor->pauseVisitor()) {
3698                 snooze(sleepTime);
3699                 return true;
3700             }
3701             if (visitor->visitBucket(vb)) {
3702                 vb->ht.visit(*visitor);
3703             }
3704         }
3705         vbList.pop();
3706     }
3707
3708     bool isDone = vbList.empty();
3709     if (isDone) {
3710         visitor->complete();
3711     }
3712     return !isDone;
3713 }
3714
3715 void EventuallyPersistentStore::resetUnderlyingStats(void)
3716 {
3717     for (size_t i = 0; i < vbMap.numShards; i++) {
3718         KVShard *shard = vbMap.shards[i];
3719         shard->getRWUnderlying()->resetStats();
3720         shard->getROUnderlying()->resetStats();
3721     }
3722
3723     for (size_t i = 0; i < GlobalTask::allTaskIds.size(); i++) {
3724         stats.schedulingHisto[i].reset();
3725         stats.taskRuntimeHisto[i].reset();
3726     }
3727 }
3728
3729 void EventuallyPersistentStore::addKVStoreStats(ADD_STAT add_stat,
3730                                                 const void* cookie) {
3731     for (size_t i = 0; i < vbMap.numShards; i++) {
3732         std::stringstream rwPrefix;
3733         std::stringstream roPrefix;
3734         rwPrefix << "rw_" << i;
3735         roPrefix << "ro_" << i;
3736         vbMap.shards[i]->getRWUnderlying()->addStats(rwPrefix.str(), add_stat,
3737                                                      cookie);
3738         vbMap.shards[i]->getROUnderlying()->addStats(roPrefix.str(), add_stat,
3739                                                      cookie);
3740     }
3741 }
3742
3743 void EventuallyPersistentStore::addKVStoreTimingStats(ADD_STAT add_stat,
3744                                                       const void* cookie) {
3745     for (size_t i = 0; i < vbMap.numShards; i++) {
3746         std::stringstream rwPrefix;
3747         std::stringstream roPrefix;
3748         rwPrefix << "rw_" << i;
3749         roPrefix << "ro_" << i;
3750         vbMap.shards[i]->getRWUnderlying()->addTimingStats(rwPrefix.str(),
3751                                                            add_stat,
3752                                                            cookie);
3753         vbMap.shards[i]->getROUnderlying()->addTimingStats(roPrefix.str(),
3754                                                            add_stat,
3755                                                            cookie);
3756     }
3757 }
3758
3759 KVStore *EventuallyPersistentStore::getOneROUnderlying(void) {
3760     return vbMap.getShard(EP_PRIMARY_SHARD)->getROUnderlying();
3761 }
3762
3763 KVStore *EventuallyPersistentStore::getOneRWUnderlying(void) {
3764     return vbMap.getShard(EP_PRIMARY_SHARD)->getRWUnderlying();
3765 }
3766
3767 ENGINE_ERROR_CODE
3768 EventuallyPersistentStore::rollback(uint16_t vbid,
3769                                     uint64_t rollbackSeqno) {
3770     LockHolder lh(vb_mutexes[vbid], true /*tryLock*/);
3771     if (!lh.islocked()) {
3772         return ENGINE_TMPFAIL; // Reschedule a vbucket rollback task.
3773     }
3774
3775     if (rollbackSeqno != 0) {
3776         shared_ptr<RollbackCB> cb(new RollbackCB(engine));
3777         KVStore* rwUnderlying = vbMap.getShard(vbid)->getRWUnderlying();
3778         RollbackResult result = rwUnderlying->rollback(vbid, rollbackSeqno, cb);
3779
3780         if (result.success) {
3781             RCPtr<VBucket> vb = vbMap.getBucket(vbid);
3782             vb->failovers->pruneEntries(result.highSeqno);
3783             vb->checkpointManager.clear(vb, result.highSeqno);
3784             vb->setPersistedSnapshot(result.snapStartSeqno, result.snapEndSeqno);
3785             return ENGINE_SUCCESS;
3786         }
3787     }
3788
3789     if (resetVBucket(vbid)) {
3790         return ENGINE_SUCCESS;
3791     }
3792     return ENGINE_NOT_MY_VBUCKET;
3793 }
3794
/**
 * Run the defragmenter task's run() once, directly on the calling thread
 * (the task is invoked here rather than scheduled).
 */
void EventuallyPersistentStore::runDefragmenterTask() {
    defragmenterTask->run();
}
3798
3799 std::ostream& operator<<(std::ostream& os,
3800                          const EventuallyPersistentStore::Position& pos) {
3801     os << "vbucket:" << pos.vbucket_id;
3802     return os;
3803 }