050a396553f05c5421f0ff305512475fb804474a
[ep-engine.git] / src / ep.cc
1 /* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2 /*
3  *     Copyright 2010 Couchbase, Inc
4  *
5  *   Licensed under the Apache License, Version 2.0 (the "License");
6  *   you may not use this file except in compliance with the License.
7  *   You may obtain a copy of the License at
8  *
9  *       http://www.apache.org/licenses/LICENSE-2.0
10  *
11  *   Unless required by applicable law or agreed to in writing, software
12  *   distributed under the License is distributed on an "AS IS" BASIS,
13  *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  *   See the License for the specific language governing permissions and
15  *   limitations under the License.
16  */
17
18 #include "config.h"
19
20 #include <string.h>
21 #include <time.h>
22
23 #include <fstream>
24 #include <functional>
25 #include <iostream>
26 #include <map>
27 #include <sstream>
28 #include <string>
29 #include <utility>
30 #include <vector>
31
32 #include "access_scanner.h"
33 #include "checkpoint_remover.h"
34 #include "conflict_resolution.h"
35 #include "ep.h"
36 #include "ep_engine.h"
37 #include "failover-table.h"
38 #include "flusher.h"
39 #include "htresizer.h"
40 #include "kvshard.h"
41 #include "kvstore.h"
42 #include "locks.h"
43 #include "mutation_log.h"
44 #include "warmup.h"
45 #include "connmap.h"
46 #include "tapthrottle.h"
47
48 class StatsValueChangeListener : public ValueChangedListener {
49 public:
50     StatsValueChangeListener(EPStats &st) : stats(st) {
51         // EMPTY
52     }
53
54     virtual void sizeValueChanged(const std::string &key, size_t value) {
55         if (key.compare("max_size") == 0) {
56             stats.setMaxDataSize(value);
57             size_t low_wat = static_cast<size_t>
58                              (static_cast<double>(value) * 0.6);
59             size_t high_wat = static_cast<size_t>(
60                               static_cast<double>(value) * 0.75);
61             stats.mem_low_wat.store(low_wat);
62             stats.mem_high_wat.store(high_wat);
63         } else if (key.compare("mem_low_wat") == 0) {
64             stats.mem_low_wat.store(value);
65         } else if (key.compare("mem_high_wat") == 0) {
66             stats.mem_high_wat.store(value);
67         } else if (key.compare("tap_throttle_threshold") == 0) {
68             stats.tapThrottleThreshold.store(
69                                           static_cast<double>(value) / 100.0);
70         } else if (key.compare("warmup_min_memory_threshold") == 0) {
71             stats.warmupMemUsedCap.store(static_cast<double>(value) / 100.0);
72         } else if (key.compare("warmup_min_items_threshold") == 0) {
73             stats.warmupNumReadCap.store(static_cast<double>(value) / 100.0);
74         } else {
75             LOG(EXTENSION_LOG_WARNING,
76                 "Failed to change value for unknown variable, %s\n",
77                 key.c_str());
78         }
79     }
80
81 private:
82     EPStats &stats;
83 };
84
85 /**
86  * A configuration value changed listener that responds to ep-engine
87  * parameter changes by invoking engine-specific methods on
88  * configuration change events.
89  */
90 class EPStoreValueChangeListener : public ValueChangedListener {
91 public:
92     EPStoreValueChangeListener(EventuallyPersistentStore &st) : store(st) {
93     }
94
95     virtual void sizeValueChanged(const std::string &key, size_t value) {
96         if (key.compare("bg_fetch_delay") == 0) {
97             store.setBGFetchDelay(static_cast<uint32_t>(value));
98         } else if (key.compare("compaction_write_queue_cap") == 0) {
99             store.setCompactionWriteQueueCap(value);
100         } else if (key.compare("exp_pager_stime") == 0) {
101             store.setExpiryPagerSleeptime(value);
102         } else if (key.compare("alog_sleep_time") == 0) {
103             store.setAccessScannerSleeptime(value);
104         } else if (key.compare("alog_task_time") == 0) {
105             store.resetAccessScannerStartTime();
106         } else if (key.compare("mutation_mem_threshold") == 0) {
107             double mem_threshold = static_cast<double>(value) / 100;
108             StoredValue::setMutationMemoryThreshold(mem_threshold);
109         } else if (key.compare("backfill_mem_threshold") == 0) {
110             double backfill_threshold = static_cast<double>(value) / 100;
111             store.setBackfillMemoryThreshold(backfill_threshold);
112         } else if (key.compare("compaction_exp_mem_threshold") == 0) {
113             store.setCompactionExpMemThreshold(value);
114         } else if (key.compare("tap_throttle_queue_cap") == 0) {
115             store.getEPEngine().getTapThrottle().setQueueCap(value);
116         } else if (key.compare("tap_throttle_cap_pcnt") == 0) {
117             store.getEPEngine().getTapThrottle().setCapPercent(value);
118         } else {
119             LOG(EXTENSION_LOG_WARNING,
120                 "Failed to change value for unknown variable, %s\n",
121                 key.c_str());
122         }
123     }
124
125     virtual void booleanValueChanged(const std::string &key, bool value) {
126         if (key.compare("access_scanner_enabled") == 0) {
127             if (value) {
128                 store.enableAccessScannerTask();
129             } else {
130                 store.disableAccessScannerTask();
131             }
132         }
133     }
134
135 private:
136     EventuallyPersistentStore &store;
137 };
138
139 class VBucketMemoryDeletionTask : public GlobalTask {
140 public:
141     VBucketMemoryDeletionTask(EventuallyPersistentEngine &eng,
142                               RCPtr<VBucket> &vb, double delay) :
143                               GlobalTask(&eng,
144                               Priority::VBMemoryDeletionPriority, delay, true),
145                               e(eng), vbucket(vb), vbid(vb->getId()) { }
146
147     std::string getDescription() {
148         std::stringstream ss;
149         ss << "Removing (dead) vbucket " << vbid << " from memory";
150         return ss.str();
151     }
152
153     bool run(void) {
154         vbucket->notifyAllPendingConnsFailed(e);
155         vbucket->ht.clear();
156         vbucket.reset();
157         return false;
158     }
159
160 private:
161     EventuallyPersistentEngine &e;
162     RCPtr<VBucket> vbucket;
163     uint16_t vbid;
164 };
165
166 class PendingOpsNotification : public GlobalTask {
167 public:
168     PendingOpsNotification(EventuallyPersistentEngine &e, RCPtr<VBucket> &vb) :
169         GlobalTask(&e, Priority::VBMemoryDeletionPriority, 0, false),
170         engine(e), vbucket(vb) { }
171
172     std::string getDescription() {
173         std::stringstream ss;
174         ss << "Notify pending operations for vbucket " << vbucket->getId();
175         return ss.str();
176     }
177
178     bool run(void) {
179         vbucket->fireAllOps(engine);
180         return false;
181     }
182
183 private:
184     EventuallyPersistentEngine &engine;
185     RCPtr<VBucket> vbucket;
186 };
187
188 EventuallyPersistentStore::EventuallyPersistentStore(
189     EventuallyPersistentEngine &theEngine) :
190     engine(theEngine), stats(engine.getEpStats()),
191     vbMap(theEngine.getConfiguration(), *this),
192     bgFetchQueue(0),
193     diskFlushAll(false), bgFetchDelay(0), backfillMemoryThreshold(0.95),
194     statsSnapshotTaskId(0), lastTransTimePerItem(0)
195 {
196     cachedResidentRatio.activeRatio.store(0);
197     cachedResidentRatio.replicaRatio.store(0);
198
199     Configuration &config = engine.getConfiguration();
200     MutationLog *shardlog;
201     for (uint16_t i = 0; i < config.getMaxNumShards(); i++) {
202         std::stringstream s;
203         s << i;
204         shardlog = new MutationLog(engine.getConfiguration().getAlogPath() +
205                                  "." + s.str(),
206                                  engine.getConfiguration().getAlogBlockSize());
207         accessLog.push_back(shardlog);
208     }
209
210     storageProperties = new StorageProperties(true, true, true, true);
211
212     stats.schedulingHisto = new Histogram<hrtime_t>[MAX_TYPE_ID];
213     stats.taskRuntimeHisto = new Histogram<hrtime_t>[MAX_TYPE_ID];
214
215     for (size_t i = 0; i < MAX_TYPE_ID; i++) {
216         stats.schedulingHisto[i].reset();
217         stats.taskRuntimeHisto[i].reset();
218     }
219
220     ExecutorPool::get()->registerBucket(ObjectRegistry::getCurrentEngine());
221
222     size_t num_vbs = config.getMaxVbuckets();
223     vb_mutexes = new Mutex[num_vbs];
224     schedule_vbstate_persist = new AtomicValue<bool>[num_vbs];
225     for (size_t i = 0; i < num_vbs; ++i) {
226         schedule_vbstate_persist[i] = false;
227     }
228
229     stats.memOverhead = sizeof(EventuallyPersistentStore);
230
231     if (config.getConflictResolutionType().compare("seqno") == 0) {
232         conflictResolver = new SeqBasedResolution();
233     }
234
235     stats.setMaxDataSize(config.getMaxSize());
236     config.addValueChangedListener("max_size",
237                                    new StatsValueChangeListener(stats));
238
239     stats.mem_low_wat.store(config.getMemLowWat());
240     config.addValueChangedListener("mem_low_wat",
241                                    new StatsValueChangeListener(stats));
242
243     stats.mem_high_wat.store(config.getMemHighWat());
244     config.addValueChangedListener("mem_high_wat",
245                                    new StatsValueChangeListener(stats));
246
247     stats.tapThrottleThreshold.store(static_cast<double>
248                                     (config.getTapThrottleThreshold())
249                                      / 100.0);
250     config.addValueChangedListener("tap_throttle_threshold",
251                                    new StatsValueChangeListener(stats));
252
253     stats.tapThrottleWriteQueueCap.store(config.getTapThrottleQueueCap());
254     config.addValueChangedListener("tap_throttle_queue_cap",
255                                    new EPStoreValueChangeListener(*this));
256     config.addValueChangedListener("tap_throttle_cap_pcnt",
257                                    new EPStoreValueChangeListener(*this));
258
259     setBGFetchDelay(config.getBgFetchDelay());
260     config.addValueChangedListener("bg_fetch_delay",
261                                    new EPStoreValueChangeListener(*this));
262
263     stats.warmupMemUsedCap.store(static_cast<double>
264                                (config.getWarmupMinMemoryThreshold()) / 100.0);
265     config.addValueChangedListener("warmup_min_memory_threshold",
266                                    new StatsValueChangeListener(stats));
267     stats.warmupNumReadCap.store(static_cast<double>
268                                 (config.getWarmupMinItemsThreshold()) / 100.0);
269     config.addValueChangedListener("warmup_min_items_threshold",
270                                    new StatsValueChangeListener(stats));
271
272     double mem_threshold = static_cast<double>
273                                       (config.getMutationMemThreshold()) / 100;
274     StoredValue::setMutationMemoryThreshold(mem_threshold);
275     config.addValueChangedListener("mutation_mem_threshold",
276                                    new EPStoreValueChangeListener(*this));
277
278     double backfill_threshold = static_cast<double>
279                                       (config.getBackfillMemThreshold()) / 100;
280     setBackfillMemoryThreshold(backfill_threshold);
281     config.addValueChangedListener("backfill_mem_threshold",
282                                    new EPStoreValueChangeListener(*this));
283
284     compactionExpMemThreshold = config.getCompactionExpMemThreshold();
285     config.addValueChangedListener("compaction_exp_mem_threshold",
286                                    new EPStoreValueChangeListener(*this));
287
288     compactionWriteQueueCap = config.getCompactionWriteQueueCap();
289     config.addValueChangedListener("compaction_write_queue_cap",
290                                    new EPStoreValueChangeListener(*this));
291
292     const std::string &policy = config.getItemEvictionPolicy();
293     if (policy.compare("value_only") == 0) {
294         eviction_policy = VALUE_ONLY;
295     } else {
296         eviction_policy = FULL_EVICTION;
297     }
298
299     warmupTask = new Warmup(this);
300 }
301
302 bool EventuallyPersistentStore::initialize() {
303     // We should nuke everything unless we want warmup
304     Configuration &config = engine.getConfiguration();
305     if (!config.isWarmup()) {
306         reset();
307     }
308
309     if (!startFlusher()) {
310         LOG(EXTENSION_LOG_WARNING,
311             "FATAL: Failed to create and start flushers");
312         return false;
313     }
314     if (!startBgFetcher()) {
315         LOG(EXTENSION_LOG_WARNING,
316            "FATAL: Failed to create and start bgfetchers");
317         return false;
318     }
319
320     warmupTask->start();
321
322     if (config.isFailpartialwarmup() && stats.warmOOM > 0) {
323         LOG(EXTENSION_LOG_WARNING,
324             "Warmup failed to load %d records due to OOM, exiting.\n",
325             static_cast<unsigned int>(stats.warmOOM));
326         return false;
327     }
328
329     itmpTask = new ItemPager(&engine, stats);
330     ExecutorPool::get()->schedule(itmpTask, NONIO_TASK_IDX);
331
332     size_t expiryPagerSleeptime = config.getExpPagerStime();
333     setExpiryPagerSleeptime(expiryPagerSleeptime);
334     config.addValueChangedListener("exp_pager_stime",
335                                    new EPStoreValueChangeListener(*this));
336
337     ExTask htrTask = new HashtableResizerTask(this, 10);
338     ExecutorPool::get()->schedule(htrTask, NONIO_TASK_IDX);
339
340     size_t checkpointRemoverInterval = config.getChkRemoverStime();
341     chkTask = new ClosedUnrefCheckpointRemoverTask(&engine, stats,
342                                                    checkpointRemoverInterval);
343     ExecutorPool::get()->schedule(chkTask, NONIO_TASK_IDX);
344
345     ExTask vbSnapshotTask = new DaemonVBSnapshotTask(&engine);
346     ExecutorPool::get()->schedule(vbSnapshotTask, WRITER_TASK_IDX);
347
348     ExTask workloadMonitorTask = new WorkLoadMonitor(&engine, false);
349     ExecutorPool::get()->schedule(workloadMonitorTask, NONIO_TASK_IDX);
350
351     return true;
352 }
353
354 EventuallyPersistentStore::~EventuallyPersistentStore() {
355     stopWarmup();
356     stopBgFetcher();
357     ExecutorPool::get()->stopTaskGroup(&engine, NONIO_TASK_IDX);
358
359     ExecutorPool::get()->cancel(statsSnapshotTaskId);
360     LockHolder lh(accessScanner.mutex);
361     ExecutorPool::get()->cancel(accessScanner.task);
362     lh.unlock();
363
364     stopFlusher();
365     ExecutorPool::get()->unregisterBucket(ObjectRegistry::getCurrentEngine());
366
367     delete [] vb_mutexes;
368     delete [] schedule_vbstate_persist;
369     delete [] stats.schedulingHisto;
370     delete [] stats.taskRuntimeHisto;
371     delete conflictResolver;
372     delete warmupTask;
373     delete storageProperties;
374
375     std::vector<MutationLog*>::iterator it;
376     for (it = accessLog.begin(); it != accessLog.end(); it++) {
377         delete *it;
378     }
379 }
380
381 const Flusher* EventuallyPersistentStore::getFlusher(uint16_t shardId) {
382     return vbMap.getShard(shardId)->getFlusher();
383 }
384
385 Warmup* EventuallyPersistentStore::getWarmup(void) const {
386     return warmupTask;
387 }
388
389 bool EventuallyPersistentStore::startFlusher() {
390     for (uint16_t i = 0; i < vbMap.numShards; ++i) {
391         Flusher *flusher = vbMap.shards[i]->getFlusher();
392         flusher->start();
393     }
394     return true;
395 }
396
397 void EventuallyPersistentStore::stopFlusher() {
398     for (uint16_t i = 0; i < vbMap.numShards; i++) {
399         Flusher *flusher = vbMap.shards[i]->getFlusher();
400         bool rv = flusher->stop(stats.forceShutdown);
401         if (rv && !stats.forceShutdown) {
402             flusher->wait();
403         }
404     }
405 }
406
407 bool EventuallyPersistentStore::pauseFlusher() {
408     bool rv = true;
409     for (uint16_t i = 0; i < vbMap.numShards; i++) {
410         Flusher *flusher = vbMap.shards[i]->getFlusher();
411         if (!flusher->pause()) {
412             LOG(EXTENSION_LOG_WARNING, "Attempted to pause flusher in state "
413                 "[%s], shard = %d", flusher->stateName(), i);
414             rv = false;
415         }
416     }
417     return rv;
418 }
419
420 bool EventuallyPersistentStore::resumeFlusher() {
421     bool rv = true;
422     for (uint16_t i = 0; i < vbMap.numShards; i++) {
423         Flusher *flusher = vbMap.shards[i]->getFlusher();
424         if (!flusher->resume()) {
425             LOG(EXTENSION_LOG_WARNING,
426                     "Warning: attempted to resume flusher in state [%s], "
427                     "shard = %d", flusher->stateName(), i);
428             rv = false;
429         }
430     }
431     return rv;
432 }
433
434 void EventuallyPersistentStore::wakeUpFlusher() {
435     if (stats.diskQueueSize.load() == 0) {
436         for (uint16_t i = 0; i < vbMap.numShards; i++) {
437             Flusher *flusher = vbMap.shards[i]->getFlusher();
438             flusher->wake();
439         }
440     }
441 }
442
443 bool EventuallyPersistentStore::startBgFetcher() {
444     for (uint16_t i = 0; i < vbMap.numShards; i++) {
445         BgFetcher *bgfetcher = vbMap.shards[i]->getBgFetcher();
446         if (bgfetcher == NULL) {
447             LOG(EXTENSION_LOG_WARNING,
448                 "Falied to start bg fetcher for shard %d", i);
449             return false;
450         }
451         bgfetcher->start();
452     }
453     return true;
454 }
455
456 void EventuallyPersistentStore::stopBgFetcher() {
457     for (uint16_t i = 0; i < vbMap.numShards; i++) {
458         BgFetcher *bgfetcher = vbMap.shards[i]->getBgFetcher();
459         if (multiBGFetchEnabled() && bgfetcher->pendingJob()) {
460             LOG(EXTENSION_LOG_WARNING,
461                 "Shutting down engine while there are still pending data "
462                 "read for shard %d from database storage", i);
463         }
464         LOG(EXTENSION_LOG_INFO, "Stopping bg fetcher for underlying storage");
465         bgfetcher->stop();
466     }
467 }
468
469 RCPtr<VBucket> EventuallyPersistentStore::getVBucket(uint16_t vbid,
470                                                 vbucket_state_t wanted_state) {
471     RCPtr<VBucket> vb = vbMap.getBucket(vbid);
472     vbucket_state_t found_state(vb ? vb->getState() : vbucket_state_dead);
473     if (found_state == wanted_state) {
474         return vb;
475     } else {
476         RCPtr<VBucket> rv;
477         return rv;
478     }
479 }
480
481 void
482 EventuallyPersistentStore::deleteExpiredItem(uint16_t vbid, std::string &key,
483                                              time_t startTime,
484                                              uint64_t revSeqno) {
485     RCPtr<VBucket> vb = getVBucket(vbid);
486     if (vb) {
487         // Obtain reader access to the VB state change lock so that
488         // the VB can't switch state whilst we're processing
489         ReaderLockHolder rlh(vb->getStateLock());
490         if (vb->getState() == vbucket_state_active) {
491             int bucket_num(0);
492             incExpirationStat(vb);
493             LockHolder lh = vb->ht.getLockedBucket(key, &bucket_num);
494             StoredValue *v = vb->ht.unlocked_find(key, bucket_num, true, false);
495             if (v) {
496                 if (v->isTempNonExistentItem() || v->isTempDeletedItem()) {
497                     // This is a temporary item whose background fetch for metadata
498                     // has completed.
499                     bool deleted = vb->ht.unlocked_del(key, bucket_num);
500                     cb_assert(deleted);
501                 } else if (v->isExpired(startTime) && !v->isDeleted()) {
502                     vb->ht.unlocked_softDelete(v, 0, getItemEvictionPolicy());
503                     queueDirty(vb, v, &lh, false);
504                 }
505             } else {
506                 if (eviction_policy == FULL_EVICTION) {
507                     // Create a temp item and delete and push it
508                     // into the checkpoint queue.
509                     add_type_t rv = vb->ht.unlocked_addTempItem(bucket_num, key,
510                                                                 eviction_policy);
511                     if (rv == ADD_NOMEM) {
512                         return;
513                     }
514                     v = vb->ht.unlocked_find(key, bucket_num, true, false);
515                     v->setStoredValueState(StoredValue::state_deleted_key);
516                     v->setRevSeqno(revSeqno);
517                     vb->ht.unlocked_softDelete(v, 0, eviction_policy);
518                     queueDirty(vb, v, &lh, false);
519                 }
520             }
521         }
522     }
523 }
524
525 void
526 EventuallyPersistentStore::deleteExpiredItems(std::list<std::pair<uint16_t,
527                                                         std::string> > &keys) {
528     std::list<std::pair<uint16_t, std::string> >::iterator it;
529     time_t startTime = ep_real_time();
530     for (it = keys.begin(); it != keys.end(); it++) {
531         deleteExpiredItem(it->first, it->second, startTime, 0);
532     }
533 }
534
535 StoredValue *EventuallyPersistentStore::fetchValidValue(RCPtr<VBucket> &vb,
536                                                         const std::string &key,
537                                                         int bucket_num,
538                                                         bool wantDeleted,
539                                                         bool trackReference,
540                                                         bool queueExpired) {
541     StoredValue *v = vb->ht.unlocked_find(key, bucket_num, wantDeleted,
542                                           trackReference);
543     if (v && !v->isDeleted() && !v->isTempItem()) {
544         // In the deleted case, we ignore expiration time.
545         if (v->isExpired(ep_real_time())) {
546             if (vb->getState() != vbucket_state_active) {
547                 return wantDeleted ? v : NULL;
548             }
549             if (queueExpired) {
550                 incExpirationStat(vb, false);
551                 vb->ht.unlocked_softDelete(v, 0, eviction_policy);
552                 queueDirty(vb, v, NULL, false, true);
553             }
554             return wantDeleted ? v : NULL;
555         }
556     }
557     return v;
558 }
559
560 protocol_binary_response_status EventuallyPersistentStore::evictKey(
561                                                         const std::string &key,
562                                                         uint16_t vbucket,
563                                                         const char **msg,
564                                                         size_t *msg_size,
565                                                         bool force) {
566     RCPtr<VBucket> vb = getVBucket(vbucket);
567     if (!vb || (vb->getState() != vbucket_state_active && !force)) {
568         return PROTOCOL_BINARY_RESPONSE_NOT_MY_VBUCKET;
569     }
570
571     int bucket_num(0);
572     LockHolder lh = vb->ht.getLockedBucket(key, &bucket_num);
573     StoredValue *v = fetchValidValue(vb, key, bucket_num, force, false);
574
575     protocol_binary_response_status rv(PROTOCOL_BINARY_RESPONSE_SUCCESS);
576
577     *msg_size = 0;
578     if (v) {
579         if (force)  {
580             v->markClean();
581         }
582         if (v->isResident()) {
583             if (vb->ht.unlocked_ejectItem(v, eviction_policy)) {
584                 *msg = "Ejected.";
585             } else {
586                 *msg = "Can't eject: Dirty object.";
587                 rv = PROTOCOL_BINARY_RESPONSE_KEY_EEXISTS;
588             }
589         } else {
590             *msg = "Already ejected.";
591         }
592     } else {
593         if (eviction_policy == VALUE_ONLY) {
594             *msg = "Not found.";
595             rv = PROTOCOL_BINARY_RESPONSE_KEY_ENOENT;
596         } else {
597             *msg = "Already ejected.";
598         }
599     }
600
601     return rv;
602 }
603
604 ENGINE_ERROR_CODE EventuallyPersistentStore::addTempItemForBgFetch(
605                                                         LockHolder &lock,
606                                                         int bucket_num,
607                                                         const std::string &key,
608                                                         RCPtr<VBucket> &vb,
609                                                         const void *cookie,
610                                                         bool metadataOnly,
611                                                         bool isReplication) {
612
613     add_type_t rv = vb->ht.unlocked_addTempItem(bucket_num, key,
614                                                 eviction_policy,
615                                                 isReplication);
616     switch(rv) {
617         case ADD_NOMEM:
618             return ENGINE_ENOMEM;
619         case ADD_EXISTS:
620         case ADD_UNDEL:
621         case ADD_SUCCESS:
622         case ADD_TMP_AND_BG_FETCH:
623             // Since the hashtable bucket is locked, we shouldn't get here
624             abort();
625         case ADD_BG_FETCH:
626             lock.unlock();
627             bgFetch(key, vb->getId(), -1, cookie, metadataOnly);
628     }
629     return ENGINE_EWOULDBLOCK;
630 }
631
632 ENGINE_ERROR_CODE EventuallyPersistentStore::set(const Item &itm,
633                                                  const void *cookie,
634                                                  bool force,
635                                                  uint8_t nru) {
636
637     RCPtr<VBucket> vb = getVBucket(itm.getVBucketId());
638     if (!vb || vb->getState() == vbucket_state_dead) {
639         ++stats.numNotMyVBuckets;
640         return ENGINE_NOT_MY_VBUCKET;
641     } else if (vb->getState() == vbucket_state_replica && !force) {
642         ++stats.numNotMyVBuckets;
643         return ENGINE_NOT_MY_VBUCKET;
644     } else if (vb->getState() == vbucket_state_pending && !force) {
645         if (vb->addPendingOp(cookie)) {
646             return ENGINE_EWOULDBLOCK;
647         }
648     }
649
650     bool cas_op = (itm.getCas() != 0);
651     int bucket_num(0);
652     LockHolder lh = vb->ht.getLockedBucket(itm.getKey(), &bucket_num);
653     StoredValue *v = vb->ht.unlocked_find(itm.getKey(), bucket_num, true,
654                                           false);
655     if (v && v->isLocked(ep_current_time()) &&
656         (vb->getState() == vbucket_state_replica ||
657          vb->getState() == vbucket_state_pending)) {
658         v->unlock();
659     }
660     mutation_type_t mtype = vb->ht.unlocked_set(v, itm, itm.getCas(),
661                                                 true, false,
662                                                 eviction_policy, nru);
663
664     ENGINE_ERROR_CODE ret = ENGINE_SUCCESS;
665     switch (mtype) {
666     case NOMEM:
667         ret = ENGINE_ENOMEM;
668         break;
669     case INVALID_CAS:
670     case IS_LOCKED:
671         ret = ENGINE_KEY_EEXISTS;
672         break;
673     case NOT_FOUND:
674         if (cas_op) {
675             ret = ENGINE_KEY_ENOENT;
676             break;
677         }
678         // FALLTHROUGH
679     case WAS_DIRTY:
680         // Even if the item was dirty, push it into the vbucket's open
681         // checkpoint.
682     case WAS_CLEAN:
683         queueDirty(vb, v, &lh);
684         break;
685     case NEED_BG_FETCH:
686         { // CAS operation with non-resident item + full eviction.
687             if (v) {
688                 // temp item is already created. Simply schedule a bg fetch job
689                 lh.unlock();
690                 bgFetch(itm.getKey(), vb->getId(), -1, cookie, true);
691                 return ENGINE_EWOULDBLOCK;
692             }
693             ret = addTempItemForBgFetch(lh, bucket_num, itm.getKey(), vb,
694                                         cookie, true);
695             break;
696         }
697     case INVALID_VBUCKET:
698         ret = ENGINE_NOT_MY_VBUCKET;
699         break;
700     }
701
702     return ret;
703 }
704
705 ENGINE_ERROR_CODE EventuallyPersistentStore::add(const Item &itm,
706                                                  const void *cookie)
707 {
708     RCPtr<VBucket> vb = getVBucket(itm.getVBucketId());
709     if (!vb || vb->getState() == vbucket_state_dead ||
710         vb->getState() == vbucket_state_replica) {
711         ++stats.numNotMyVBuckets;
712         return ENGINE_NOT_MY_VBUCKET;
713     } else if(vb->getState() == vbucket_state_pending) {
714         if (vb->addPendingOp(cookie)) {
715             return ENGINE_EWOULDBLOCK;
716         }
717     }
718
719     if (itm.getCas() != 0) {
720         // Adding with a cas value doesn't make sense..
721         return ENGINE_NOT_STORED;
722     }
723
724     int bucket_num(0);
725     LockHolder lh = vb->ht.getLockedBucket(itm.getKey(), &bucket_num);
726     StoredValue *v = vb->ht.unlocked_find(itm.getKey(), bucket_num, true,
727                                           false);
728     add_type_t atype = vb->ht.unlocked_add(bucket_num, v, itm,
729                                            eviction_policy);
730
731     switch (atype) {
732     case ADD_NOMEM:
733         return ENGINE_ENOMEM;
734     case ADD_EXISTS:
735         return ENGINE_NOT_STORED;
736     case ADD_TMP_AND_BG_FETCH:
737         return addTempItemForBgFetch(lh, bucket_num, itm.getKey(), vb,
738                                      cookie, true);
739     case ADD_BG_FETCH:
740         lh.unlock();
741         bgFetch(itm.getKey(), vb->getId(), -1, cookie, true);
742         return ENGINE_EWOULDBLOCK;
743     case ADD_SUCCESS:
744     case ADD_UNDEL:
745         queueDirty(vb, v, &lh);
746         break;
747     }
748     return ENGINE_SUCCESS;
749 }
750
751 ENGINE_ERROR_CODE EventuallyPersistentStore::replace(const Item &itm,
752                                                      const void *cookie) {
753     RCPtr<VBucket> vb = getVBucket(itm.getVBucketId());
754     if (!vb || vb->getState() == vbucket_state_dead ||
755         vb->getState() == vbucket_state_replica) {
756         ++stats.numNotMyVBuckets;
757         return ENGINE_NOT_MY_VBUCKET;
758     } else if (vb->getState() == vbucket_state_pending) {
759         if (vb->addPendingOp(cookie)) {
760             return ENGINE_EWOULDBLOCK;
761         }
762     }
763
764     int bucket_num(0);
765     LockHolder lh = vb->ht.getLockedBucket(itm.getKey(), &bucket_num);
766     StoredValue *v = vb->ht.unlocked_find(itm.getKey(), bucket_num, true,
767                                           false);
768     if (v) {
769         if (v->isDeleted() || v->isTempDeletedItem() ||
770             v->isTempNonExistentItem()) {
771             return ENGINE_KEY_ENOENT;
772         }
773
774         mutation_type_t mtype;
775         if (eviction_policy == FULL_EVICTION && v->isTempInitialItem()) {
776             mtype = NEED_BG_FETCH;
777         } else {
778             mtype = vb->ht.unlocked_set(v, itm, 0, true, false, eviction_policy,
779                                         0xff);
780         }
781
782         ENGINE_ERROR_CODE ret = ENGINE_SUCCESS;
783         switch (mtype) {
784             case NOMEM:
785                 ret = ENGINE_ENOMEM;
786                 break;
787             case IS_LOCKED:
788                 ret = ENGINE_KEY_EEXISTS;
789                 break;
790             case INVALID_CAS:
791             case NOT_FOUND:
792                 ret = ENGINE_NOT_STORED;
793                 break;
794                 // FALLTHROUGH
795             case WAS_DIRTY:
796                 // Even if the item was dirty, push it into the vbucket's open
797                 // checkpoint.
798             case WAS_CLEAN:
799                 queueDirty(vb, v, &lh);
800                 break;
801             case NEED_BG_FETCH:
802             {
803                 // temp item is already created. Simply schedule a bg fetch job
804                 lh.unlock();
805                 bgFetch(itm.getKey(), vb->getId(), -1, cookie, true);
806                 ret = ENGINE_EWOULDBLOCK;
807                 break;
808             }
809             case INVALID_VBUCKET:
810                 ret = ENGINE_NOT_MY_VBUCKET;
811                 break;
812         }
813
814         return ret;
815     } else {
816         if (eviction_policy == VALUE_ONLY) {
817             return ENGINE_KEY_ENOENT;
818         }
819
820         return addTempItemForBgFetch(lh, bucket_num, itm.getKey(), vb,
821                                      cookie, false);
822     }
823 }
824
825 ENGINE_ERROR_CODE EventuallyPersistentStore::addTAPBackfillItem(const Item &itm,
826                                                                 uint8_t nru,
827                                                                 bool genBySeqno) {
828
829     RCPtr<VBucket> vb = getVBucket(itm.getVBucketId());
830     if (!vb ||
831         vb->getState() == vbucket_state_dead ||
832         vb->getState() == vbucket_state_active) {
833         ++stats.numNotMyVBuckets;
834         return ENGINE_NOT_MY_VBUCKET;
835     }
836
837     int bucket_num(0);
838     LockHolder lh = vb->ht.getLockedBucket(itm.getKey(), &bucket_num);
839     StoredValue *v = vb->ht.unlocked_find(itm.getKey(), bucket_num, true,
840                                           false);
841
842     // Note that this function is only called on replica or pending vbuckets.
843     if (v && v->isLocked(ep_current_time())) {
844         v->unlock();
845     }
846     mutation_type_t mtype = vb->ht.unlocked_set(v, itm, 0, true, true,
847                                                 eviction_policy, nru);
848
849     ENGINE_ERROR_CODE ret = ENGINE_SUCCESS;
850     switch (mtype) {
851     case NOMEM:
852         ret = ENGINE_ENOMEM;
853         break;
854     case INVALID_CAS:
855     case IS_LOCKED:
856         ret = ENGINE_KEY_EEXISTS;
857         break;
858     case WAS_DIRTY:
859         // FALLTHROUGH, to ensure the bySeqno for the hashTable item is
860         // set correctly, and also the sequence numbers are ordered correctly.
861         // (MB-14003)
862     case NOT_FOUND:
863         // FALLTHROUGH
864     case WAS_CLEAN:
865         queueDirty(vb, v, &lh, true, true, genBySeqno);
866         break;
867     case INVALID_VBUCKET:
868         ret = ENGINE_NOT_MY_VBUCKET;
869         break;
870     case NEED_BG_FETCH:
871         // SET on a non-active vbucket should not require a bg_metadata_fetch.
872         abort();
873     }
874
875     return ret;
876 }
877
878 class KVStatsCallback : public Callback<kvstats_ctx> {
879     public:
880         KVStatsCallback(EventuallyPersistentStore *store)
881             : epstore(store) { }
882
883         void callback(kvstats_ctx &ctx) {
884             RCPtr<VBucket> vb = epstore->getVBucket(ctx.vbucket);
885             if (vb) {
886                 vb->fileSpaceUsed = ctx.fileSpaceUsed;
887                 vb->fileSize = ctx.fileSize;
888             }
889         }
890
891     private:
892         EventuallyPersistentStore *epstore;
893 };
894
895 void EventuallyPersistentStore::snapshotVBuckets(const Priority &priority,
896                                                  uint16_t shardId) {
897
898     class VBucketStateVisitor : public VBucketVisitor {
899     public:
900         VBucketStateVisitor(VBucketMap &vb_map, uint16_t sid)
901             : vbuckets(vb_map), shardId(sid) { }
902         bool visitBucket(RCPtr<VBucket> &vb) {
903             if (vbuckets.getShard(vb->getId())->getId() == shardId) {
904                 uint64_t snapStart = 0;
905                 uint64_t snapEnd = 0;
906                 std::string failovers = vb->failovers->toJSON();
907                 uint64_t chkId = vbuckets.getPersistenceCheckpointId(vb->getId());
908
909                 vb->getCurrentSnapshot(snapStart, snapEnd);
910                 vbucket_state vb_state(vb->getState(), chkId, 0,
911                                        vb->getHighSeqno(), vb->getPurgeSeqno(),
912                                        snapStart, snapEnd, failovers);
913                 states.insert(std::pair<uint16_t, vbucket_state>(vb->getId(), vb_state));
914             }
915             return false;
916         }
917
918         void visit(StoredValue*) {
919             cb_assert(false); // this does not happen
920         }
921
922         std::map<uint16_t, vbucket_state> states;
923
924     private:
925         VBucketMap &vbuckets;
926         uint16_t shardId;
927     };
928
929     KVShard *shard = vbMap.shards[shardId];
930     if (priority == Priority::VBucketPersistLowPriority) {
931         shard->setLowPriorityVbSnapshotFlag(false);
932     } else {
933         shard->setHighPriorityVbSnapshotFlag(false);
934     }
935
936     KVStatsCallback kvcb(this);
937     VBucketStateVisitor v(vbMap, shard->getId());
938     visit(v);
939     hrtime_t start = gethrtime();
940
941     bool success = true;
942     vbucket_map_t::reverse_iterator iter = v.states.rbegin();
943     for (; iter != v.states.rend(); ++iter) {
944         LockHolder lh(vb_mutexes[iter->first], true /*tryLock*/);
945         if (!lh.islocked()) {
946             continue;
947         }
948         KVStore *rwUnderlying = getRWUnderlying(iter->first);
949         if (!rwUnderlying->snapshotVBucket(iter->first, iter->second,
950                     &kvcb)) {
951             LOG(EXTENSION_LOG_WARNING,
952                     "VBucket snapshot task failed!!! Rescheduling");
953             success = false;
954             break;
955         }
956
957         if (priority == Priority::VBucketPersistHighPriority) {
958             if (vbMap.setBucketCreation(iter->first, false)) {
959                 LOG(EXTENSION_LOG_INFO, "VBucket %d created", iter->first);
960             }
961         }
962     }
963
964     if (!success) {
965         scheduleVBSnapshot(priority, shard->getId());
966     } else {
967         stats.snapshotVbucketHisto.add((gethrtime() - start) / 1000);
968     }
969 }
970
971 bool EventuallyPersistentStore::persistVBState(const Priority &priority,
972                                                uint16_t vbid) {
973     schedule_vbstate_persist[vbid] = false;
974
975     RCPtr<VBucket> vb = getVBucket(vbid);
976     if (!vb) {
977         LOG(EXTENSION_LOG_WARNING,
978             "VBucket %d not exist!!! vb_state persistence task failed!!!", vbid);
979         return false;
980     }
981
982     bool inverse = false;
983     LockHolder lh(vb_mutexes[vbid], true /*tryLock*/);
984     if (!lh.islocked()) {
985         if (schedule_vbstate_persist[vbid].compare_exchange_strong(inverse,
986                                                                    true)) {
987             return true;
988         } else {
989             return false;
990         }
991     }
992
993     KVStatsCallback kvcb(this);
994     uint64_t chkId = vbMap.getPersistenceCheckpointId(vbid);
995     std::string failovers = vb->failovers->toJSON();
996     uint64_t snapStart = 0;
997     uint64_t snapEnd = 0;
998
999     vb->getCurrentSnapshot(snapStart, snapEnd);
1000     vbucket_state vb_state(vb->getState(), chkId, 0, vb->getHighSeqno(),
1001                            vb->getPurgeSeqno(), snapStart, snapEnd, failovers);
1002
1003     KVStore *rwUnderlying = getRWUnderlying(vbid);
1004     if (rwUnderlying->snapshotVBucket(vbid, vb_state, &kvcb)) {
1005         if (vbMap.setBucketCreation(vbid, false)) {
1006             LOG(EXTENSION_LOG_INFO, "VBucket %d created", vbid);
1007         }
1008     } else {
1009         LOG(EXTENSION_LOG_WARNING,
1010             "VBucket %d: vb_state persistence task failed!!! Rescheduling", vbid);
1011
1012         if (schedule_vbstate_persist[vbid].compare_exchange_strong(inverse,
1013                                                                    true)) {
1014             return true;
1015         } else {
1016             return false;
1017         }
1018     }
1019     return false;
1020 }
1021
1022 ENGINE_ERROR_CODE EventuallyPersistentStore::setVBucketState(uint16_t vbid,
1023                                                            vbucket_state_t to,
1024                                                            bool transfer,
1025                                                            bool notify_dcp) {
1026     // Lock to prevent a race condition between a failed update and add.
1027     LockHolder lh(vbsetMutex);
1028     RCPtr<VBucket> vb = vbMap.getBucket(vbid);
1029     if (vb && to == vb->getState()) {
1030         return ENGINE_SUCCESS;
1031     }
1032
1033     if (vb) {
1034         vbucket_state_t oldstate = vb->getState();
1035         if (oldstate != to && notify_dcp) {
1036             engine.getDcpConnMap().vbucketStateChanged(vbid, to);
1037         }
1038
1039         vb->setState(to, engine.getServerApi());
1040         if (to == vbucket_state_active && !transfer) {
1041             uint64_t snapStart = 0;
1042             uint64_t snapEnd = 0;
1043             vb->getCurrentSnapshot(snapStart, snapEnd);
1044             if (snapEnd == vbMap.getPersistenceSeqno(vbid)) {
1045                 vb->failovers->createEntry(snapEnd);
1046             } else {
1047                 vb->failovers->createEntry(snapStart);
1048             }
1049         }
1050
1051         lh.unlock();
1052         if (oldstate == vbucket_state_pending &&
1053             to == vbucket_state_active) {
1054             ExTask notifyTask = new PendingOpsNotification(engine, vb);
1055             ExecutorPool::get()->schedule(notifyTask, NONIO_TASK_IDX);
1056         }
1057         scheduleVBStatePersist(Priority::VBucketPersistLowPriority, vbid);
1058     } else {
1059         FailoverTable* ft = new FailoverTable(engine.getMaxFailoverEntries());
1060         RCPtr<VBucket> newvb(new VBucket(vbid, to, stats,
1061                                          engine.getCheckpointConfig(),
1062                                          vbMap.getShard(vbid), 0, 0, 0, ft));
1063         // The first checkpoint for active vbucket should start with id 2.
1064         uint64_t start_chk_id = (to == vbucket_state_active) ? 2 : 0;
1065         newvb->checkpointManager.setOpenCheckpointId(start_chk_id);
1066         if (vbMap.addBucket(newvb) == ENGINE_ERANGE) {
1067             lh.unlock();
1068             return ENGINE_ERANGE;
1069         }
1070         vbMap.setPersistenceCheckpointId(vbid, 0);
1071         vbMap.setPersistenceSeqno(vbid, 0);
1072         vbMap.setBucketCreation(vbid, true);
1073         lh.unlock();
1074         scheduleVBStatePersist(Priority::VBucketPersistHighPriority, vbid);
1075     }
1076     return ENGINE_SUCCESS;
1077 }
1078
1079 bool EventuallyPersistentStore::scheduleVBSnapshot(const Priority &p) {
1080     KVShard *shard = NULL;
1081     if (p == Priority::VBucketPersistHighPriority) {
1082         for (size_t i = 0; i < vbMap.numShards; ++i) {
1083             shard = vbMap.shards[i];
1084             if (shard->setHighPriorityVbSnapshotFlag(true)) {
1085                 ExTask task = new VBSnapshotTask(&engine, p, i, false);
1086                 ExecutorPool::get()->schedule(task, WRITER_TASK_IDX);
1087             }
1088         }
1089     } else {
1090         for (size_t i = 0; i < vbMap.numShards; ++i) {
1091             shard = vbMap.shards[i];
1092             if (shard->setLowPriorityVbSnapshotFlag(true)) {
1093                 ExTask task = new VBSnapshotTask(&engine, p, i, false);
1094                 ExecutorPool::get()->schedule(task, WRITER_TASK_IDX);
1095             }
1096         }
1097     }
1098     if (stats.isShutdown) {
1099         return false;
1100     }
1101     return true;
1102 }
1103
1104 void EventuallyPersistentStore::scheduleVBSnapshot(const Priority &p,
1105                                                    uint16_t shardId,
1106                                                    bool force) {
1107     KVShard *shard = vbMap.shards[shardId];
1108     if (p == Priority::VBucketPersistHighPriority) {
1109         if (force || shard->setHighPriorityVbSnapshotFlag(true)) {
1110             ExTask task = new VBSnapshotTask(&engine, p, shardId, false);
1111             ExecutorPool::get()->schedule(task, WRITER_TASK_IDX);
1112         }
1113     } else {
1114         if (force || shard->setLowPriorityVbSnapshotFlag(true)) {
1115             ExTask task = new VBSnapshotTask(&engine, p, shardId, false);
1116             ExecutorPool::get()->schedule(task, WRITER_TASK_IDX);
1117         }
1118     }
1119 }
1120
1121 void EventuallyPersistentStore::scheduleVBStatePersist(const Priority &priority,
1122                                                        uint16_t vbid,
1123                                                        bool force) {
1124     bool inverse = false;
1125     if (force ||
1126         schedule_vbstate_persist[vbid].compare_exchange_strong(inverse, true)) {
1127         ExTask task = new VBStatePersistTask(&engine, priority, vbid, false);
1128         ExecutorPool::get()->schedule(task, WRITER_TASK_IDX);
1129     }
1130 }
1131
1132 bool EventuallyPersistentStore::completeVBucketDeletion(uint16_t vbid,
1133                                                         const void* cookie) {
1134     LockHolder lh(vbsetMutex);
1135
1136     hrtime_t start_time(gethrtime());
1137     RCPtr<VBucket> vb = vbMap.getBucket(vbid);
1138     if (!vb || vb->getState() == vbucket_state_dead ||
1139          vbMap.isBucketDeletion(vbid)) {
1140         lh.unlock();
1141         LockHolder vlh(vb_mutexes[vbid]);
1142         getRWUnderlying(vbid)->delVBucket(vbid);
1143         vbMap.setBucketDeletion(vbid, false);
1144         vbMap.setPersistenceSeqno(vbid, 0);
1145         ++stats.vbucketDeletions;
1146     }
1147
1148     hrtime_t spent(gethrtime() - start_time);
1149     hrtime_t wall_time = spent / 1000;
1150     BlockTimer::log(spent, "disk_vb_del", stats.timingLog);
1151     stats.diskVBDelHisto.add(wall_time);
1152     atomic_setIfBigger(stats.vbucketDelMaxWalltime, wall_time);
1153     stats.vbucketDelTotWalltime.fetch_add(wall_time);
1154     if (cookie) {
1155         engine.notifyIOComplete(cookie, ENGINE_SUCCESS);
1156     }
1157
1158     return true;
1159 }
1160
1161 void EventuallyPersistentStore::scheduleVBDeletion(RCPtr<VBucket> &vb,
1162                                                    const void* cookie,
1163                                                    double delay) {
1164     ExTask delTask = new VBucketMemoryDeletionTask(engine, vb, delay);
1165     ExecutorPool::get()->schedule(delTask, NONIO_TASK_IDX);
1166
1167     if (vbMap.setBucketDeletion(vb->getId(), true)) {
1168         ExTask task = new VBDeleteTask(&engine, vb->getId(), cookie,
1169                                        Priority::VBucketDeletionPriority);
1170         ExecutorPool::get()->schedule(task, WRITER_TASK_IDX);
1171     }
1172 }
1173
1174 ENGINE_ERROR_CODE EventuallyPersistentStore::deleteVBucket(uint16_t vbid,
1175                                                            const void* c) {
1176     // Lock to prevent a race condition between a failed update and add
1177     // (and delete).
1178     LockHolder lh(vbsetMutex);
1179
1180     RCPtr<VBucket> vb = vbMap.getBucket(vbid);
1181     if (!vb) {
1182         return ENGINE_NOT_MY_VBUCKET;
1183     }
1184
1185     engine.getDcpConnMap().vbucketStateChanged(vbid, vbucket_state_dead);
1186     vbMap.removeBucket(vbid);
1187     lh.unlock();
1188     scheduleVBDeletion(vb, c);
1189     if (c) {
1190         return ENGINE_EWOULDBLOCK;
1191     }
1192     return ENGINE_SUCCESS;
1193 }
1194
1195 ENGINE_ERROR_CODE EventuallyPersistentStore::compactDB(uint16_t vbid,
1196                                                        compaction_ctx c,
1197                                                        const void *cookie) {
1198     RCPtr<VBucket> vb = vbMap.getBucket(vbid);
1199     if (!vb) {
1200         return ENGINE_NOT_MY_VBUCKET;
1201     }
1202
1203     LockHolder lh(compactionLock);
1204     ExTask task = new CompactVBucketTask(&engine, Priority::CompactorPriority,
1205                                          vbid, c, cookie);
1206     compactionTasks.push_back(std::make_pair(vbid, task));
1207     if (compactionTasks.size() > 1) {
1208         if ((stats.diskQueueSize > compactionWriteQueueCap &&
1209             compactionTasks.size() > (vbMap.getNumShards() / 2)) ||
1210             engine.getWorkLoadPolicy().getWorkLoadPattern() == READ_HEAVY) {
1211             // Snooze a new compaction task.
1212             // We will wake it up when one of the existing compaction tasks is done.
1213             task->snooze(60);
1214         }
1215     }
1216
1217     ExecutorPool::get()->schedule(task, WRITER_TASK_IDX);
1218
1219     LOG(EXTENSION_LOG_DEBUG, "Scheduled compaction task %d on vbucket %d,"
1220         "purge_before_ts = %lld, purge_before_seq = %lld, dropdeletes = %d",
1221         task->getId(), vbid, c.purge_before_ts,
1222         c.purge_before_seq, c.drop_deletes);
1223
1224    return ENGINE_EWOULDBLOCK;
1225 }
1226
1227 class ExpiredItemsCallback : public Callback<compaction_ctx> {
1228     public:
1229         ExpiredItemsCallback(EventuallyPersistentStore *store, uint16_t vbid)
1230             : epstore(store), vbucket(vbid) { }
1231
1232         void callback(compaction_ctx &ctx) {
1233             std::list<expiredItemCtx>::iterator it;
1234             for (it  = ctx.expiredItems.begin();
1235                  it != ctx.expiredItems.end(); it++) {
1236                 if (epstore->compactionCanExpireItems()) {
1237                     epstore->deleteExpiredItem(vbucket, it->keyStr,
1238                                                ctx.curr_time,
1239                                                it->revSeqno);
1240                 }
1241             }
1242         }
1243
1244     private:
1245         EventuallyPersistentStore *epstore;
1246         uint16_t vbucket;
1247 };
1248
1249 bool EventuallyPersistentStore::compactVBucket(const uint16_t vbid,
1250                                                compaction_ctx *ctx,
1251                                                const void *cookie) {
1252     ENGINE_ERROR_CODE err = ENGINE_SUCCESS;
1253     RCPtr<VBucket> vb = vbMap.getBucket(vbid);
1254     if (vb) {
1255         LockHolder lh(vb_mutexes[vbid], true /*tryLock*/);
1256         if (!lh.islocked()) {
1257             return true; // Schedule a compaction task again.
1258         }
1259
1260         if (vb->getState() == vbucket_state_active) {
1261             // Set the current time ONLY for active vbuckets.
1262             ctx->curr_time = ep_real_time();
1263         } else {
1264             ctx->curr_time = 0;
1265         }
1266         ExpiredItemsCallback cb(this, vbid);
1267         KVStatsCallback kvcb(this);
1268         getRWUnderlying(vbid)->compactVBucket(vbid, ctx, cb, kvcb);
1269         vb->setPurgeSeqno(ctx->max_purged_seq);
1270     } else {
1271         err = ENGINE_NOT_MY_VBUCKET;
1272         engine.storeEngineSpecific(cookie, NULL);
1273         //Decrement session counter here, as memcached thread wouldn't
1274         //visit the engine interface in case of a NOT_MY_VB notification
1275         engine.decrementSessionCtr();
1276
1277     }
1278
1279     LockHolder lh(compactionLock);
1280     bool erased = false, woke = false;
1281     std::list<CompTaskEntry>::iterator it = compactionTasks.begin();
1282     while (it != compactionTasks.end()) {
1283         if ((*it).first == vbid) {
1284             it = compactionTasks.erase(it);
1285             erased = true;
1286         } else {
1287             ExTask &task = (*it).second;
1288             if (task->getState() == TASK_SNOOZED) {
1289                 ExecutorPool::get()->wake(task->getId());
1290                 woke = true;
1291             }
1292             ++it;
1293         }
1294         if (erased && woke) {
1295             break;
1296         }
1297     }
1298     lh.unlock();
1299
1300     if (cookie) {
1301         engine.notifyIOComplete(cookie, err);
1302     }
1303     --stats.pendingCompactions;
1304     return false;
1305 }
1306
1307 bool EventuallyPersistentStore::resetVBucket(uint16_t vbid) {
1308     LockHolder lh(vbsetMutex);
1309     bool rv(false);
1310
1311     RCPtr<VBucket> vb = vbMap.getBucket(vbid);
1312     if (vb) {
1313         vbucket_state_t vbstate = vb->getState();
1314
1315         vbMap.removeBucket(vbid);
1316         lh.unlock();
1317
1318         std::list<std::string> tap_cursors = vb->checkpointManager.
1319                                              getTAPCursorNames();
1320         // Delete and recreate the vbucket database file
1321         scheduleVBDeletion(vb, NULL, 0);
1322         setVBucketState(vbid, vbstate, false);
1323
1324         // Copy the all cursors from the old vbucket into the new vbucket
1325         RCPtr<VBucket> newvb = vbMap.getBucket(vbid);
1326         newvb->checkpointManager.resetTAPCursors(tap_cursors);
1327
1328         rv = true;
1329     }
1330     return rv;
1331 }
1332
1333 extern "C" {
1334
1335     typedef struct {
1336         EventuallyPersistentEngine* engine;
1337         std::map<std::string, std::string> smap;
1338     } snapshot_stats_t;
1339
1340     static void add_stat(const char *key, const uint16_t klen,
1341                          const char *val, const uint32_t vlen,
1342                          const void *cookie) {
1343         cb_assert(cookie);
1344         void *ptr = const_cast<void *>(cookie);
1345         snapshot_stats_t* snap = static_cast<snapshot_stats_t*>(ptr);
1346         ObjectRegistry::onSwitchThread(snap->engine);
1347
1348         std::string k(key, klen);
1349         std::string v(val, vlen);
1350         snap->smap.insert(std::pair<std::string, std::string>(k, v));
1351     }
1352 }
1353
1354 void EventuallyPersistentStore::snapshotStats() {
1355     snapshot_stats_t snap;
1356     snap.engine = &engine;
1357     std::map<std::string, std::string>  smap;
1358     bool rv = engine.getStats(&snap, NULL, 0, add_stat) == ENGINE_SUCCESS &&
1359               engine.getStats(&snap, "tap", 3, add_stat) == ENGINE_SUCCESS &&
1360               engine.getStats(&snap, "dcp", 3, add_stat) == ENGINE_SUCCESS;
1361
1362     if (rv && stats.isShutdown) {
1363         snap.smap["ep_force_shutdown"] = stats.forceShutdown ?
1364                                                               "true" : "false";
1365         std::stringstream ss;
1366         ss << ep_real_time();
1367         snap.smap["ep_shutdown_time"] = ss.str();
1368     }
1369     getOneRWUnderlying()->snapshotStats(snap.smap);
1370 }
1371
1372 void EventuallyPersistentStore::updateBGStats(const hrtime_t init,
1373                                               const hrtime_t start,
1374                                               const hrtime_t stop) {
1375     if (stop >= start && start >= init) {
1376         // skip the measurement if the counter wrapped...
1377         ++stats.bgNumOperations;
1378         hrtime_t w = (start - init) / 1000;
1379         BlockTimer::log(start - init, "bgwait", stats.timingLog);
1380         stats.bgWaitHisto.add(w);
1381         stats.bgWait.fetch_add(w);
1382         atomic_setIfLess(stats.bgMinWait, w);
1383         atomic_setIfBigger(stats.bgMaxWait, w);
1384
1385         hrtime_t l = (stop - start) / 1000;
1386         BlockTimer::log(stop - start, "bgload", stats.timingLog);
1387         stats.bgLoadHisto.add(l);
1388         stats.bgLoad.fetch_add(l);
1389         atomic_setIfLess(stats.bgMinLoad, l);
1390         atomic_setIfBigger(stats.bgMaxLoad, l);
1391     }
1392 }
1393
1394 void EventuallyPersistentStore::completeBGFetch(const std::string &key,
1395                                                 uint16_t vbucket,
1396                                                 uint64_t rowid,
1397                                                 const void *cookie,
1398                                                 hrtime_t init,
1399                                                 bool isMeta) {
1400     hrtime_t start(gethrtime());
1401     // Go find the data
1402     RememberingCallback<GetValue> gcb;
1403     if (isMeta) {
1404         gcb.val.setPartial();
1405         ++stats.bg_meta_fetched;
1406     } else {
1407         ++stats.bg_fetched;
1408     }
1409     getROUnderlying(vbucket)->get(key, rowid, vbucket, gcb);
1410     gcb.waitForValue();
1411     cb_assert(gcb.fired);
1412     ENGINE_ERROR_CODE status = gcb.val.getStatus();
1413
1414     // Lock to prevent a race condition between a fetch for restore and delete
1415     LockHolder lh(vbsetMutex);
1416
1417     RCPtr<VBucket> vb = getVBucket(vbucket);
1418     if (vb) {
1419         int bucket_num(0);
1420         LockHolder hlh = vb->ht.getLockedBucket(key, &bucket_num);
1421         StoredValue *v = fetchValidValue(vb, key, bucket_num, true);
1422         if (isMeta) {
1423             if (v && v->unlocked_restoreMeta(gcb.val.getValue(),
1424                                              gcb.val.getStatus(), vb->ht)) {
1425                 status = ENGINE_SUCCESS;
1426             }
1427         } else {
1428             bool restore = false;
1429             if (v && v->isResident()) {
1430                 status = ENGINE_SUCCESS;
1431             } else {
1432                 switch (eviction_policy) {
1433                     case VALUE_ONLY:
1434                         if (v && !v->isResident() && !v->isDeleted()) {
1435                             restore = true;
1436                         }
1437                         break;
1438                     case FULL_EVICTION:
1439                         if (v) {
1440                             if (v->isTempInitialItem() ||
1441                                 (!v->isResident() && !v->isDeleted())) {
1442                                 restore = true;
1443                             }
1444                         }
1445                         break;
1446                     default:
1447                         throw std::logic_error("Unknown eviction policy");
1448                 }
1449             }
1450
1451             if (restore) {
1452                 if (gcb.val.getStatus() == ENGINE_SUCCESS) {
1453                     v->unlocked_restoreValue(gcb.val.getValue(), vb->ht);
1454                     cb_assert(v->isResident());
1455                     if (vb->getState() == vbucket_state_active &&
1456                         v->getExptime() != gcb.val.getValue()->getExptime() &&
1457                         v->getCas() == gcb.val.getValue()->getCas()) {
1458                         // MB-9306: It is possible that by the time bgfetcher
1459                         // returns, the item may have been updated and queued
1460                         // Hence test the CAS value to be the same first.
1461                         // exptime mutated, schedule it into new checkpoint
1462                         queueDirty(vb, v, &hlh);
1463                     }
1464                 } else if (gcb.val.getStatus() == ENGINE_KEY_ENOENT) {
1465                     v->setStoredValueState(
1466                                           StoredValue::state_non_existent_key);
1467                     if (eviction_policy == FULL_EVICTION) {
1468                         // For the full eviction, we should notify
1469                         // ENGINE_SUCCESS to the memcached worker thread, so
1470                         // that the worker thread can visit the ep-engine and
1471                         // figure out the correct error code.
1472                         status = ENGINE_SUCCESS;
1473                     }
1474                 } else {
1475                     // underlying kvstore couldn't fetch requested data
1476                     // log returned error and notify TMPFAIL to client
1477                     LOG(EXTENSION_LOG_WARNING,
1478                         "Warning: failed background fetch for vb=%d seq=%d "
1479                         "key=%s", vbucket, v->getBySeqno(), key.c_str());
1480                     status = ENGINE_TMPFAIL;
1481                 }
1482             }
1483         }
1484     } else {
1485         LOG(EXTENSION_LOG_INFO, "VBucket %d's file was deleted in the middle of"
1486             " a bg fetch for key %s\n", vbucket, key.c_str());
1487         status = ENGINE_NOT_MY_VBUCKET;
1488     }
1489
1490     lh.unlock();
1491
1492     hrtime_t stop = gethrtime();
1493     updateBGStats(init, start, stop);
1494     bgFetchQueue--;
1495
1496     delete gcb.val.getValue();
1497     engine.notifyIOComplete(cookie, status);
1498 }
1499
1500 void EventuallyPersistentStore::completeBGFetchMulti(uint16_t vbId,
1501                                  std::vector<bgfetched_item_t> &fetchedItems,
1502                                  hrtime_t startTime)
1503 {
1504     RCPtr<VBucket> vb = getVBucket(vbId);
1505     if (!vb) {
1506         std::vector<bgfetched_item_t>::iterator itemItr = fetchedItems.begin();
1507         for (; itemItr != fetchedItems.end(); ++itemItr) {
1508             engine.notifyIOComplete((*itemItr).second->cookie,
1509                                     ENGINE_NOT_MY_VBUCKET);
1510         }
1511         LOG(EXTENSION_LOG_WARNING,
1512             "EP Store completes %d of batched background fetch for "
1513             "for vBucket = %d that is already deleted\n",
1514             (int)fetchedItems.size(), vbId);
1515         return;
1516     }
1517
1518     std::vector<bgfetched_item_t>::iterator itemItr = fetchedItems.begin();
1519     for (; itemItr != fetchedItems.end(); ++itemItr) {
1520         VBucketBGFetchItem *bgitem = (*itemItr).second;
1521         ENGINE_ERROR_CODE status = bgitem->value.getStatus();
1522         Item *fetchedValue = bgitem->value.getValue();
1523         const std::string &key = (*itemItr).first;
1524
1525         int bucket = 0;
1526         LockHolder blh = vb->ht.getLockedBucket(key, &bucket);
1527         StoredValue *v = fetchValidValue(vb, key, bucket, true);
1528         if (bgitem->metaDataOnly) {
1529             if (v && v->unlocked_restoreMeta(fetchedValue, status, vb->ht)) {
1530                 status = ENGINE_SUCCESS;
1531             }
1532         } else {
1533             bool restore = false;
1534             if (v && v->isResident()) {
1535                 status = ENGINE_SUCCESS;
1536             } else {
1537                 switch (eviction_policy) {
1538                     case VALUE_ONLY:
1539                         if (v && !v->isResident() && !v->isDeleted()) {
1540                             restore = true;
1541                         }
1542                         break;
1543                     case FULL_EVICTION:
1544                         if (v) {
1545                             if (v->isTempInitialItem() ||
1546                                 (!v->isResident() && !v->isDeleted())) {
1547                                 restore = true;
1548                             }
1549                         }
1550                         break;
1551                     default:
1552                         throw std::logic_error("Unknown eviction policy");
1553                 }
1554             }
1555
1556             if (restore) {
1557                 if (status == ENGINE_SUCCESS) {
1558                     v->unlocked_restoreValue(fetchedValue, vb->ht);
1559                     cb_assert(v->isResident());
1560                     if (vb->getState() == vbucket_state_active &&
1561                         v->getExptime() != fetchedValue->getExptime() &&
1562                         v->getCas() == fetchedValue->getCas()) {
1563                         // MB-9306: It is possible that by the time
1564                         // bgfetcher returns, the item may have been
1565                         // updated and queued
1566                         // Hence test the CAS value to be the same first.
1567                         // exptime mutated, schedule it into new checkpoint
1568                         queueDirty(vb, v, &blh);
1569                     }
1570                 } else if (status == ENGINE_KEY_ENOENT) {
1571                     v->setStoredValueState(StoredValue::state_non_existent_key);
1572                     if (eviction_policy == FULL_EVICTION) {
1573                         // For the full eviction, we should notify
1574                         // ENGINE_SUCCESS to the memcached worker thread,
1575                         // so that the worker thread can visit the
1576                         // ep-engine and figure out the correct error
1577                         // code.
1578                         status = ENGINE_SUCCESS;
1579                     }
1580                 } else {
1581                     // underlying kvstore couldn't fetch requested data
1582                     // log returned error and notify TMPFAIL to client
1583                     LOG(EXTENSION_LOG_WARNING,
1584                         "Warning: failed background fetch for vb=%d "
1585                         "key=%s", vbId, key.c_str());
1586                     status = ENGINE_TMPFAIL;
1587                 }
1588             }
1589         }
1590         blh.unlock();
1591
1592         if (bgitem->metaDataOnly) {
1593             ++stats.bg_meta_fetched;
1594         } else {
1595             ++stats.bg_fetched;
1596         }
1597
1598         hrtime_t endTime = gethrtime();
1599         updateBGStats(bgitem->initTime, startTime, endTime);
1600         engine.notifyIOComplete(bgitem->cookie, status);
1601     }
1602
1603     LOG(EXTENSION_LOG_DEBUG,
1604         "EP Store completes %d of batched background fetch "
1605         "for vBucket = %d endTime = %lld\n",
1606         fetchedItems.size(), vbId, gethrtime()/1000000);
1607 }
1608
1609 void EventuallyPersistentStore::bgFetch(const std::string &key,
1610                                         uint16_t vbucket,
1611                                         uint64_t rowid,
1612                                         const void *cookie,
1613                                         bool isMeta) {
1614     std::stringstream ss;
1615
1616     if (multiBGFetchEnabled()) {
1617         RCPtr<VBucket> vb = getVBucket(vbucket);
1618         cb_assert(vb);
1619         KVShard *myShard = vbMap.getShard(vbucket);
1620
1621         // schedule to the current batch of background fetch of the given
1622         // vbucket
1623         VBucketBGFetchItem * fetchThis = new VBucketBGFetchItem(cookie,
1624                                                                 isMeta);
1625         vb->queueBGFetchItem(key, fetchThis, myShard->getBgFetcher());
1626         myShard->getBgFetcher()->notifyBGEvent();
1627         ss << "Queued a background fetch, now at "
1628            << vb->numPendingBGFetchItems() << std::endl;
1629         LOG(EXTENSION_LOG_DEBUG, "%s", ss.str().c_str());
1630     } else {
1631         bgFetchQueue++;
1632         stats.maxRemainingBgJobs = std::max(stats.maxRemainingBgJobs,
1633                                             bgFetchQueue.load());
1634         ExecutorPool* iom = ExecutorPool::get();
1635         ExTask task = new BGFetchTask(&engine, key, vbucket, rowid, cookie,
1636                                       isMeta,
1637                                       Priority::BgFetcherGetMetaPriority,
1638                                       bgFetchDelay, false);
1639         iom->schedule(task, READER_TASK_IDX);
1640         ss << "Queued a background fetch, now at " << bgFetchQueue.load()
1641            << std::endl;
1642         LOG(EXTENSION_LOG_DEBUG, "%s", ss.str().c_str());
1643     }
1644 }
1645
1646 GetValue EventuallyPersistentStore::getInternal(const std::string &key,
1647                                                 uint16_t vbucket,
1648                                                 const void *cookie,
1649                                                 bool queueBG,
1650                                                 bool honorStates,
1651                                                 vbucket_state_t allowedState,
1652                                                 bool trackReference) {
1653
1654     vbucket_state_t disallowedState = (allowedState == vbucket_state_active) ?
1655         vbucket_state_replica : vbucket_state_active;
1656     RCPtr<VBucket> vb = getVBucket(vbucket);
1657     if (!vb) {
1658         ++stats.numNotMyVBuckets;
1659         return GetValue(NULL, ENGINE_NOT_MY_VBUCKET);
1660     } else if (honorStates && vb->getState() == vbucket_state_dead) {
1661         ++stats.numNotMyVBuckets;
1662         return GetValue(NULL, ENGINE_NOT_MY_VBUCKET);
1663     } else if (honorStates && vb->getState() == disallowedState) {
1664         ++stats.numNotMyVBuckets;
1665         return GetValue(NULL, ENGINE_NOT_MY_VBUCKET);
1666     } else if (honorStates && vb->getState() == vbucket_state_pending) {
1667         if (vb->addPendingOp(cookie)) {
1668             return GetValue(NULL, ENGINE_EWOULDBLOCK);
1669         }
1670     }
1671
1672     int bucket_num(0);
1673     LockHolder lh = vb->ht.getLockedBucket(key, &bucket_num);
1674     StoredValue *v = fetchValidValue(vb, key, bucket_num, true,
1675                                      trackReference);
1676     if (v) {
1677         if (v->isDeleted() || v->isTempDeletedItem() ||
1678             v->isTempNonExistentItem()) {
1679             GetValue rv;
1680             return rv;
1681         }
1682         // If the value is not resident, wait for it...
1683         if (!v->isResident()) {
1684             if (queueBG) {
1685                 bgFetch(key, vbucket, v->getBySeqno(), cookie);
1686             }
1687             return GetValue(NULL, ENGINE_EWOULDBLOCK, v->getBySeqno(),
1688                             true, v->getNRUValue());
1689         }
1690
1691         GetValue rv(v->toItem(v->isLocked(ep_current_time()), vbucket),
1692                     ENGINE_SUCCESS, v->getBySeqno(), false, v->getNRUValue());
1693         return rv;
1694     } else {
1695         if (eviction_policy == VALUE_ONLY || diskFlushAll) {
1696             GetValue rv;
1697             return rv;
1698         }
1699         ENGINE_ERROR_CODE ec = ENGINE_EWOULDBLOCK;
1700         if (queueBG) { // Full eviction and need a bg fetch.
1701             ec = addTempItemForBgFetch(lh, bucket_num, key, vb,
1702                                        cookie, false);
1703         }
1704         return GetValue(NULL, ec, -1, true);
1705     }
1706 }
1707
1708 GetValue EventuallyPersistentStore::getRandomKey() {
1709     long max = vbMap.getSize();
1710
1711     long start = random() % max;
1712     long curr = start;
1713     Item *itm = NULL;
1714
1715     while (itm == NULL) {
1716         RCPtr<VBucket> vb = getVBucket(curr++);
1717         while (!vb || vb->getState() != vbucket_state_active) {
1718             if (curr == start) {
1719                 return GetValue(NULL, ENGINE_KEY_ENOENT);
1720             }
1721             if (curr == max) {
1722                 curr = 0;
1723             }
1724
1725             vb = getVBucket(curr++);
1726         }
1727
1728         if ((itm = vb->ht.getRandomKey(random())) != NULL) {
1729             GetValue rv(itm, ENGINE_SUCCESS);
1730             return rv;
1731         }
1732
1733         if (curr == max) {
1734             curr = 0;
1735         }
1736
1737         if (curr == start) {
1738             return GetValue(NULL, ENGINE_KEY_ENOENT);
1739         }
1740         // Search next vbucket
1741     }
1742
1743     return GetValue(NULL, ENGINE_KEY_ENOENT);
1744 }
1745
1746
1747 ENGINE_ERROR_CODE EventuallyPersistentStore::getMetaData(
1748                                                         const std::string &key,
1749                                                         uint16_t vbucket,
1750                                                         const void *cookie,
1751                                                         ItemMetaData &metadata,
1752                                                         uint32_t &deleted,
1753                                                         bool trackReferenced)
1754 {
1755     (void) cookie;
1756     RCPtr<VBucket> vb = getVBucket(vbucket);
1757     if (!vb || vb->getState() == vbucket_state_dead ||
1758         vb->getState() == vbucket_state_replica) {
1759         ++stats.numNotMyVBuckets;
1760         return ENGINE_NOT_MY_VBUCKET;
1761     }
1762
1763     int bucket_num(0);
1764     deleted = 0;
1765     LockHolder lh = vb->ht.getLockedBucket(key, &bucket_num);
1766     StoredValue *v = vb->ht.unlocked_find(key, bucket_num, true,
1767                                           trackReferenced);
1768
1769     if (v) {
1770         stats.numOpsGetMeta++;
1771
1772         if (v->isTempInitialItem()) { // Need bg meta fetch.
1773             bgFetch(key, vbucket, -1, cookie, true);
1774             return ENGINE_EWOULDBLOCK;
1775         } else if (v->isTempNonExistentItem()) {
1776             metadata.cas = v->getCas();
1777             return ENGINE_KEY_ENOENT;
1778         } else {
1779             if (v->isTempDeletedItem() || v->isDeleted() ||
1780                 v->isExpired(ep_real_time())) {
1781                 deleted |= GET_META_ITEM_DELETED_FLAG;
1782             }
1783
1784             if (v->isLocked(ep_current_time())) {
1785                 metadata.cas = static_cast<uint64_t>(-1);
1786             } else {
1787                 metadata.cas = v->getCas();
1788             }
1789             metadata.flags = v->getFlags();
1790             metadata.exptime = v->getExptime();
1791             metadata.revSeqno = v->getRevSeqno();
1792             return ENGINE_SUCCESS;
1793         }
1794     } else {
1795         // The key wasn't found. However, this may be because it was previously
1796         // deleted or evicted with the full eviction strategy.
1797         // So, add a temporary item corresponding to the key to the hash table
1798         // and schedule a background fetch for its metadata from the persistent
1799         // store. The item's state will be updated after the fetch completes.
1800         return addTempItemForBgFetch(lh, bucket_num, key, vb, cookie, true);
1801     }
1802 }
1803
1804 ENGINE_ERROR_CODE EventuallyPersistentStore::setWithMeta(const Item &itm,
1805                                                          uint64_t cas,
1806                                                          const void *cookie,
1807                                                          bool force,
1808                                                          bool allowExisting,
1809                                                          uint8_t nru,
1810                                                          bool genBySeqno,
1811                                                          bool isReplication)
1812 {
1813     RCPtr<VBucket> vb = getVBucket(itm.getVBucketId());
1814     if (!vb || vb->getState() == vbucket_state_dead) {
1815         ++stats.numNotMyVBuckets;
1816         return ENGINE_NOT_MY_VBUCKET;
1817     } else if (vb->getState() == vbucket_state_replica && !force) {
1818         ++stats.numNotMyVBuckets;
1819         return ENGINE_NOT_MY_VBUCKET;
1820     } else if (vb->getState() == vbucket_state_pending && !force) {
1821         if (vb->addPendingOp(cookie)) {
1822             return ENGINE_EWOULDBLOCK;
1823         }
1824     }
1825
1826     int bucket_num(0);
1827     LockHolder lh = vb->ht.getLockedBucket(itm.getKey(), &bucket_num);
1828     StoredValue *v = vb->ht.unlocked_find(itm.getKey(), bucket_num, true,
1829                                           false);
1830
1831     if (!force) {
1832         if (v)  {
1833             if (v->isTempInitialItem()) {
1834                 bgFetch(itm.getKey(), itm.getVBucketId(), -1, cookie, true);
1835                 return ENGINE_EWOULDBLOCK;
1836             }
1837             if (!conflictResolver->resolve(v, itm.getMetaData(), false)) {
1838                 ++stats.numOpsSetMetaResolutionFailed;
1839                 return ENGINE_KEY_EEXISTS;
1840             }
1841         } else {
1842             return addTempItemForBgFetch(lh, bucket_num, itm.getKey(), vb,
1843                                          cookie, true, isReplication);
1844         }
1845     }
1846
1847     if (v && v->isLocked(ep_current_time()) &&
1848         (vb->getState() == vbucket_state_replica ||
1849          vb->getState() == vbucket_state_pending)) {
1850         v->unlock();
1851     }
1852     mutation_type_t mtype = vb->ht.unlocked_set(v, itm, cas, allowExisting,
1853                                                 true, eviction_policy, nru,
1854                                                 isReplication);
1855
1856     ENGINE_ERROR_CODE ret = ENGINE_SUCCESS;
1857     switch (mtype) {
1858     case NOMEM:
1859         ret = ENGINE_ENOMEM;
1860         break;
1861     case INVALID_CAS:
1862     case IS_LOCKED:
1863         ret = ENGINE_KEY_EEXISTS;
1864         break;
1865     case INVALID_VBUCKET:
1866         ret = ENGINE_NOT_MY_VBUCKET;
1867         break;
1868     case WAS_DIRTY:
1869     case WAS_CLEAN:
1870         queueDirty(vb, v, &lh, false, true, genBySeqno);
1871         break;
1872     case NOT_FOUND:
1873         ret = ENGINE_KEY_ENOENT;
1874         break;
1875     case NEED_BG_FETCH:
1876         {            // CAS operation with non-resident item + full eviction.
1877             if (v) { // temp item is already created. Simply schedule a
1878                 lh.unlock(); // bg fetch job.
1879                 bgFetch(itm.getKey(), vb->getId(), -1, cookie, true);
1880                 return ENGINE_EWOULDBLOCK;
1881             }
1882             ret = addTempItemForBgFetch(lh, bucket_num, itm.getKey(), vb,
1883                                         cookie, true, isReplication);
1884         }
1885     }
1886
1887     return ret;
1888 }
1889
1890 GetValue EventuallyPersistentStore::getAndUpdateTtl(const std::string &key,
1891                                                     uint16_t vbucket,
1892                                                     const void *cookie,
1893                                                     time_t exptime)
1894 {
1895     RCPtr<VBucket> vb = getVBucket(vbucket);
1896     if (!vb) {
1897         ++stats.numNotMyVBuckets;
1898         return GetValue(NULL, ENGINE_NOT_MY_VBUCKET);
1899     } else if (vb->getState() == vbucket_state_dead) {
1900         ++stats.numNotMyVBuckets;
1901         return GetValue(NULL, ENGINE_NOT_MY_VBUCKET);
1902     } else if (vb->getState() == vbucket_state_replica) {
1903         ++stats.numNotMyVBuckets;
1904         return GetValue(NULL, ENGINE_NOT_MY_VBUCKET);
1905     } else if (vb->getState() == vbucket_state_pending) {
1906         if (vb->addPendingOp(cookie)) {
1907             return GetValue(NULL, ENGINE_EWOULDBLOCK);
1908         }
1909     }
1910
1911     int bucket_num(0);
1912     LockHolder lh = vb->ht.getLockedBucket(key, &bucket_num);
1913     StoredValue *v = fetchValidValue(vb, key, bucket_num, true);
1914
1915     if (v) {
1916         if (v->isDeleted() || v->isTempDeletedItem() ||
1917             v->isTempNonExistentItem()) {
1918             GetValue rv;
1919             return rv;
1920         }
1921
1922         if (!v->isResident()) {
1923             bgFetch(key, vbucket, v->getBySeqno(), cookie);
1924             return GetValue(NULL, ENGINE_EWOULDBLOCK, v->getBySeqno());
1925         }
1926         if (v->isLocked(ep_current_time())) {
1927             GetValue rv(NULL, ENGINE_KEY_EEXISTS, 0);
1928             return rv;
1929         }
1930
1931         bool exptime_mutated = exptime != v->getExptime() ? true : false;
1932         if (exptime_mutated) {
1933            v->markDirty();
1934            v->setExptime(exptime);
1935         }
1936
1937         GetValue rv(v->toItem(v->isLocked(ep_current_time()), vbucket),
1938                     ENGINE_SUCCESS, v->getBySeqno());
1939
1940         if (exptime_mutated) {
1941             // persist the itme in the underlying storage for
1942             // mutated exptime
1943             queueDirty(vb, v, &lh);
1944         }
1945         return rv;
1946     } else {
1947         if (eviction_policy == VALUE_ONLY) {
1948             GetValue rv;
1949             return rv;
1950         } else {
1951             ENGINE_ERROR_CODE ec = addTempItemForBgFetch(lh, bucket_num, key,
1952                                                          vb, cookie, false);
1953             return GetValue(NULL, ec, -1, true);
1954         }
1955     }
1956 }
1957
1958 ENGINE_ERROR_CODE
1959 EventuallyPersistentStore::statsVKey(const std::string &key,
1960                                      uint16_t vbucket,
1961                                      const void *cookie) {
1962     RCPtr<VBucket> vb = getVBucket(vbucket);
1963     if (!vb) {
1964         return ENGINE_NOT_MY_VBUCKET;
1965     }
1966
1967     int bucket_num(0);
1968     LockHolder lh = vb->ht.getLockedBucket(key, &bucket_num);
1969     StoredValue *v = fetchValidValue(vb, key, bucket_num, true);
1970
1971     if (v) {
1972         if (v->isDeleted() || v->isTempDeletedItem() ||
1973             v->isTempNonExistentItem()) {
1974             return ENGINE_KEY_ENOENT;
1975         }
1976         bgFetchQueue++;
1977         cb_assert(bgFetchQueue > 0);
1978         ExecutorPool* iom = ExecutorPool::get();
1979         ExTask task = new VKeyStatBGFetchTask(&engine, key, vbucket,
1980                                            v->getBySeqno(), cookie,
1981                                            Priority::VKeyStatBgFetcherPriority,
1982                                            bgFetchDelay, false);
1983         iom->schedule(task, READER_TASK_IDX);
1984         return ENGINE_EWOULDBLOCK;
1985     } else {
1986         if (eviction_policy == VALUE_ONLY) {
1987             return ENGINE_KEY_ENOENT;
1988         } else {
1989             add_type_t rv = vb->ht.unlocked_addTempItem(bucket_num, key,
1990                                                         eviction_policy);
1991             switch(rv) {
1992             case ADD_NOMEM:
1993                 return ENGINE_ENOMEM;
1994             case ADD_EXISTS:
1995             case ADD_UNDEL:
1996             case ADD_SUCCESS:
1997             case ADD_TMP_AND_BG_FETCH:
1998                 // Since the hashtable bucket is locked, we shouldn't get here
1999                 abort();
2000             case ADD_BG_FETCH:
2001                 {
2002                     ++bgFetchQueue;
2003                     cb_assert(bgFetchQueue > 0);
2004                     ExecutorPool* iom = ExecutorPool::get();
2005                     ExTask task = new VKeyStatBGFetchTask(&engine, key,
2006                                                           vbucket, -1, cookie,
2007                                            Priority::VKeyStatBgFetcherPriority,
2008                                                           bgFetchDelay, false);
2009                     iom->schedule(task, READER_TASK_IDX);
2010                 }
2011             }
2012             return ENGINE_EWOULDBLOCK;
2013         }
2014     }
2015 }
2016
2017 void EventuallyPersistentStore::completeStatsVKey(const void* cookie,
2018                                                   std::string &key,
2019                                                   uint16_t vbid,
2020                                                   uint64_t bySeqNum) {
2021     RememberingCallback<GetValue> gcb;
2022
2023     getROUnderlying(vbid)->get(key, bySeqNum, vbid, gcb);
2024     gcb.waitForValue();
2025     cb_assert(gcb.fired);
2026
2027     if (eviction_policy == FULL_EVICTION) {
2028         RCPtr<VBucket> vb = getVBucket(vbid);
2029         if (vb) {
2030             int bucket_num(0);
2031             LockHolder hlh = vb->ht.getLockedBucket(key, &bucket_num);
2032             StoredValue *v = fetchValidValue(vb, key, bucket_num, true);
2033             if (v && v->isTempInitialItem()) {
2034                 if (gcb.val.getStatus() == ENGINE_SUCCESS) {
2035                     v->unlocked_restoreValue(gcb.val.getValue(), vb->ht);
2036                     cb_assert(v->isResident());
2037                 } else if (gcb.val.getStatus() == ENGINE_KEY_ENOENT) {
2038                     v->setStoredValueState(
2039                                           StoredValue::state_non_existent_key);
2040                 } else {
2041                     // underlying kvstore couldn't fetch requested data
2042                     // log returned error and notify TMPFAIL to client
2043                     LOG(EXTENSION_LOG_WARNING,
2044                         "Warning: failed background fetch for vb=%d seq=%d "
2045                         "key=%s", vbid, v->getBySeqno(), key.c_str());
2046                 }
2047             }
2048         }
2049     }
2050
2051     if (gcb.val.getStatus() == ENGINE_SUCCESS) {
2052         engine.addLookupResult(cookie, gcb.val.getValue());
2053     } else {
2054         engine.addLookupResult(cookie, NULL);
2055     }
2056
2057     bgFetchQueue--;
2058     engine.notifyIOComplete(cookie, ENGINE_SUCCESS);
2059 }
2060
2061 bool EventuallyPersistentStore::getLocked(const std::string &key,
2062                                           uint16_t vbucket,
2063                                           Callback<GetValue> &cb,
2064                                           rel_time_t currentTime,
2065                                           uint32_t lockTimeout,
2066                                           const void *cookie) {
2067     RCPtr<VBucket> vb = getVBucket(vbucket, vbucket_state_active);
2068     if (!vb) {
2069         ++stats.numNotMyVBuckets;
2070         GetValue rv(NULL, ENGINE_NOT_MY_VBUCKET);
2071         cb.callback(rv);
2072         return false;
2073     }
2074
2075     int bucket_num(0);
2076     LockHolder lh = vb->ht.getLockedBucket(key, &bucket_num);
2077     StoredValue *v = fetchValidValue(vb, key, bucket_num, true);
2078
2079     if (v) {
2080         if (v->isDeleted() || v->isTempNonExistentItem() ||
2081             v->isTempDeletedItem()) {
2082             GetValue rv;
2083             cb.callback(rv);
2084             return true;
2085         }
2086
2087         // if v is locked return error
2088         if (v->isLocked(currentTime)) {
2089             GetValue rv;
2090             cb.callback(rv);
2091             return false;
2092         }
2093
2094         // If the value is not resident, wait for it...
2095         if (!v->isResident()) {
2096             if (cookie) {
2097                 bgFetch(key, vbucket, v->getBySeqno(), cookie);
2098             }
2099             GetValue rv(NULL, ENGINE_EWOULDBLOCK, -1, true);
2100             cb.callback(rv);
2101             return false;
2102         }
2103
2104         // acquire lock and increment cas value
2105         v->lock(currentTime + lockTimeout);
2106
2107         Item *it = v->toItem(false, vbucket);
2108         it->setCas();
2109         v->setCas(it->getCas());
2110
2111         GetValue rv(it);
2112         cb.callback(rv);
2113         return true;
2114     } else {
2115         if (eviction_policy == VALUE_ONLY) {
2116             GetValue rv;
2117             cb.callback(rv);
2118             return true;
2119         } else {
2120             ENGINE_ERROR_CODE ec = addTempItemForBgFetch(lh, bucket_num, key,
2121                                                          vb, cookie, false);
2122             GetValue rv(NULL, ec, -1, true);
2123             cb.callback(rv);
2124             return false;
2125         }
2126     }
2127 }
2128
2129 ENGINE_ERROR_CODE
2130 EventuallyPersistentStore::unlockKey(const std::string &key,
2131                                      uint16_t vbucket,
2132                                      uint64_t cas,
2133                                      rel_time_t currentTime)
2134 {
2135
2136     RCPtr<VBucket> vb = getVBucket(vbucket, vbucket_state_active);
2137     if (!vb) {
2138         ++stats.numNotMyVBuckets;
2139         return ENGINE_NOT_MY_VBUCKET;
2140     }
2141
2142     int bucket_num(0);
2143     LockHolder lh = vb->ht.getLockedBucket(key, &bucket_num);
2144     StoredValue *v = fetchValidValue(vb, key, bucket_num, true);
2145
2146     if (v) {
2147         if (v->isDeleted() || v->isTempNonExistentItem() ||
2148             v->isTempDeletedItem()) {
2149             return ENGINE_KEY_ENOENT;
2150         }
2151         if (v->isLocked(currentTime)) {
2152             if (v->getCas() == cas) {
2153                 v->unlock();
2154                 return ENGINE_SUCCESS;
2155             }
2156         }
2157         return ENGINE_TMPFAIL;
2158     } else {
2159         if (eviction_policy == VALUE_ONLY) {
2160             return ENGINE_KEY_ENOENT;
2161         } else {
2162             // With the full eviction, an item's lock is automatically
2163             // released when the item is evicted from memory. Therefore,
2164             // we simply return ENGINE_TMPFAIL when we receive unlockKey
2165             // for an item that is not in memocy cache. Note that we don't
2166             // spawn any bg fetch job to figure out if an item actually
2167             // exists in disk or not.
2168             return ENGINE_TMPFAIL;
2169         }
2170     }
2171 }
2172
2173
2174 ENGINE_ERROR_CODE EventuallyPersistentStore::getKeyStats(
2175                                             const std::string &key,
2176                                             uint16_t vbucket,
2177                                             const void *cookie,
2178                                             struct key_stats &kstats,
2179                                             bool bgfetch,
2180                                             bool wantsDeleted)
2181 {
2182     RCPtr<VBucket> vb = getVBucket(vbucket);
2183     if (!vb) {
2184         return ENGINE_NOT_MY_VBUCKET;
2185     }
2186
2187     int bucket_num(0);
2188     LockHolder lh = vb->ht.getLockedBucket(key, &bucket_num);
2189     StoredValue *v = fetchValidValue(vb, key, bucket_num, true);
2190
2191     if (v) {
2192         if ((v->isDeleted() && !wantsDeleted) ||
2193             v->isTempNonExistentItem() || v->isTempDeletedItem()) {
2194             return ENGINE_KEY_ENOENT;
2195         }
2196         if (eviction_policy == FULL_EVICTION &&
2197             v->isTempInitialItem() && bgfetch) {
2198             lh.unlock();
2199             bgFetch(key, vbucket, -1, cookie, true);
2200             return ENGINE_EWOULDBLOCK;
2201         }
2202         kstats.logically_deleted = v->isDeleted();
2203         kstats.dirty = v->isDirty();
2204         kstats.exptime = v->getExptime();
2205         kstats.flags = v->getFlags();
2206         kstats.cas = v->getCas();
2207         kstats.vb_state = vb->getState();
2208         return ENGINE_SUCCESS;
2209     } else {
2210         if (eviction_policy == VALUE_ONLY) {
2211             return ENGINE_KEY_ENOENT;
2212         } else {
2213             if (bgfetch) {
2214                 return addTempItemForBgFetch(lh, bucket_num, key, vb,
2215                                              cookie, true);
2216             } else {
2217                 return ENGINE_KEY_ENOENT;
2218             }
2219         }
2220     }
2221 }
2222
2223 std::string EventuallyPersistentStore::validateKey(const std::string &key,
2224                                                    uint16_t vbucket,
2225                                                    Item &diskItem) {
2226     int bucket_num(0);
2227     RCPtr<VBucket> vb = getVBucket(vbucket);
2228     LockHolder lh = vb->ht.getLockedBucket(key, &bucket_num);
2229     StoredValue *v = fetchValidValue(vb, key, bucket_num, true,
2230                                      false, true);
2231
2232     if (v) {
2233         if (v->isDeleted() || v->isTempNonExistentItem() ||
2234             v->isTempDeletedItem()) {
2235             return "item_deleted";
2236         }
2237
2238         if (diskItem.getFlags() != v->getFlags()) {
2239             return "flags_mismatch";
2240         } else if (v->isResident() && memcmp(diskItem.getData(),
2241                                              v->getValue()->getData(),
2242                                              diskItem.getNBytes())) {
2243             return "data_mismatch";
2244         } else {
2245             return "valid";
2246         }
2247     } else {
2248         return "item_deleted";
2249     }
2250
2251 }
2252
2253 ENGINE_ERROR_CODE EventuallyPersistentStore::deleteItem(const std::string &key,
2254                                                         uint64_t* cas,
2255                                                         uint16_t vbucket,
2256                                                         const void *cookie,
2257                                                         bool force,
2258                                                         ItemMetaData *itemMeta,
2259                                                         bool tapBackfill)
2260 {
2261     RCPtr<VBucket> vb = getVBucket(vbucket);
2262     if (!vb || (vb->getState() == vbucket_state_dead && !force)) {
2263         ++stats.numNotMyVBuckets;
2264         return ENGINE_NOT_MY_VBUCKET;
2265     } else if(vb->getState() == vbucket_state_replica && !force) {
2266         ++stats.numNotMyVBuckets;
2267         return ENGINE_NOT_MY_VBUCKET;
2268     } else if(vb->getState() == vbucket_state_pending && !force) {
2269         if (vb->addPendingOp(cookie)) {
2270             return ENGINE_EWOULDBLOCK;
2271         }
2272     }
2273
2274     int bucket_num(0);
2275     LockHolder lh = vb->ht.getLockedBucket(key, &bucket_num);
2276     StoredValue *v = vb->ht.unlocked_find(key, bucket_num, true, false);
2277     if (!v || v->isDeleted() || v->isTempItem()) {
2278         if (eviction_policy == VALUE_ONLY) {
2279             return ENGINE_KEY_ENOENT;
2280         } else { // Full eviction.
2281             if (!force) {
2282                 if (!v) { // Item might be evicted from cache.
2283                     return addTempItemForBgFetch(lh, bucket_num, key, vb,
2284                                                  cookie, true);
2285                 } else if (v->isTempInitialItem()) {
2286                     lh.unlock();
2287                     bgFetch(key, vbucket, -1, cookie, true);
2288                     return ENGINE_EWOULDBLOCK;
2289                 } else { // Non-existent or deleted key.
2290                     return ENGINE_KEY_ENOENT;
2291                 }
2292             } else {
2293                 if (!v) { // Item might be evicted from cache.
2294                     // Create a temp item and delete it below as it is a
2295                     // force deletion.
2296                     add_type_t rv = vb->ht.unlocked_addTempItem(bucket_num,
2297                                                               key,
2298                                                               eviction_policy);
2299                     if (rv == ADD_NOMEM) {
2300                         return ENGINE_ENOMEM;
2301                     }
2302                     v = vb->ht.unlocked_find(key, bucket_num, true, false);
2303                     v->setStoredValueState(StoredValue::state_deleted_key);
2304                 } else if (v->isTempInitialItem()) {
2305                     v->setStoredValueState(StoredValue::state_deleted_key);
2306                 } else { // Non-existent or deleted key.
2307                     return ENGINE_KEY_ENOENT;
2308                 }
2309             }
2310         }
2311     }
2312
2313     if (v && v->isLocked(ep_current_time()) &&
2314         (vb->getState() == vbucket_state_replica ||
2315          vb->getState() == vbucket_state_pending)) {
2316         v->unlock();
2317     }
2318     mutation_type_t delrv;
2319     delrv = vb->ht.unlocked_softDelete(v, *cas, eviction_policy);
2320
2321     if (itemMeta && v) {
2322         itemMeta->revSeqno = v->getRevSeqno();
2323         itemMeta->cas = v->getCas();
2324         itemMeta->flags = v->getFlags();
2325         itemMeta->exptime = v->getExptime();
2326     }
2327     *cas = v ? v->getCas() : 0;
2328
2329     ENGINE_ERROR_CODE ret = ENGINE_SUCCESS;
2330     switch (delrv) {
2331     case NOMEM:
2332         ret = ENGINE_ENOMEM;
2333         break;
2334     case INVALID_VBUCKET:
2335         ret = ENGINE_NOT_MY_VBUCKET;
2336         break;
2337     case INVALID_CAS:
2338         ret = ENGINE_KEY_EEXISTS;
2339         break;
2340     case IS_LOCKED:
2341         ret = ENGINE_TMPFAIL;
2342         break;
2343     case NOT_FOUND:
2344         ret = ENGINE_KEY_ENOENT;
2345         if (v) {
2346             queueDirty(vb, v, &lh, tapBackfill);
2347         }
2348         break;
2349     case WAS_DIRTY:
2350     case WAS_CLEAN:
2351         queueDirty(vb, v, &lh, tapBackfill);
2352         break;
2353     case NEED_BG_FETCH:
2354         // We already figured out if a bg fetch is requred for a full-evicted
2355         // item above.
2356         abort();
2357     }
2358     return ret;
2359 }
2360
2361 ENGINE_ERROR_CODE EventuallyPersistentStore::deleteWithMeta(
2362                                                         const std::string &key,
2363                                                         uint64_t* cas,
2364                                                         uint16_t vbucket,
2365                                                         const void *cookie,
2366                                                         bool force,
2367                                                         ItemMetaData *itemMeta,
2368                                                         bool tapBackfill,
2369                                                         bool genBySeqno,
2370                                                         uint64_t bySeqno,
2371                                                         bool isReplication)
2372 {
2373     RCPtr<VBucket> vb = getVBucket(vbucket);
2374     if (!vb || (vb->getState() == vbucket_state_dead && !force)) {
2375         ++stats.numNotMyVBuckets;
2376         return ENGINE_NOT_MY_VBUCKET;
2377     } else if(vb->getState() == vbucket_state_replica && !force) {
2378         ++stats.numNotMyVBuckets;
2379         return ENGINE_NOT_MY_VBUCKET;
2380     } else if(vb->getState() == vbucket_state_pending && !force) {
2381         if (vb->addPendingOp(cookie)) {
2382             return ENGINE_EWOULDBLOCK;
2383         }
2384     }
2385
2386     int bucket_num(0);
2387     LockHolder lh = vb->ht.getLockedBucket(key, &bucket_num);
2388     StoredValue *v = vb->ht.unlocked_find(key, bucket_num, true, false);
2389     if (!force) { // Need conflict resolution.
2390         if (v)  {
2391             if (v->isTempInitialItem()) {
2392                 bgFetch(key, vbucket, -1, cookie, true);
2393                 return ENGINE_EWOULDBLOCK;
2394             }
2395             if (!conflictResolver->resolve(v, *itemMeta, true)) {
2396                 ++stats.numOpsDelMetaResolutionFailed;
2397                 return ENGINE_KEY_EEXISTS;
2398             }
2399         } else {
2400             // Item is 1) deleted or not existent in the value eviction case OR
2401             // 2) deleted or evicted in the full eviction.
2402             return addTempItemForBgFetch(lh, bucket_num, key, vb,
2403                                          cookie, true, isReplication);
2404         }
2405     } else {
2406         if (!v) {
2407             // Create a temp item and delete it below as it is a force deletion
2408             add_type_t rv = vb->ht.unlocked_addTempItem(bucket_num, key,
2409                                                         eviction_policy,
2410                                                         isReplication);
2411             if (rv == ADD_NOMEM) {
2412                 return ENGINE_ENOMEM;
2413             }
2414             v = vb->ht.unlocked_find(key, bucket_num, true, false);
2415             v->setStoredValueState(StoredValue::state_deleted_key);
2416         } else if (v->isTempInitialItem()) {
2417             v->setStoredValueState(StoredValue::state_deleted_key);
2418         }
2419     }
2420
2421     if (v && v->isLocked(ep_current_time()) &&
2422         (vb->getState() == vbucket_state_replica ||
2423          vb->getState() == vbucket_state_pending)) {
2424         v->unlock();
2425     }
2426     mutation_type_t delrv;
2427     delrv = vb->ht.unlocked_softDelete(v, *cas, *itemMeta,
2428                                        eviction_policy, true);
2429     *cas = v ? v->getCas() : 0;
2430
2431     ENGINE_ERROR_CODE ret = ENGINE_SUCCESS;
2432     switch (delrv) {
2433     case NOMEM:
2434         ret = ENGINE_ENOMEM;
2435         break;
2436     case INVALID_VBUCKET:
2437         ret = ENGINE_NOT_MY_VBUCKET;
2438         break;
2439     case INVALID_CAS:
2440         ret = ENGINE_KEY_EEXISTS;
2441         break;
2442     case IS_LOCKED:
2443         ret = ENGINE_TMPFAIL;
2444         break;
2445     case NOT_FOUND:
2446         ret = ENGINE_KEY_ENOENT;
2447         break;
2448     case WAS_DIRTY:
2449     case WAS_CLEAN:
2450         if (!genBySeqno) {
2451             v->setBySeqno(bySeqno);
2452         }
2453         queueDirty(vb, v, &lh, tapBackfill, true, genBySeqno);
2454         break;
2455     case NEED_BG_FETCH:
2456         lh.unlock();
2457         bgFetch(key, vbucket, -1, cookie, true);
2458         ret = ENGINE_EWOULDBLOCK;
2459     }
2460
2461     return ret;
2462 }
2463
2464 void EventuallyPersistentStore::reset() {
2465     std::vector<int> buckets = vbMap.getBuckets();
2466     std::vector<int>::iterator it;
2467     for (it = buckets.begin(); it != buckets.end(); ++it) {
2468         RCPtr<VBucket> vb = getVBucket(*it);
2469         if (vb) {
2470             LockHolder lh(vb_mutexes[vb->getId()]);
2471             vb->ht.clear();
2472             vb->checkpointManager.clear(vb->getState());
2473             vb->resetStats();
2474             vb->setCurrentSnapshot(0, 0);
2475         }
2476     }
2477
2478     bool inverse = false;
2479     if (diskFlushAll.compare_exchange_strong(inverse, true)) {
2480         ++stats.diskQueueSize;
2481         // wake up (notify) one flusher is good enough for diskFlushAll
2482         vbMap.shards[EP_PRIMARY_SHARD]->getFlusher()->notifyFlushEvent();
2483     }
2484 }
2485
2486 /**
2487  * Callback invoked after persisting an item from memory to disk.
2488  *
2489  * This class exists to create a closure around a few variables within
2490  * EventuallyPersistentStore::flushOne so that an object can be
2491  * requeued in case of failure to store in the underlying layer.
2492  */
2493 class PersistenceCallback : public Callback<mutation_result>,
2494                             public Callback<int> {
2495 public:
2496
2497     PersistenceCallback(const queued_item &qi, RCPtr<VBucket> &vb,
2498                         EventuallyPersistentStore *st, EPStats *s, uint64_t c)
2499         : queuedItem(qi), vbucket(vb), store(st), stats(s), cas(c) {
2500         cb_assert(vb);
2501         cb_assert(s);
2502     }
2503
2504     // This callback is invoked for set only.
2505     void callback(mutation_result &value) {
2506         if (value.first == 1) {
2507             int bucket_num(0);
2508             LockHolder lh = vbucket->ht.getLockedBucket(queuedItem->getKey(),
2509                                                         &bucket_num);
2510             StoredValue *v = store->fetchValidValue(vbucket,
2511                                                     queuedItem->getKey(),
2512                                                     bucket_num, true, false);
2513             if (v) {
2514                 if (v->getCas() == cas) {
2515                     // mark this item clean only if current and stored cas
2516                     // value match
2517                     v->markClean();
2518                 }
2519                 if (v->isNewCacheItem()) {
2520                     if (value.second) {
2521                         // Insert in value-only or full eviction mode.
2522                         ++vbucket->opsCreate;
2523                         vbucket->incrMetaDataDisk(*queuedItem);
2524                     } else { // Update in full eviction mode.
2525                         --vbucket->ht.numTotalItems;
2526                         ++vbucket->opsUpdate;
2527                     }
2528                     v->setNewCacheItem(false);
2529                 } else { // Update in value-only or full eviction mode.
2530                     ++vbucket->opsUpdate;
2531                 }
2532             }
2533
2534             vbucket->doStatsForFlushing(*queuedItem, queuedItem->size());
2535             stats->decrDiskQueueSize(1);
2536             stats->totalPersisted++;
2537         } else {
2538             // If the return was 0 here, we're in a bad state because
2539             // we do not know the rowid of this object.
2540             if (value.first == 0) {
2541                 int bucket_num(0);
2542                 LockHolder lh = vbucket->ht.getLockedBucket(
2543                                            queuedItem->getKey(), &bucket_num);
2544                 StoredValue *v = store->fetchValidValue(vbucket,
2545                                                         queuedItem->getKey(),
2546                                                         bucket_num, true,
2547                                                         false);
2548                 if (v) {
2549                     std::stringstream ss;
2550                     ss << "Persisting ``" << queuedItem->getKey() << "'' on vb"
2551                        << queuedItem->getVBucketId() << " (rowid="
2552                        << v->getBySeqno() << ") returned 0 updates\n";
2553                     LOG(EXTENSION_LOG_WARNING, "%s", ss.str().c_str());
2554                 } else {
2555                     LOG(EXTENSION_LOG_WARNING,
2556                         "Error persisting now missing ``%s'' from vb%d",
2557                         queuedItem->getKey().c_str(),
2558                         queuedItem->getVBucketId());
2559                 }
2560
2561                 vbucket->doStatsForFlushing(*queuedItem, queuedItem->size());
2562                 stats->decrDiskQueueSize(1);
2563             } else {
2564                 std::stringstream ss;
2565                 ss <<
2566                 "Fatal error in persisting SET ``" <<
2567                 queuedItem->getKey() << "'' on vb "
2568                    << queuedItem->getVBucketId() << "!!! Requeue it...\n";
2569                 LOG(EXTENSION_LOG_WARNING, "%s", ss.str().c_str());
2570                 redirty();
2571             }
2572         }
2573     }
2574
2575     // This callback is invoked for deletions only.
2576     //
2577     // The boolean indicates whether the underlying storage
2578     // successfully deleted the item.
2579     void callback(int &value) {
2580         // > 1 would be bad.  We were only trying to delete one row.
2581         cb_assert(value < 2);
2582         // -1 means fail
2583         // 1 means we deleted one row
2584         // 0 means we did not delete a row, but did not fail (did not exist)
2585         if (value >= 0) {
2586             // We have succesfully removed an item from the disk, we
2587             // may now remove it from the hash table.
2588             int bucket_num(0);
2589             LockHolder lh = vbucket->ht.getLockedBucket(queuedItem->getKey(),
2590                                                         &bucket_num);
2591             StoredValue *v = store->fetchValidValue(vbucket,
2592                                                     queuedItem->getKey(),
2593                                                     bucket_num, true, false);
2594             if (v && v->isDeleted()) {
2595                 bool newCacheItem = v->isNewCacheItem();
2596                 bool deleted = vbucket->ht.unlocked_del(queuedItem->getKey(),
2597                                                         bucket_num);
2598                 cb_assert(deleted);
2599                 if (newCacheItem && value > 0) {
2600                     // Need to decrement the item counter again for an item that
2601                     // exists on DB file, but not in memory (i.e., full eviction),
2602                     // because we created the temp item in memory and incremented
2603                     // the item counter when a deletion is pushed in the queue.
2604                     --vbucket->ht.numTotalItems;
2605                 }
2606             }
2607
2608             if (value > 0) {
2609                 ++stats->totalPersisted;
2610                 ++vbucket->opsDelete;
2611             }
2612             vbucket->doStatsForFlushing(*queuedItem, queuedItem->size());
2613             stats->decrDiskQueueSize(1);
2614             vbucket->decrMetaDataDisk(*queuedItem);
2615         } else {
2616             std::stringstream ss;
2617             ss << "Fatal error in persisting DELETE ``" <<
2618             queuedItem->getKey() << "'' on vb "
2619                << queuedItem->getVBucketId() << "!!! Requeue it...\n";
2620             LOG(EXTENSION_LOG_WARNING, "%s", ss.str().c_str());
2621             redirty();
2622         }
2623     }
2624
2625 private:
2626
2627     void redirty() {
2628         if (store->vbMap.isBucketDeletion(vbucket->getId())) {
2629             vbucket->doStatsForFlushing(*queuedItem, queuedItem->size());
2630             stats->decrDiskQueueSize(1);
2631             return;
2632         }
2633         ++stats->flushFailed;
2634         store->invokeOnLockedStoredValue(queuedItem->getKey(),
2635                                          queuedItem->getVBucketId(),
2636                                          &StoredValue::reDirty);
2637         vbucket->rejectQueue.push(queuedItem);
2638     }
2639
2640     const queued_item queuedItem;
2641     RCPtr<VBucket> &vbucket;
2642     EventuallyPersistentStore *store;
2643     EPStats *stats;
2644     uint64_t cas;
2645     DISALLOW_COPY_AND_ASSIGN(PersistenceCallback);
2646 };
2647
2648 void EventuallyPersistentStore::flushOneDeleteAll() {
2649     for (size_t i = 0; i < vbMap.getSize(); ++i) {
2650         RCPtr<VBucket> vb = getVBucket(i);
2651         if (vb) {
2652             LockHolder lh(vb_mutexes[vb->getId()]);
2653             getRWUnderlying(vb->getId())->reset(i);
2654         }
2655     }
2656
2657     bool inverse = true;
2658     diskFlushAll.compare_exchange_strong(inverse, false);
2659     stats.decrDiskQueueSize(1);
2660 }
2661
2662 int EventuallyPersistentStore::flushVBucket(uint16_t vbid) {
2663     KVShard *shard = vbMap.getShard(vbid);
2664     if (diskFlushAll) {
2665         if (shard->getId() == EP_PRIMARY_SHARD) {
2666             flushOneDeleteAll();
2667         } else {
2668             // disk flush is pending just return
2669             return 0;
2670         }
2671     }
2672
2673     if (vbMap.isBucketCreation(vbid)) {
2674         return RETRY_FLUSH_VBUCKET;
2675     }
2676
2677     int items_flushed = 0;
2678     rel_time_t flush_start = ep_current_time();
2679
2680     RCPtr<VBucket> vb = vbMap.getBucket(vbid);
2681     if (vb) {
2682         LockHolder lh(vb_mutexes[vbid], true /*tryLock*/);
2683         if (!lh.islocked()) { // Try another bucket if this one is locked
2684             return RETRY_FLUSH_VBUCKET; // to avoid blocking flusher
2685         }
2686
2687         KVStatsCallback cb(this);
2688         std::vector<queued_item> items;
2689         KVStore *rwUnderlying = getRWUnderlying(vbid);
2690
2691         while (!vb->rejectQueue.empty()) {
2692             items.push_back(vb->rejectQueue.front());
2693             vb->rejectQueue.pop();
2694         }
2695
2696         LockHolder slh = vb->getSnapshotLock();
2697         uint64_t snapStart;
2698         uint64_t snapEnd;
2699         vb->getCurrentSnapshot_UNLOCKED(snapStart, snapEnd);
2700
2701         vb->getBackfillItems(items);
2702         vb->checkpointManager.getAllItemsForPersistence(items);
2703         slh.unlock();
2704
2705         if (!items.empty()) {
2706             while (!rwUnderlying->begin()) {
2707                 ++stats.beginFailed;
2708                 LOG(EXTENSION_LOG_WARNING, "Failed to start a transaction!!! "
2709                     "Retry in 1 sec ...");
2710                 sleep(1);
2711             }
2712             rwUnderlying->optimizeWrites(items);
2713
2714             Item *prev = NULL;
2715             uint64_t maxSeqno = 0;
2716             std::list<PersistenceCallback*> pcbs;
2717             std::vector<queued_item>::iterator it = items.begin();
2718             for(; it != items.end(); ++it) {
2719                 if ((*it)->getOperation() != queue_op_set &&
2720                     (*it)->getOperation() != queue_op_del) {
2721                     continue;
2722                 } else if (!prev || prev->getKey() != (*it)->getKey()) {
2723                     prev = (*it).get();
2724                     ++items_flushed;
2725                     PersistenceCallback *cb = flushOneDelOrSet(*it, vb);
2726                     if (cb) {
2727                         pcbs.push_back(cb);
2728                     }
2729                     maxSeqno = std::max(maxSeqno, (uint64_t)(*it)->getBySeqno());
2730                     ++stats.flusher_todo;
2731                 } else {
2732                     stats.decrDiskQueueSize(1);
2733                     vb->doStatsForFlushing(*(*it), (*it)->size());
2734                 }
2735             }
2736
2737             BlockTimer timer(&stats.diskCommitHisto, "disk_commit",
2738                              stats.timingLog);
2739             hrtime_t start = gethrtime();
2740
2741             if (vb->getState() == vbucket_state_active) {
2742                 snapStart = maxSeqno;
2743                 snapEnd = maxSeqno;
2744             }
2745
2746             while (!rwUnderlying->commit(&cb, snapStart, snapEnd)) {
2747                 ++stats.commitFailed;
2748                 LOG(EXTENSION_LOG_WARNING, "Flusher commit failed!!! Retry in "
2749                     "1 sec...\n");
2750                 sleep(1);
2751
2752             }
2753
2754             if (vb->rejectQueue.empty()) {
2755                 uint64_t highSeqno = rwUnderlying->getLastPersistedSeqno(vbid);
2756                 if (highSeqno > 0 &&
2757                     highSeqno != vbMap.getPersistenceSeqno(vbid)) {
2758                     vbMap.setPersistenceSeqno(vbid, highSeqno);
2759                 }
2760             }
2761
2762             while (!pcbs.empty()) {
2763                 delete pcbs.front();
2764                 pcbs.pop_front();
2765             }
2766
2767             ++stats.flusherCommits;
2768             hrtime_t end = gethrtime();
2769             uint64_t commit_time = (end - start) / 1000000;
2770             uint64_t trans_time = (end - flush_start) / 1000000;
2771
2772             lastTransTimePerItem = (items_flushed == 0) ? 0 :
2773                 static_cast<double>(trans_time) /
2774                 static_cast<double>(items_flushed);
2775             stats.commit_time.store(commit_time);
2776             stats.cumulativeCommitTime.fetch_add(commit_time);
2777             stats.cumulativeFlushTime.fetch_add(ep_current_time()
2778                                                 - flush_start);
2779             stats.flusher_todo.store(0);
2780         }
2781
2782         rwUnderlying->pendingTasks();
2783
2784         if (vb->checkpointManager.getNumCheckpoints() > 1) {
2785             wakeUpCheckpointRemover();
2786         }
2787
2788         if (vb->rejectQueue.empty()) {
2789             vb->checkpointManager.itemsPersisted();
2790             uint64_t seqno = vbMap.getPersistenceSeqno(vbid);
2791             uint64_t chkid = vb->checkpointManager.getPersistenceCursorPreChkId();
2792             vb->notifyOnPersistence(engine, seqno, true);
2793             vb->notifyOnPersistence(engine, chkid, false);
2794             if (chkid > 0 && chkid != vbMap.getPersistenceCheckpointId(vbid)) {
2795                 vbMap.setPersistenceCheckpointId(vbid, chkid);
2796             }
2797         }
2798     }
2799
2800     return items_flushed;
2801 }
2802
2803 PersistenceCallback*
2804 EventuallyPersistentStore::flushOneDelOrSet(const queued_item &qi,
2805                                             RCPtr<VBucket> &vb) {
2806
2807     if (!vb) {
2808         stats.decrDiskQueueSize(1);
2809         return NULL;
2810     }
2811
2812     int64_t bySeqno = qi->getBySeqno();
2813     bool deleted = qi->isDeleted();
2814     rel_time_t queued(qi->getQueuedTime());
2815
2816     int dirtyAge = ep_current_time() - queued;
2817     stats.dirtyAgeHisto.add(dirtyAge * 1000000);
2818     stats.dirtyAge.store(dirtyAge);
2819     stats.dirtyAgeHighWat.store(std::max(stats.dirtyAge.load(),
2820                                          stats.dirtyAgeHighWat.load()));
2821
2822     // Wait until the vbucket database is created by the vbucket state
2823     // snapshot task.
2824     if (vbMap.isBucketCreation(qi->getVBucketId()) ||
2825         vbMap.isBucketDeletion(qi->getVBucketId())) {
2826         vb->rejectQueue.push(qi);
2827         ++vb->opsReject;
2828         return NULL;
2829     }
2830
2831     KVStore *rwUnderlying = getRWUnderlying(qi->getVBucketId());
2832     if (!deleted) {
2833         // TODO: Need to separate disk_insert from disk_update because
2834         // bySeqno doesn't give us that information.
2835         BlockTimer timer(bySeqno == -1 ?
2836                          &stats.diskInsertHisto : &stats.diskUpdateHisto,
2837                          bySeqno == -1 ? "disk_insert" : "disk_update",
2838                          stats.timingLog);
2839         PersistenceCallback *cb =
2840             new PersistenceCallback(qi, vb, this, &stats, qi->getCas());
2841         rwUnderlying->set(*qi, *cb);
2842         return cb;
2843     } else {
2844         BlockTimer timer(&stats.diskDelHisto, "disk_delete",
2845                          stats.timingLog);
2846         PersistenceCallback *cb =
2847             new PersistenceCallback(qi, vb, this, &stats, 0);
2848         rwUnderlying->del(*qi, *cb);
2849         return cb;
2850     }
2851 }
2852
2853 void EventuallyPersistentStore::queueDirty(RCPtr<VBucket> &vb,
2854                                            StoredValue* v,
2855                                            LockHolder *plh,
2856                                            bool tapBackfill,
2857                                            bool notifyReplicator,
2858                                            bool genBySeqno) {
2859     if (vb) {
2860         queued_item qi(v->toItem(false, vb->getId()));
2861         bool rv;
2862         if (genBySeqno) {
2863             LockHolder slh = vb->getSnapshotLock();
2864             rv = tapBackfill ? vb->queueBackfillItem(qi, genBySeqno) :
2865                                vb->checkpointManager.queueDirty(vb, qi,
2866                                                                 genBySeqno);
2867             v->setBySeqno(qi->getBySeqno());
2868             vb->setCurrentSnapshot_UNLOCKED(qi->getBySeqno(), qi->getBySeqno());
2869         } else {
2870             rv = tapBackfill ? vb->queueBackfillItem(qi, genBySeqno) :
2871                                vb->checkpointManager.queueDirty(vb, qi,
2872                                                                 genBySeqno);
2873             v->setBySeqno(qi->getBySeqno());
2874         }
2875
2876         if (plh) {
2877             plh->unlock();
2878         }
2879
2880         if (rv) {
2881             KVShard* shard = vbMap.getShard(vb->getId());
2882             shard->getFlusher()->notifyFlushEvent();
2883
2884         }
2885         if (!tapBackfill && notifyReplicator) {
2886             engine.getTapConnMap().notifyVBConnections(vb->getId());
2887             engine.getDcpConnMap().notifyVBConnections(vb->getId(),
2888                                                        qi->getBySeqno());
2889         }
2890     }
2891 }
2892
2893 std::vector<vbucket_state *> EventuallyPersistentStore::loadVBucketState()
2894 {
2895     return getOneROUnderlying()->listPersistedVbuckets();
2896 }
2897
2898 void EventuallyPersistentStore::warmupCompleted() {
2899     // Run the vbucket state snapshot job once after the warmup
2900     scheduleVBSnapshot(Priority::VBucketPersistHighPriority);
2901
2902     if (engine.getConfiguration().getAlogPath().length() > 0) {
2903
2904         if (engine.getConfiguration().isAccessScannerEnabled()) {
2905             LockHolder lh(accessScanner.mutex);
2906             accessScanner.enabled = true;
2907             lh.unlock();
2908             LOG(EXTENSION_LOG_WARNING, "Access Scanner task enabled");
2909             size_t smin = engine.getConfiguration().getAlogSleepTime();
2910             setAccessScannerSleeptime(smin);
2911         } else {
2912             LockHolder lh(accessScanner.mutex);
2913             accessScanner.enabled = false;
2914             LOG(EXTENSION_LOG_WARNING, "Access Scanner task disabled");
2915         }
2916
2917         Configuration &config = engine.getConfiguration();
2918         config.addValueChangedListener("access_scanner_enabled",
2919                                        new EPStoreValueChangeListener(*this));
2920         config.addValueChangedListener("alog_sleep_time",
2921                                        new EPStoreValueChangeListener(*this));
2922         config.addValueChangedListener("alog_task_time",
2923                                        new EPStoreValueChangeListener(*this));
2924     }
2925
2926     // "0" sleep_time means that the first snapshot task will be executed
2927     // right after warmup. Subsequent snapshot tasks will be scheduled every
2928     // 60 sec by default.
2929     ExecutorPool *iom = ExecutorPool::get();
2930     ExTask task = new StatSnap(&engine, Priority::StatSnapPriority, 0, false);
2931     statsSnapshotTaskId = iom->schedule(task, WRITER_TASK_IDX);
2932 }
2933
2934 bool EventuallyPersistentStore::maybeEnableTraffic()
2935 {
2936     // @todo rename.. skal vaere isTrafficDisabled elns
2937     double memoryUsed = static_cast<double>(stats.getTotalMemoryUsed());
2938     double maxSize = static_cast<double>(stats.getMaxDataSize());
2939
2940     if (memoryUsed  >= stats.mem_low_wat) {
2941         LOG(EXTENSION_LOG_WARNING,
2942             "Total memory use reached to the low water mark, stop warmup");
2943         return true;
2944     } else if (memoryUsed > (maxSize * stats.warmupMemUsedCap)) {
2945         LOG(EXTENSION_LOG_WARNING,
2946                 "Enough MB of data loaded to enable traffic");
2947         return true;
2948     } else if (eviction_policy == VALUE_ONLY &&
2949                stats.warmedUpValues >=
2950                                (stats.warmedUpKeys * stats.warmupNumReadCap)) {
2951         // Let ep-engine think we're done with the warmup phase
2952         // (we should refactor this into "enableTraffic")
2953         LOG(EXTENSION_LOG_WARNING,
2954             "Enough number of items loaded to enable traffic");
2955         return true;
2956     } else if (eviction_policy == FULL_EVICTION &&
2957                stats.warmedUpValues >=
2958                             (warmupTask->getEstimatedItemCount() *
2959                              stats.warmupNumReadCap)) {
2960         // In case of FULL EVICTION, warmed up keys always matches the number
2961         // of warmed up values, therefore for honoring the min_item threshold
2962         // in this scenario, we can consider warmup's estimated item count.
2963         LOG(EXTENSION_LOG_WARNING,
2964             "Enough number of items loaded to enable traffic");
2965         return true;
2966     }
2967     return false;
2968 }
2969
2970 bool EventuallyPersistentStore::isWarmingUp() {
2971     return !warmupTask->isComplete();
2972 }
2973
2974 void EventuallyPersistentStore::stopWarmup(void)
2975 {
2976     // forcefully stop current warmup task
2977     if (isWarmingUp()) {
2978         LOG(EXTENSION_LOG_WARNING, "Stopping warmup while engine is loading "
2979             "data from underlying storage, shutdown = %s\n",
2980             stats.isShutdown ? "yes" : "no");
2981         warmupTask->stop();
2982     }
2983 }
2984
2985 bool EventuallyPersistentStore::isMemoryUsageTooHigh() {
2986     double memoryUsed = static_cast<double>(stats.getTotalMemoryUsed());
2987     double maxSize = static_cast<double>(stats.getMaxDataSize());
2988     return memoryUsed > (maxSize * backfillMemoryThreshold);
2989 }
2990
2991 void EventuallyPersistentStore::setBackfillMemoryThreshold(
2992                                                 double threshold) {
2993     backfillMemoryThreshold = threshold;
2994 }
2995
2996 void EventuallyPersistentStore::setExpiryPagerSleeptime(size_t val) {
2997     LockHolder lh(expiryPager.mutex);
2998
2999     if (expiryPager.sleeptime != 0) {
3000         ExecutorPool::get()->cancel(expiryPager.task);
3001     }
3002
3003     expiryPager.sleeptime = val;
3004     if (val != 0) {
3005         ExTask expTask = new ExpiredItemPager(&engine, stats,
3006                                                 expiryPager.sleeptime);
3007         expiryPager.task = ExecutorPool::get()->schedule(expTask,
3008                                                         NONIO_TASK_IDX);
3009     }
3010 }
3011
3012 void EventuallyPersistentStore::enableAccessScannerTask() {
3013     LockHolder lh(accessScanner.mutex);
3014     if (!accessScanner.enabled) {
3015         accessScanner.enabled = true;
3016
3017         if (accessScanner.sleeptime != 0) {
3018             ExecutorPool::get()->cancel(accessScanner.task);
3019         }
3020
3021         size_t alogSleepTime = engine.getConfiguration().getAlogSleepTime();
3022         accessScanner.sleeptime = alogSleepTime * 60;
3023         if (accessScanner.sleeptime != 0) {
3024             ExTask task = new AccessScanner(*this, stats,
3025                                             Priority::AccessScannerPriority,
3026                                             accessScanner.sleeptime);
3027             accessScanner.task = ExecutorPool::get()->schedule(task,
3028                                                                AUXIO_TASK_IDX);
3029             struct timeval tv;
3030             gettimeofday(&tv, NULL);
3031             advance_tv(tv, accessScanner.sleeptime);
3032             stats.alogTime.store(tv.tv_sec);
3033         } else {
3034             LOG(EXTENSION_LOG_WARNING, "Did not enable access scanner task, "
3035                                        "as alog_sleep_time is set to zero!");
3036         }
3037     } else {
3038         LOG(EXTENSION_LOG_DEBUG, "Access scanner already enabled!");
3039     }
3040 }
3041
3042 void EventuallyPersistentStore::disableAccessScannerTask() {
3043     LockHolder lh(accessScanner.mutex);
3044     if (accessScanner.enabled) {
3045         ExecutorPool::get()->cancel(accessScanner.task);
3046         accessScanner.sleeptime = 0;
3047         accessScanner.enabled = false;
3048     } else {
3049         LOG(EXTENSION_LOG_DEBUG, "Access scanner already disabled!");
3050     }
3051 }
3052
3053 void EventuallyPersistentStore::setAccessScannerSleeptime(size_t val) {
3054     LockHolder lh(accessScanner.mutex);
3055
3056     if (accessScanner.enabled) {
3057         if (accessScanner.sleeptime != 0) {
3058             ExecutorPool::get()->cancel(accessScanner.task);
3059         }
3060
3061         // store sleeptime in seconds
3062         accessScanner.sleeptime = val * 60;
3063         if (accessScanner.sleeptime != 0) {
3064             ExTask task = new AccessScanner(*this, stats,
3065                                             Priority::AccessScannerPriority,
3066                                             accessScanner.sleeptime);
3067             accessScanner.task = ExecutorPool::get()->schedule(task,
3068                                                                AUXIO_TASK_IDX);
3069
3070             struct timeval tv;
3071             gettimeofday(&tv, NULL);
3072             advance_tv(tv, accessScanner.sleeptime);
3073             stats.alogTime.store(tv.tv_sec);
3074         }
3075     }
3076 }
3077
3078 void EventuallyPersistentStore::resetAccessScannerStartTime() {
3079     LockHolder lh(accessScanner.mutex);
3080
3081     if (accessScanner.enabled) {
3082         if (accessScanner.sleeptime != 0) {
3083             ExecutorPool::get()->cancel(accessScanner.task);
3084             // re-schedule task according to the new task start hour
3085             ExTask task = new AccessScanner(*this, stats,
3086                                             Priority::AccessScannerPriority,
3087                                             accessScanner.sleeptime);
3088             accessScanner.task = ExecutorPool::get()->schedule(task,
3089                                                                AUXIO_TASK_IDX);
3090
3091             struct timeval tv;
3092             gettimeofday(&tv, NULL);
3093             advance_tv(tv, accessScanner.sleeptime);
3094             stats.alogTime.store(tv.tv_sec);
3095         }
3096     }
3097 }
3098
3099 void EventuallyPersistentStore::visit(VBucketVisitor &visitor)
3100 {
3101     size_t maxSize = vbMap.getSize();
3102     cb_assert(maxSize <= std::numeric_limits<uint16_t>::max());
3103     for (size_t i = 0; i < maxSize; ++i) {
3104         uint16_t vbid = static_cast<uint16_t>(i);
3105         RCPtr<VBucket> vb = vbMap.getBucket(vbid);
3106         if (vb) {
3107             bool wantData = visitor.visitBucket(vb);
3108             // We could've lost this along the way.
3109             if (wantData) {
3110                 vb->ht.visit(visitor);
3111             }
3112         }
3113     }
3114     visitor.complete();
3115 }
3116
3117 VBCBAdaptor::VBCBAdaptor(EventuallyPersistentStore *s,
3118                          shared_ptr<VBucketVisitor> v,
3119                          const char *l, const Priority &p, double sleep) :
3120     GlobalTask(&s->getEPEngine(), p, 0, false), store(s),
3121     visitor(v), label(l), sleepTime(sleep), currentvb(0)
3122 {
3123     const VBucketFilter &vbFilter = visitor->getVBucketFilter();
3124     size_t maxSize = store->vbMap.getSize();
3125     cb_assert(maxSize <= std::numeric_limits<uint16_t>::max());
3126     for (size_t i = 0; i < maxSize; ++i) {
3127         uint16_t vbid = static_cast<uint16_t>(i);
3128         RCPtr<VBucket> vb = store->vbMap.getBucket(vbid);
3129         if (vb && vbFilter(vbid)) {
3130             vbList.push(vbid);
3131         }
3132     }
3133 }
3134
3135 bool VBCBAdaptor::run(void) {
3136     if (!vbList.empty()) {
3137         currentvb = vbList.front();
3138         RCPtr<VBucket> vb = store->vbMap.getBucket(currentvb);
3139         if (vb) {
3140             if (visitor->pauseVisitor()) {
3141                 snooze(sleepTime);
3142                 return true;
3143             }
3144             if (visitor->visitBucket(vb)) {
3145                 vb->ht.visit(*visitor);
3146             }
3147         }
3148         vbList.pop();
3149     }
3150
3151     bool isdone = vbList.empty();
3152     if (isdone) {
3153         visitor->complete();
3154     }
3155     return !isdone;
3156 }
3157
3158 VBucketVisitorTask::VBucketVisitorTask(EventuallyPersistentStore *s,
3159                                        shared_ptr<VBucketVisitor> v,
3160                                        uint16_t sh, const char *l,
3161                                        double sleep, bool shutdown):
3162     GlobalTask(&(s->getEPEngine()), Priority::AccessScannerPriority,
3163                0, shutdown),
3164     store(s), visitor(v), label(l), sleepTime(sleep), currentvb(0),
3165     shardID(sh)
3166 {
3167     const VBucketFilter &vbFilter = visitor->getVBucketFilter();
3168     std::vector<int> vbs = store->vbMap.getShard(shardID)->getVBuckets();
3169     cb_assert(vbs.size() <= std::numeric_limits<uint16_t>::max());
3170     std::vector<int>::iterator it;
3171     for (it = vbs.begin(); it != vbs.end(); ++it) {
3172         uint16_t vbid = static_cast<uint16_t>(*it);
3173         RCPtr<VBucket> vb = store->vbMap.getBucket(vbid);
3174         if (vb && vbFilter(vbid)) {
3175             vbList.push(vbid);
3176         }
3177     }
3178 }
3179
3180 bool VBucketVisitorTask::run() {
3181     if (!vbList.empty()) {
3182         currentvb = vbList.front();
3183         RCPtr<VBucket> vb = store->vbMap.getBucket(currentvb);
3184         if (vb) {
3185             if (visitor->pauseVisitor()) {
3186                 snooze(sleepTime);
3187                 return true;
3188             }
3189             if (visitor->visitBucket(vb)) {
3190                 vb->ht.visit(*visitor);
3191             }
3192         }
3193         vbList.pop();
3194     }
3195
3196     bool isDone = vbList.empty();
3197     if (isDone) {
3198         visitor->complete();
3199     }
3200     return !isDone;
3201 }
3202
3203 void EventuallyPersistentStore::resetUnderlyingStats(void)
3204 {
3205     for (size_t i = 0; i < vbMap.numShards; i++) {
3206         KVShard *shard = vbMap.shards[i];
3207         shard->getRWUnderlying()->resetStats();
3208         shard->getROUnderlying()->resetStats();
3209     }
3210
3211     for (size_t i = 0; i < MAX_TYPE_ID; i++) {
3212         stats.schedulingHisto[i].reset();
3213         stats.taskRuntimeHisto[i].reset();
3214     }
3215 }
3216
3217 void EventuallyPersistentStore::addKVStoreStats(ADD_STAT add_stat,
3218                                                 const void* cookie) {
3219     for (size_t i = 0; i < vbMap.numShards; i++) {
3220         std::stringstream rwPrefix;
3221         std::stringstream roPrefix;
3222         rwPrefix << "rw_" << i;
3223         roPrefix << "ro_" << i;
3224         vbMap.shards[i]->getRWUnderlying()->addStats(rwPrefix.str(), add_stat,
3225                                                      cookie);
3226         vbMap.shards[i]->getROUnderlying()->addStats(roPrefix.str(), add_stat,
3227                                                      cookie);
3228     }
3229 }
3230
3231 void EventuallyPersistentStore::addKVStoreTimingStats(ADD_STAT add_stat,
3232                                                       const void* cookie) {
3233     for (size_t i = 0; i < vbMap.numShards; i++) {
3234         std::stringstream rwPrefix;
3235         std::stringstream roPrefix;
3236         rwPrefix << "rw_" << i;
3237         roPrefix << "ro_" << i;
3238         vbMap.shards[i]->getRWUnderlying()->addTimingStats(rwPrefix.str(),
3239                                                            add_stat,
3240                                                            cookie);
3241         vbMap.shards[i]->getROUnderlying()->addTimingStats(roPrefix.str(),
3242                                                            add_stat,
3243                                                            cookie);
3244     }
3245 }
3246
3247 KVStore *EventuallyPersistentStore::getOneROUnderlying(void) {
3248     return vbMap.getShard(EP_PRIMARY_SHARD)->getROUnderlying();
3249 }
3250
3251 KVStore *EventuallyPersistentStore::getOneRWUnderlying(void) {
3252     return vbMap.getShard(EP_PRIMARY_SHARD)->getRWUnderlying();
3253 }
3254
3255 ENGINE_ERROR_CODE
3256 EventuallyPersistentStore::rollback(uint16_t vbid,
3257                                     uint64_t rollbackSeqno) {
3258     LockHolder lh(vb_mutexes[vbid], true /*tryLock*/);
3259     if (!lh.islocked()) {
3260         return ENGINE_TMPFAIL; // Reschedule a vbucket rollback task.
3261     }
3262
3263     if (rollbackSeqno != 0) {
3264         shared_ptr<RollbackCB> cb(new RollbackCB(engine));
3265         KVStore* rwUnderlying = vbMap.getShard(vbid)->getRWUnderlying();
3266         RollbackResult result = rwUnderlying->rollback(vbid, rollbackSeqno, cb);
3267
3268         if (result.success) {
3269             RCPtr<VBucket> vb = vbMap.getBucket(vbid);
3270             vb->failovers->pruneEntries(result.highSeqno);
3271             vb->checkpointManager.clear(vb->getState());
3272             vb->checkpointManager.setBySeqno(result.highSeqno);
3273             vb->setCurrentSnapshot(result.snapStartSeqno, result.snapEndSeqno);
3274             return ENGINE_SUCCESS;
3275         }
3276     }
3277
3278     if (resetVBucket(vbid)) {
3279         return ENGINE_SUCCESS;
3280     }
3281     return ENGINE_NOT_MY_VBUCKET;
3282 }