11e6e8fd8dedd92eb941aa1772547d9c5a435bfd
[ep-engine.git] / src / ep.cc
1 /* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2 /*
3  *     Copyright 2010 Couchbase, Inc
4  *
5  *   Licensed under the Apache License, Version 2.0 (the "License");
6  *   you may not use this file except in compliance with the License.
7  *   You may obtain a copy of the License at
8  *
9  *       http://www.apache.org/licenses/LICENSE-2.0
10  *
11  *   Unless required by applicable law or agreed to in writing, software
12  *   distributed under the License is distributed on an "AS IS" BASIS,
13  *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  *   See the License for the specific language governing permissions and
15  *   limitations under the License.
16  */
17
18 #include "config.h"
19
20 #include <string.h>
21 #include <time.h>
22
23 #include <fstream>
24 #include <functional>
25 #include <iostream>
26 #include <map>
27 #include <sstream>
28 #include <string>
29 #include <utility>
30 #include <vector>
31
32 #include "access_scanner.h"
33 #include "checkpoint_remover.h"
34 #include "conflict_resolution.h"
35 #include "ep.h"
36 #include "ep_engine.h"
37 #include "failover-table.h"
38 #include "flusher.h"
39 #include "htresizer.h"
40 #include "kvshard.h"
41 #include "kvstore.h"
42 #include "locks.h"
43 #include "mutation_log.h"
44 #include "warmup.h"
45 #include "connmap.h"
46 #include "tapthrottle.h"
47
48 class StatsValueChangeListener : public ValueChangedListener {
49 public:
50     StatsValueChangeListener(EPStats &st) : stats(st) {
51         // EMPTY
52     }
53
54     virtual void sizeValueChanged(const std::string &key, size_t value) {
55         if (key.compare("max_size") == 0) {
56             stats.setMaxDataSize(value);
57             size_t low_wat = static_cast<size_t>
58                              (static_cast<double>(value) * 0.6);
59             size_t high_wat = static_cast<size_t>(
60                               static_cast<double>(value) * 0.75);
61             stats.mem_low_wat.store(low_wat);
62             stats.mem_high_wat.store(high_wat);
63         } else if (key.compare("mem_low_wat") == 0) {
64             stats.mem_low_wat.store(value);
65         } else if (key.compare("mem_high_wat") == 0) {
66             stats.mem_high_wat.store(value);
67         } else if (key.compare("tap_throttle_threshold") == 0) {
68             stats.tapThrottleThreshold.store(
69                                           static_cast<double>(value) / 100.0);
70         } else if (key.compare("warmup_min_memory_threshold") == 0) {
71             stats.warmupMemUsedCap.store(static_cast<double>(value) / 100.0);
72         } else if (key.compare("warmup_min_items_threshold") == 0) {
73             stats.warmupNumReadCap.store(static_cast<double>(value) / 100.0);
74         } else {
75             LOG(EXTENSION_LOG_WARNING,
76                 "Failed to change value for unknown variable, %s\n",
77                 key.c_str());
78         }
79     }
80
81 private:
82     EPStats &stats;
83 };
84
85 /**
86  * A configuration value changed listener that responds to ep-engine
87  * parameter changes by invoking engine-specific methods on
88  * configuration change events.
89  */
90 class EPStoreValueChangeListener : public ValueChangedListener {
91 public:
92     EPStoreValueChangeListener(EventuallyPersistentStore &st) : store(st) {
93     }
94
95     virtual void sizeValueChanged(const std::string &key, size_t value) {
96         if (key.compare("bg_fetch_delay") == 0) {
97             store.setBGFetchDelay(static_cast<uint32_t>(value));
98         } else if (key.compare("compaction_write_queue_cap") == 0) {
99             store.setCompactionWriteQueueCap(value);
100         } else if (key.compare("exp_pager_stime") == 0) {
101             store.setExpiryPagerSleeptime(value);
102         } else if (key.compare("alog_sleep_time") == 0) {
103             store.setAccessScannerSleeptime(value);
104         } else if (key.compare("alog_task_time") == 0) {
105             store.resetAccessScannerStartTime();
106         } else if (key.compare("mutation_mem_threshold") == 0) {
107             double mem_threshold = static_cast<double>(value) / 100;
108             StoredValue::setMutationMemoryThreshold(mem_threshold);
109         } else if (key.compare("backfill_mem_threshold") == 0) {
110             double backfill_threshold = static_cast<double>(value) / 100;
111             store.setBackfillMemoryThreshold(backfill_threshold);
112         } else if (key.compare("compaction_exp_mem_threshold") == 0) {
113             store.setCompactionExpMemThreshold(value);
114         } else if (key.compare("tap_throttle_queue_cap") == 0) {
115             store.getEPEngine().getTapThrottle().setQueueCap(value);
116         } else if (key.compare("tap_throttle_cap_pcnt") == 0) {
117             store.getEPEngine().getTapThrottle().setCapPercent(value);
118         } else {
119             LOG(EXTENSION_LOG_WARNING,
120                 "Failed to change value for unknown variable, %s\n",
121                 key.c_str());
122         }
123     }
124
125     virtual void booleanValueChanged(const std::string &key, bool value) {
126         if (key.compare("access_scanner_enabled") == 0) {
127             if (value) {
128                 store.enableAccessScannerTask();
129             } else {
130                 store.disableAccessScannerTask();
131             }
132         }
133     }
134
135 private:
136     EventuallyPersistentStore &store;
137 };
138
139 class VBucketMemoryDeletionTask : public GlobalTask {
140 public:
141     VBucketMemoryDeletionTask(EventuallyPersistentEngine &eng,
142                               RCPtr<VBucket> &vb, double delay) :
143                               GlobalTask(&eng,
144                               Priority::VBMemoryDeletionPriority, delay, true),
145                               e(eng), vbucket(vb), vbid(vb->getId()) { }
146
147     std::string getDescription() {
148         std::stringstream ss;
149         ss << "Removing (dead) vbucket " << vbid << " from memory";
150         return ss.str();
151     }
152
153     bool run(void) {
154         vbucket->notifyAllPendingConnsFailed(e);
155         vbucket->ht.clear();
156         vbucket.reset();
157         return false;
158     }
159
160 private:
161     EventuallyPersistentEngine &e;
162     RCPtr<VBucket> vbucket;
163     uint16_t vbid;
164 };
165
166 class PendingOpsNotification : public GlobalTask {
167 public:
168     PendingOpsNotification(EventuallyPersistentEngine &e, RCPtr<VBucket> &vb) :
169         GlobalTask(&e, Priority::VBMemoryDeletionPriority, 0, false),
170         engine(e), vbucket(vb) { }
171
172     std::string getDescription() {
173         std::stringstream ss;
174         ss << "Notify pending operations for vbucket " << vbucket->getId();
175         return ss.str();
176     }
177
178     bool run(void) {
179         vbucket->fireAllOps(engine);
180         return false;
181     }
182
183 private:
184     EventuallyPersistentEngine &engine;
185     RCPtr<VBucket> vbucket;
186 };
187
188 EventuallyPersistentStore::EventuallyPersistentStore(
189     EventuallyPersistentEngine &theEngine) :
190     engine(theEngine), stats(engine.getEpStats()),
191     vbMap(theEngine.getConfiguration(), *this),
192     bgFetchQueue(0),
193     diskFlushAll(false), bgFetchDelay(0), backfillMemoryThreshold(0.95),
194     statsSnapshotTaskId(0), lastTransTimePerItem(0)
195 {
196     cachedResidentRatio.activeRatio.store(0);
197     cachedResidentRatio.replicaRatio.store(0);
198
199     Configuration &config = engine.getConfiguration();
200     MutationLog *shardlog;
201     for (uint16_t i = 0; i < config.getMaxNumShards(); i++) {
202         std::stringstream s;
203         s << i;
204         shardlog = new MutationLog(engine.getConfiguration().getAlogPath() +
205                                  "." + s.str(),
206                                  engine.getConfiguration().getAlogBlockSize());
207         accessLog.push_back(shardlog);
208     }
209
210     storageProperties = new StorageProperties(true, true, true, true);
211
212     stats.schedulingHisto = new Histogram<hrtime_t>[MAX_TYPE_ID];
213     stats.taskRuntimeHisto = new Histogram<hrtime_t>[MAX_TYPE_ID];
214
215     for (size_t i = 0; i < MAX_TYPE_ID; i++) {
216         stats.schedulingHisto[i].reset();
217         stats.taskRuntimeHisto[i].reset();
218     }
219
220     ExecutorPool::get()->registerBucket(ObjectRegistry::getCurrentEngine());
221
222     size_t num_vbs = config.getMaxVbuckets();
223     vb_mutexes = new Mutex[num_vbs];
224     schedule_vbstate_persist = new AtomicValue<bool>[num_vbs];
225     for (size_t i = 0; i < num_vbs; ++i) {
226         schedule_vbstate_persist[i] = false;
227     }
228
229     stats.memOverhead = sizeof(EventuallyPersistentStore);
230
231     if (config.getConflictResolutionType().compare("seqno") == 0) {
232         conflictResolver = new SeqBasedResolution();
233     }
234
235     stats.setMaxDataSize(config.getMaxSize());
236     config.addValueChangedListener("max_size",
237                                    new StatsValueChangeListener(stats));
238
239     stats.mem_low_wat.store(config.getMemLowWat());
240     config.addValueChangedListener("mem_low_wat",
241                                    new StatsValueChangeListener(stats));
242
243     stats.mem_high_wat.store(config.getMemHighWat());
244     config.addValueChangedListener("mem_high_wat",
245                                    new StatsValueChangeListener(stats));
246
247     stats.tapThrottleThreshold.store(static_cast<double>
248                                     (config.getTapThrottleThreshold())
249                                      / 100.0);
250     config.addValueChangedListener("tap_throttle_threshold",
251                                    new StatsValueChangeListener(stats));
252
253     stats.tapThrottleWriteQueueCap.store(config.getTapThrottleQueueCap());
254     config.addValueChangedListener("tap_throttle_queue_cap",
255                                    new EPStoreValueChangeListener(*this));
256     config.addValueChangedListener("tap_throttle_cap_pcnt",
257                                    new EPStoreValueChangeListener(*this));
258
259     setBGFetchDelay(config.getBgFetchDelay());
260     config.addValueChangedListener("bg_fetch_delay",
261                                    new EPStoreValueChangeListener(*this));
262
263     stats.warmupMemUsedCap.store(static_cast<double>
264                                (config.getWarmupMinMemoryThreshold()) / 100.0);
265     config.addValueChangedListener("warmup_min_memory_threshold",
266                                    new StatsValueChangeListener(stats));
267     stats.warmupNumReadCap.store(static_cast<double>
268                                 (config.getWarmupMinItemsThreshold()) / 100.0);
269     config.addValueChangedListener("warmup_min_items_threshold",
270                                    new StatsValueChangeListener(stats));
271
272     double mem_threshold = static_cast<double>
273                                       (config.getMutationMemThreshold()) / 100;
274     StoredValue::setMutationMemoryThreshold(mem_threshold);
275     config.addValueChangedListener("mutation_mem_threshold",
276                                    new EPStoreValueChangeListener(*this));
277
278     double backfill_threshold = static_cast<double>
279                                       (config.getBackfillMemThreshold()) / 100;
280     setBackfillMemoryThreshold(backfill_threshold);
281     config.addValueChangedListener("backfill_mem_threshold",
282                                    new EPStoreValueChangeListener(*this));
283
284     compactionExpMemThreshold = config.getCompactionExpMemThreshold();
285     config.addValueChangedListener("compaction_exp_mem_threshold",
286                                    new EPStoreValueChangeListener(*this));
287
288     compactionWriteQueueCap = config.getCompactionWriteQueueCap();
289     config.addValueChangedListener("compaction_write_queue_cap",
290                                    new EPStoreValueChangeListener(*this));
291
292     const std::string &policy = config.getItemEvictionPolicy();
293     if (policy.compare("value_only") == 0) {
294         eviction_policy = VALUE_ONLY;
295     } else {
296         eviction_policy = FULL_EVICTION;
297     }
298
299     warmupTask = new Warmup(this);
300 }
301
302 bool EventuallyPersistentStore::initialize() {
303     // We should nuke everything unless we want warmup
304     Configuration &config = engine.getConfiguration();
305     if (!config.isWarmup()) {
306         reset();
307     }
308
309     if (!startFlusher()) {
310         LOG(EXTENSION_LOG_WARNING,
311             "FATAL: Failed to create and start flushers");
312         return false;
313     }
314     if (!startBgFetcher()) {
315         LOG(EXTENSION_LOG_WARNING,
316            "FATAL: Failed to create and start bgfetchers");
317         return false;
318     }
319
320     warmupTask->start();
321
322     if (config.isFailpartialwarmup() && stats.warmOOM > 0) {
323         LOG(EXTENSION_LOG_WARNING,
324             "Warmup failed to load %d records due to OOM, exiting.\n",
325             static_cast<unsigned int>(stats.warmOOM));
326         return false;
327     }
328
329     itmpTask = new ItemPager(&engine, stats);
330     ExecutorPool::get()->schedule(itmpTask, NONIO_TASK_IDX);
331
332     size_t expiryPagerSleeptime = config.getExpPagerStime();
333     setExpiryPagerSleeptime(expiryPagerSleeptime);
334     config.addValueChangedListener("exp_pager_stime",
335                                    new EPStoreValueChangeListener(*this));
336
337     ExTask htrTask = new HashtableResizerTask(this, 10);
338     ExecutorPool::get()->schedule(htrTask, NONIO_TASK_IDX);
339
340     size_t checkpointRemoverInterval = config.getChkRemoverStime();
341     chkTask = new ClosedUnrefCheckpointRemoverTask(&engine, stats,
342                                                    checkpointRemoverInterval);
343     ExecutorPool::get()->schedule(chkTask, NONIO_TASK_IDX);
344
345     ExTask vbSnapshotTask = new DaemonVBSnapshotTask(&engine);
346     ExecutorPool::get()->schedule(vbSnapshotTask, WRITER_TASK_IDX);
347
348     ExTask workloadMonitorTask = new WorkLoadMonitor(&engine, false);
349     ExecutorPool::get()->schedule(workloadMonitorTask, NONIO_TASK_IDX);
350
351     return true;
352 }
353
354 EventuallyPersistentStore::~EventuallyPersistentStore() {
355     stopWarmup();
356     stopBgFetcher();
357     ExecutorPool::get()->stopTaskGroup(&engine, NONIO_TASK_IDX);
358
359     ExecutorPool::get()->cancel(statsSnapshotTaskId);
360     LockHolder lh(accessScanner.mutex);
361     ExecutorPool::get()->cancel(accessScanner.task);
362     lh.unlock();
363
364     stopFlusher();
365     ExecutorPool::get()->unregisterBucket(ObjectRegistry::getCurrentEngine());
366
367     delete [] vb_mutexes;
368     delete [] schedule_vbstate_persist;
369     delete [] stats.schedulingHisto;
370     delete [] stats.taskRuntimeHisto;
371     delete conflictResolver;
372     delete warmupTask;
373     delete storageProperties;
374
375     std::vector<MutationLog*>::iterator it;
376     for (it = accessLog.begin(); it != accessLog.end(); it++) {
377         delete *it;
378     }
379 }
380
381 const Flusher* EventuallyPersistentStore::getFlusher(uint16_t shardId) {
382     return vbMap.getShard(shardId)->getFlusher();
383 }
384
385 Warmup* EventuallyPersistentStore::getWarmup(void) const {
386     return warmupTask;
387 }
388
389 bool EventuallyPersistentStore::startFlusher() {
390     for (uint16_t i = 0; i < vbMap.numShards; ++i) {
391         Flusher *flusher = vbMap.shards[i]->getFlusher();
392         flusher->start();
393     }
394     return true;
395 }
396
397 void EventuallyPersistentStore::stopFlusher() {
398     for (uint16_t i = 0; i < vbMap.numShards; i++) {
399         Flusher *flusher = vbMap.shards[i]->getFlusher();
400         bool rv = flusher->stop(stats.forceShutdown);
401         if (rv && !stats.forceShutdown) {
402             flusher->wait();
403         }
404     }
405 }
406
407 bool EventuallyPersistentStore::pauseFlusher() {
408     bool rv = true;
409     for (uint16_t i = 0; i < vbMap.numShards; i++) {
410         Flusher *flusher = vbMap.shards[i]->getFlusher();
411         if (!flusher->pause()) {
412             LOG(EXTENSION_LOG_WARNING, "Attempted to pause flusher in state "
413                 "[%s], shard = %d", flusher->stateName(), i);
414             rv = false;
415         }
416     }
417     return rv;
418 }
419
420 bool EventuallyPersistentStore::resumeFlusher() {
421     bool rv = true;
422     for (uint16_t i = 0; i < vbMap.numShards; i++) {
423         Flusher *flusher = vbMap.shards[i]->getFlusher();
424         if (!flusher->resume()) {
425             LOG(EXTENSION_LOG_WARNING,
426                     "Warning: attempted to resume flusher in state [%s], "
427                     "shard = %d", flusher->stateName(), i);
428             rv = false;
429         }
430     }
431     return rv;
432 }
433
434 void EventuallyPersistentStore::wakeUpFlusher() {
435     if (stats.diskQueueSize.load() == 0) {
436         for (uint16_t i = 0; i < vbMap.numShards; i++) {
437             Flusher *flusher = vbMap.shards[i]->getFlusher();
438             flusher->wake();
439         }
440     }
441 }
442
443 bool EventuallyPersistentStore::startBgFetcher() {
444     for (uint16_t i = 0; i < vbMap.numShards; i++) {
445         BgFetcher *bgfetcher = vbMap.shards[i]->getBgFetcher();
446         if (bgfetcher == NULL) {
447             LOG(EXTENSION_LOG_WARNING,
448                 "Falied to start bg fetcher for shard %d", i);
449             return false;
450         }
451         bgfetcher->start();
452     }
453     return true;
454 }
455
456 void EventuallyPersistentStore::stopBgFetcher() {
457     for (uint16_t i = 0; i < vbMap.numShards; i++) {
458         BgFetcher *bgfetcher = vbMap.shards[i]->getBgFetcher();
459         if (multiBGFetchEnabled() && bgfetcher->pendingJob()) {
460             LOG(EXTENSION_LOG_WARNING,
461                 "Shutting down engine while there are still pending data "
462                 "read for shard %d from database storage", i);
463         }
464         LOG(EXTENSION_LOG_INFO, "Stopping bg fetcher for underlying storage");
465         bgfetcher->stop();
466     }
467 }
468
469 void
470 EventuallyPersistentStore::deleteExpiredItem(uint16_t vbid, std::string &key,
471                                              time_t startTime,
472                                              uint64_t revSeqno) {
473     RCPtr<VBucket> vb = getVBucket(vbid);
474     if (vb) {
475         // Obtain reader access to the VB state change lock so that
476         // the VB can't switch state whilst we're processing
477         ReaderLockHolder rlh(vb->getStateLock());
478         if (vb->getState() == vbucket_state_active) {
479             int bucket_num(0);
480             incExpirationStat(vb);
481             LockHolder lh = vb->ht.getLockedBucket(key, &bucket_num);
482             StoredValue *v = vb->ht.unlocked_find(key, bucket_num, true, false);
483             if (v) {
484                 if (v->isTempNonExistentItem() || v->isTempDeletedItem()) {
485                     // This is a temporary item whose background fetch for metadata
486                     // has completed.
487                     bool deleted = vb->ht.unlocked_del(key, bucket_num);
488                     cb_assert(deleted);
489                 } else if (v->isExpired(startTime) && !v->isDeleted()) {
490                     vb->ht.unlocked_softDelete(v, 0, getItemEvictionPolicy());
491                     queueDirty(vb, v, &lh, false);
492                 }
493             } else {
494                 if (eviction_policy == FULL_EVICTION) {
495                     // Create a temp item and delete and push it
496                     // into the checkpoint queue.
497                     add_type_t rv = vb->ht.unlocked_addTempItem(bucket_num, key,
498                                                                 eviction_policy);
499                     if (rv == ADD_NOMEM) {
500                         return;
501                     }
502                     v = vb->ht.unlocked_find(key, bucket_num, true, false);
503                     v->setStoredValueState(StoredValue::state_deleted_key);
504                     v->setRevSeqno(revSeqno);
505                     vb->ht.unlocked_softDelete(v, 0, eviction_policy);
506                     queueDirty(vb, v, &lh, false);
507                 }
508             }
509         }
510     }
511 }
512
513 void
514 EventuallyPersistentStore::deleteExpiredItems(std::list<std::pair<uint16_t,
515                                                         std::string> > &keys) {
516     std::list<std::pair<uint16_t, std::string> >::iterator it;
517     time_t startTime = ep_real_time();
518     for (it = keys.begin(); it != keys.end(); it++) {
519         deleteExpiredItem(it->first, it->second, startTime, 0);
520     }
521 }
522
523 StoredValue *EventuallyPersistentStore::fetchValidValue(RCPtr<VBucket> &vb,
524                                                         const std::string &key,
525                                                         int bucket_num,
526                                                         bool wantDeleted,
527                                                         bool trackReference,
528                                                         bool queueExpired) {
529     StoredValue *v = vb->ht.unlocked_find(key, bucket_num, wantDeleted,
530                                           trackReference);
531     if (v && !v->isDeleted() && !v->isTempItem()) {
532         // In the deleted case, we ignore expiration time.
533         if (v->isExpired(ep_real_time())) {
534             if (vb->getState() != vbucket_state_active) {
535                 return wantDeleted ? v : NULL;
536             }
537
538             // queueDirty only allowed on active VB
539             if (queueExpired && vb->getState() == vbucket_state_active) {
540                 incExpirationStat(vb, false);
541                 vb->ht.unlocked_softDelete(v, 0, eviction_policy);
542                 queueDirty(vb, v, NULL, false, true);
543             }
544             return wantDeleted ? v : NULL;
545         }
546     }
547     return v;
548 }
549
550 protocol_binary_response_status EventuallyPersistentStore::evictKey(
551                                                         const std::string &key,
552                                                         uint16_t vbucket,
553                                                         const char **msg,
554                                                         size_t *msg_size,
555                                                         bool force) {
556     RCPtr<VBucket> vb = getVBucket(vbucket);
557     if (!vb || (vb->getState() != vbucket_state_active && !force)) {
558         return PROTOCOL_BINARY_RESPONSE_NOT_MY_VBUCKET;
559     }
560
561     int bucket_num(0);
562     LockHolder lh = vb->ht.getLockedBucket(key, &bucket_num);
563     StoredValue *v = fetchValidValue(vb, key, bucket_num, force, false);
564
565     protocol_binary_response_status rv(PROTOCOL_BINARY_RESPONSE_SUCCESS);
566
567     *msg_size = 0;
568     if (v) {
569         if (force)  {
570             v->markClean();
571         }
572         if (v->isResident()) {
573             if (vb->ht.unlocked_ejectItem(v, eviction_policy)) {
574                 *msg = "Ejected.";
575             } else {
576                 *msg = "Can't eject: Dirty object.";
577                 rv = PROTOCOL_BINARY_RESPONSE_KEY_EEXISTS;
578             }
579         } else {
580             *msg = "Already ejected.";
581         }
582     } else {
583         if (eviction_policy == VALUE_ONLY) {
584             *msg = "Not found.";
585             rv = PROTOCOL_BINARY_RESPONSE_KEY_ENOENT;
586         } else {
587             *msg = "Already ejected.";
588         }
589     }
590
591     return rv;
592 }
593
594 ENGINE_ERROR_CODE EventuallyPersistentStore::addTempItemForBgFetch(
595                                                         LockHolder &lock,
596                                                         int bucket_num,
597                                                         const std::string &key,
598                                                         RCPtr<VBucket> &vb,
599                                                         const void *cookie,
600                                                         bool metadataOnly,
601                                                         bool isReplication) {
602
603     add_type_t rv = vb->ht.unlocked_addTempItem(bucket_num, key,
604                                                 eviction_policy,
605                                                 isReplication);
606     switch(rv) {
607         case ADD_NOMEM:
608             return ENGINE_ENOMEM;
609         case ADD_EXISTS:
610         case ADD_UNDEL:
611         case ADD_SUCCESS:
612         case ADD_TMP_AND_BG_FETCH:
613             // Since the hashtable bucket is locked, we shouldn't get here
614             abort();
615         case ADD_BG_FETCH:
616             lock.unlock();
617             bgFetch(key, vb->getId(), -1, cookie, metadataOnly);
618     }
619     return ENGINE_EWOULDBLOCK;
620 }
621
622 ENGINE_ERROR_CODE EventuallyPersistentStore::set(const Item &itm,
623                                                  const void *cookie,
624                                                  bool force,
625                                                  uint8_t nru) {
626
627     RCPtr<VBucket> vb = getVBucket(itm.getVBucketId());
628     if (!vb) {
629         ++stats.numNotMyVBuckets;
630         return ENGINE_NOT_MY_VBUCKET;
631     }
632
633     // Obtain read-lock on VB state to ensure VB state changes are interlocked
634     // with this set
635     ReaderLockHolder rlh(vb->getStateLock());
636     if (vb->getState() == vbucket_state_dead) {
637         ++stats.numNotMyVBuckets;
638         return ENGINE_NOT_MY_VBUCKET;
639     } else if (vb->getState() == vbucket_state_replica && !force) {
640         ++stats.numNotMyVBuckets;
641         return ENGINE_NOT_MY_VBUCKET;
642     } else if (vb->getState() == vbucket_state_pending && !force) {
643         if (vb->addPendingOp(cookie)) {
644             return ENGINE_EWOULDBLOCK;
645         }
646     }
647
648     bool cas_op = (itm.getCas() != 0);
649     int bucket_num(0);
650     LockHolder lh = vb->ht.getLockedBucket(itm.getKey(), &bucket_num);
651     StoredValue *v = vb->ht.unlocked_find(itm.getKey(), bucket_num, true,
652                                           false);
653     if (v && v->isLocked(ep_current_time()) &&
654         (vb->getState() == vbucket_state_replica ||
655          vb->getState() == vbucket_state_pending)) {
656         v->unlock();
657     }
658     mutation_type_t mtype = vb->ht.unlocked_set(v, itm, itm.getCas(),
659                                                 true, false,
660                                                 eviction_policy, nru);
661
662     ENGINE_ERROR_CODE ret = ENGINE_SUCCESS;
663     switch (mtype) {
664     case NOMEM:
665         ret = ENGINE_ENOMEM;
666         break;
667     case INVALID_CAS:
668     case IS_LOCKED:
669         ret = ENGINE_KEY_EEXISTS;
670         break;
671     case NOT_FOUND:
672         if (cas_op) {
673             ret = ENGINE_KEY_ENOENT;
674             break;
675         }
676         // FALLTHROUGH
677     case WAS_DIRTY:
678         // Even if the item was dirty, push it into the vbucket's open
679         // checkpoint.
680     case WAS_CLEAN:
681         queueDirty(vb, v, &lh);
682         break;
683     case NEED_BG_FETCH:
684         { // CAS operation with non-resident item + full eviction.
685             if (v) {
686                 // temp item is already created. Simply schedule a bg fetch job
687                 lh.unlock();
688                 bgFetch(itm.getKey(), vb->getId(), -1, cookie, true);
689                 return ENGINE_EWOULDBLOCK;
690             }
691             ret = addTempItemForBgFetch(lh, bucket_num, itm.getKey(), vb,
692                                         cookie, true);
693             break;
694         }
695     case INVALID_VBUCKET:
696         ret = ENGINE_NOT_MY_VBUCKET;
697         break;
698     }
699
700     return ret;
701 }
702
703 ENGINE_ERROR_CODE EventuallyPersistentStore::add(const Item &itm,
704                                                  const void *cookie)
705 {
706     RCPtr<VBucket> vb = getVBucket(itm.getVBucketId());
707     if (!vb) {
708         ++stats.numNotMyVBuckets;
709         return ENGINE_NOT_MY_VBUCKET;
710     }
711
712     // Obtain read-lock on VB state to ensure VB state changes are interlocked
713     // with this add
714     ReaderLockHolder rlh(vb->getStateLock());
715     if (vb->getState() == vbucket_state_dead ||
716         vb->getState() == vbucket_state_replica) {
717         ++stats.numNotMyVBuckets;
718         return ENGINE_NOT_MY_VBUCKET;
719     } else if(vb->getState() == vbucket_state_pending) {
720         if (vb->addPendingOp(cookie)) {
721             return ENGINE_EWOULDBLOCK;
722         }
723     }
724
725     if (itm.getCas() != 0) {
726         // Adding with a cas value doesn't make sense..
727         return ENGINE_NOT_STORED;
728     }
729
730     int bucket_num(0);
731     LockHolder lh = vb->ht.getLockedBucket(itm.getKey(), &bucket_num);
732     StoredValue *v = vb->ht.unlocked_find(itm.getKey(), bucket_num, true,
733                                           false);
734     add_type_t atype = vb->ht.unlocked_add(bucket_num, v, itm,
735                                            eviction_policy);
736
737     switch (atype) {
738     case ADD_NOMEM:
739         return ENGINE_ENOMEM;
740     case ADD_EXISTS:
741         return ENGINE_NOT_STORED;
742     case ADD_TMP_AND_BG_FETCH:
743         return addTempItemForBgFetch(lh, bucket_num, itm.getKey(), vb,
744                                      cookie, true);
745     case ADD_BG_FETCH:
746         lh.unlock();
747         bgFetch(itm.getKey(), vb->getId(), -1, cookie, true);
748         return ENGINE_EWOULDBLOCK;
749     case ADD_SUCCESS:
750     case ADD_UNDEL:
751         queueDirty(vb, v, &lh);
752         break;
753     }
754     return ENGINE_SUCCESS;
755 }
756
757 ENGINE_ERROR_CODE EventuallyPersistentStore::replace(const Item &itm,
758                                                      const void *cookie) {
759     RCPtr<VBucket> vb = getVBucket(itm.getVBucketId());
760     if (!vb) {
761         ++stats.numNotMyVBuckets;
762         return ENGINE_NOT_MY_VBUCKET;
763     }
764
765     // Obtain read-lock on VB state to ensure VB state changes are interlocked
766     // with this replace
767     ReaderLockHolder rlh(vb->getStateLock());
768     if (vb->getState() == vbucket_state_dead ||
769         vb->getState() == vbucket_state_replica) {
770         ++stats.numNotMyVBuckets;
771         return ENGINE_NOT_MY_VBUCKET;
772     } else if (vb->getState() == vbucket_state_pending) {
773         if (vb->addPendingOp(cookie)) {
774             return ENGINE_EWOULDBLOCK;
775         }
776     }
777
778     int bucket_num(0);
779     LockHolder lh = vb->ht.getLockedBucket(itm.getKey(), &bucket_num);
780     StoredValue *v = vb->ht.unlocked_find(itm.getKey(), bucket_num, true,
781                                           false);
782     if (v) {
783         if (v->isDeleted() || v->isTempDeletedItem() ||
784             v->isTempNonExistentItem()) {
785             return ENGINE_KEY_ENOENT;
786         }
787
788         mutation_type_t mtype;
789         if (eviction_policy == FULL_EVICTION && v->isTempInitialItem()) {
790             mtype = NEED_BG_FETCH;
791         } else {
792             mtype = vb->ht.unlocked_set(v, itm, 0, true, false, eviction_policy,
793                                         0xff);
794         }
795
796         ENGINE_ERROR_CODE ret = ENGINE_SUCCESS;
797         switch (mtype) {
798             case NOMEM:
799                 ret = ENGINE_ENOMEM;
800                 break;
801             case IS_LOCKED:
802                 ret = ENGINE_KEY_EEXISTS;
803                 break;
804             case INVALID_CAS:
805             case NOT_FOUND:
806                 ret = ENGINE_NOT_STORED;
807                 break;
808                 // FALLTHROUGH
809             case WAS_DIRTY:
810                 // Even if the item was dirty, push it into the vbucket's open
811                 // checkpoint.
812             case WAS_CLEAN:
813                 queueDirty(vb, v, &lh);
814                 break;
815             case NEED_BG_FETCH:
816             {
817                 // temp item is already created. Simply schedule a bg fetch job
818                 lh.unlock();
819                 bgFetch(itm.getKey(), vb->getId(), -1, cookie, true);
820                 ret = ENGINE_EWOULDBLOCK;
821                 break;
822             }
823             case INVALID_VBUCKET:
824                 ret = ENGINE_NOT_MY_VBUCKET;
825                 break;
826         }
827
828         return ret;
829     } else {
830         if (eviction_policy == VALUE_ONLY) {
831             return ENGINE_KEY_ENOENT;
832         }
833
834         return addTempItemForBgFetch(lh, bucket_num, itm.getKey(), vb,
835                                      cookie, false);
836     }
837 }
838
839 ENGINE_ERROR_CODE EventuallyPersistentStore::addTAPBackfillItem(const Item &itm,
840                                                                 uint8_t nru,
841                                                                 bool genBySeqno) {
842
843     RCPtr<VBucket> vb = getVBucket(itm.getVBucketId());
844     if (!vb) {
845         ++stats.numNotMyVBuckets;
846         return ENGINE_NOT_MY_VBUCKET;
847     }
848
849     // Obtain read-lock on VB state to ensure VB state changes are interlocked
850     // with this add-tapbackfill
851     ReaderLockHolder rlh(vb->getStateLock());
852     if (vb->getState() == vbucket_state_dead ||
853         vb->getState() == vbucket_state_active) {
854         ++stats.numNotMyVBuckets;
855         return ENGINE_NOT_MY_VBUCKET;
856     }
857
858     int bucket_num(0);
859     LockHolder lh = vb->ht.getLockedBucket(itm.getKey(), &bucket_num);
860     StoredValue *v = vb->ht.unlocked_find(itm.getKey(), bucket_num, true,
861                                           false);
862
863     // Note that this function is only called on replica or pending vbuckets.
864     if (v && v->isLocked(ep_current_time())) {
865         v->unlock();
866     }
867     mutation_type_t mtype = vb->ht.unlocked_set(v, itm, 0, true, true,
868                                                 eviction_policy, nru);
869
870     ENGINE_ERROR_CODE ret = ENGINE_SUCCESS;
871     switch (mtype) {
872     case NOMEM:
873         ret = ENGINE_ENOMEM;
874         break;
875     case INVALID_CAS:
876     case IS_LOCKED:
877         ret = ENGINE_KEY_EEXISTS;
878         break;
879     case WAS_DIRTY:
880         // FALLTHROUGH, to ensure the bySeqno for the hashTable item is
881         // set correctly, and also the sequence numbers are ordered correctly.
882         // (MB-14003)
883     case NOT_FOUND:
884         // FALLTHROUGH
885     case WAS_CLEAN:
886         queueDirty(vb, v, &lh, true, true, genBySeqno);
887         break;
888     case INVALID_VBUCKET:
889         ret = ENGINE_NOT_MY_VBUCKET;
890         break;
891     case NEED_BG_FETCH:
892         // SET on a non-active vbucket should not require a bg_metadata_fetch.
893         abort();
894     }
895
896     return ret;
897 }
898
899 class KVStatsCallback : public Callback<kvstats_ctx> {
900     public:
901         KVStatsCallback(EventuallyPersistentStore *store)
902             : epstore(store) { }
903
904         void callback(kvstats_ctx &ctx) {
905             RCPtr<VBucket> vb = epstore->getVBucket(ctx.vbucket);
906             if (vb) {
907                 vb->fileSpaceUsed = ctx.fileSpaceUsed;
908                 vb->fileSize = ctx.fileSize;
909             }
910         }
911
912     private:
913         EventuallyPersistentStore *epstore;
914 };
915
916 void EventuallyPersistentStore::snapshotVBuckets(const Priority &priority,
917                                                  uint16_t shardId) {
918
919     class VBucketStateVisitor : public VBucketVisitor {
920     public:
921         VBucketStateVisitor(VBucketMap &vb_map, uint16_t sid)
922             : vbuckets(vb_map), shardId(sid) { }
923         bool visitBucket(RCPtr<VBucket> &vb) {
924             if (vbuckets.getShard(vb->getId())->getId() == shardId) {
925                 uint64_t snapStart = 0;
926                 uint64_t snapEnd = 0;
927                 std::string failovers = vb->failovers->toJSON();
928                 uint64_t chkId = vbuckets.getPersistenceCheckpointId(vb->getId());
929
930                 vb->getCurrentSnapshot(snapStart, snapEnd);
931                 vbucket_state vb_state(vb->getState(), chkId, 0,
932                                        vb->getHighSeqno(), vb->getPurgeSeqno(),
933                                        snapStart, snapEnd, failovers);
934                 states.insert(std::pair<uint16_t, vbucket_state>(vb->getId(), vb_state));
935             }
936             return false;
937         }
938
939         void visit(StoredValue*) {
940             cb_assert(false); // this does not happen
941         }
942
943         std::map<uint16_t, vbucket_state> states;
944
945     private:
946         VBucketMap &vbuckets;
947         uint16_t shardId;
948     };
949
950     KVShard *shard = vbMap.shards[shardId];
951     if (priority == Priority::VBucketPersistLowPriority) {
952         shard->setLowPriorityVbSnapshotFlag(false);
953     } else {
954         shard->setHighPriorityVbSnapshotFlag(false);
955     }
956
957     KVStatsCallback kvcb(this);
958     VBucketStateVisitor v(vbMap, shard->getId());
959     visit(v);
960     hrtime_t start = gethrtime();
961
962     bool success = true;
963     vbucket_map_t::reverse_iterator iter = v.states.rbegin();
964     for (; iter != v.states.rend(); ++iter) {
965         LockHolder lh(vb_mutexes[iter->first], true /*tryLock*/);
966         if (!lh.islocked()) {
967             continue;
968         }
969         KVStore *rwUnderlying = getRWUnderlying(iter->first);
970         if (!rwUnderlying->snapshotVBucket(iter->first, iter->second,
971                     &kvcb)) {
972             LOG(EXTENSION_LOG_WARNING,
973                     "VBucket snapshot task failed!!! Rescheduling");
974             success = false;
975             break;
976         }
977
978         if (priority == Priority::VBucketPersistHighPriority) {
979             if (vbMap.setBucketCreation(iter->first, false)) {
980                 LOG(EXTENSION_LOG_INFO, "VBucket %d created", iter->first);
981             }
982         }
983     }
984
985     if (!success) {
986         scheduleVBSnapshot(priority, shard->getId());
987     } else {
988         stats.snapshotVbucketHisto.add((gethrtime() - start) / 1000);
989     }
990 }
991
992 bool EventuallyPersistentStore::persistVBState(const Priority &priority,
993                                                uint16_t vbid) {
994     schedule_vbstate_persist[vbid] = false;
995
996     RCPtr<VBucket> vb = getVBucket(vbid);
997     if (!vb) {
998         LOG(EXTENSION_LOG_WARNING,
999             "VBucket %d not exist!!! vb_state persistence task failed!!!", vbid);
1000         return false;
1001     }
1002
1003     bool inverse = false;
1004     LockHolder lh(vb_mutexes[vbid], true /*tryLock*/);
1005     if (!lh.islocked()) {
1006         if (schedule_vbstate_persist[vbid].compare_exchange_strong(inverse,
1007                                                                    true)) {
1008             return true;
1009         } else {
1010             return false;
1011         }
1012     }
1013
1014     KVStatsCallback kvcb(this);
1015     uint64_t chkId = vbMap.getPersistenceCheckpointId(vbid);
1016     std::string failovers = vb->failovers->toJSON();
1017     uint64_t snapStart = 0;
1018     uint64_t snapEnd = 0;
1019
1020     vb->getCurrentSnapshot(snapStart, snapEnd);
1021     vbucket_state vb_state(vb->getState(), chkId, 0, vb->getHighSeqno(),
1022                            vb->getPurgeSeqno(), snapStart, snapEnd, failovers);
1023
1024     KVStore *rwUnderlying = getRWUnderlying(vbid);
1025     if (rwUnderlying->snapshotVBucket(vbid, vb_state, &kvcb)) {
1026         if (vbMap.setBucketCreation(vbid, false)) {
1027             LOG(EXTENSION_LOG_INFO, "VBucket %d created", vbid);
1028         }
1029     } else {
1030         LOG(EXTENSION_LOG_WARNING,
1031             "VBucket %d: vb_state persistence task failed!!! Rescheduling", vbid);
1032
1033         if (schedule_vbstate_persist[vbid].compare_exchange_strong(inverse,
1034                                                                    true)) {
1035             return true;
1036         } else {
1037             return false;
1038         }
1039     }
1040     return false;
1041 }
1042
1043 ENGINE_ERROR_CODE EventuallyPersistentStore::setVBucketState(uint16_t vbid,
1044                                                            vbucket_state_t to,
1045                                                            bool transfer,
1046                                                            bool notify_dcp) {
1047     // Lock to prevent a race condition between a failed update and add.
1048     LockHolder lh(vbsetMutex);
1049     RCPtr<VBucket> vb = vbMap.getBucket(vbid);
1050     if (vb && to == vb->getState()) {
1051         return ENGINE_SUCCESS;
1052     }
1053
1054     if (vb) {
1055         vbucket_state_t oldstate = vb->getState();
1056         if (oldstate != to && notify_dcp) {
1057             engine.getDcpConnMap().vbucketStateChanged(vbid, to);
1058         }
1059
1060         vb->setState(to, engine.getServerApi());
1061         if (to == vbucket_state_active && !transfer) {
1062             uint64_t snapStart = 0;
1063             uint64_t snapEnd = 0;
1064             vb->getCurrentSnapshot(snapStart, snapEnd);
1065             if (snapEnd == vbMap.getPersistenceSeqno(vbid)) {
1066                 vb->failovers->createEntry(snapEnd);
1067             } else {
1068                 vb->failovers->createEntry(snapStart);
1069             }
1070         }
1071
1072         lh.unlock();
1073         if (oldstate == vbucket_state_pending &&
1074             to == vbucket_state_active) {
1075             ExTask notifyTask = new PendingOpsNotification(engine, vb);
1076             ExecutorPool::get()->schedule(notifyTask, NONIO_TASK_IDX);
1077         }
1078         scheduleVBStatePersist(Priority::VBucketPersistLowPriority, vbid);
1079     } else {
1080         FailoverTable* ft = new FailoverTable(engine.getMaxFailoverEntries());
1081         RCPtr<VBucket> newvb(new VBucket(vbid, to, stats,
1082                                          engine.getCheckpointConfig(),
1083                                          vbMap.getShard(vbid), 0, 0, 0, ft));
1084         // The first checkpoint for active vbucket should start with id 2.
1085         uint64_t start_chk_id = (to == vbucket_state_active) ? 2 : 0;
1086         newvb->checkpointManager.setOpenCheckpointId(start_chk_id);
1087         if (vbMap.addBucket(newvb) == ENGINE_ERANGE) {
1088             lh.unlock();
1089             return ENGINE_ERANGE;
1090         }
1091         vbMap.setPersistenceCheckpointId(vbid, 0);
1092         vbMap.setPersistenceSeqno(vbid, 0);
1093         vbMap.setBucketCreation(vbid, true);
1094         lh.unlock();
1095         scheduleVBStatePersist(Priority::VBucketPersistHighPriority, vbid);
1096     }
1097     return ENGINE_SUCCESS;
1098 }
1099
1100 bool EventuallyPersistentStore::scheduleVBSnapshot(const Priority &p) {
1101     KVShard *shard = NULL;
1102     if (p == Priority::VBucketPersistHighPriority) {
1103         for (size_t i = 0; i < vbMap.numShards; ++i) {
1104             shard = vbMap.shards[i];
1105             if (shard->setHighPriorityVbSnapshotFlag(true)) {
1106                 ExTask task = new VBSnapshotTask(&engine, p, i, false);
1107                 ExecutorPool::get()->schedule(task, WRITER_TASK_IDX);
1108             }
1109         }
1110     } else {
1111         for (size_t i = 0; i < vbMap.numShards; ++i) {
1112             shard = vbMap.shards[i];
1113             if (shard->setLowPriorityVbSnapshotFlag(true)) {
1114                 ExTask task = new VBSnapshotTask(&engine, p, i, false);
1115                 ExecutorPool::get()->schedule(task, WRITER_TASK_IDX);
1116             }
1117         }
1118     }
1119     if (stats.isShutdown) {
1120         return false;
1121     }
1122     return true;
1123 }
1124
1125 void EventuallyPersistentStore::scheduleVBSnapshot(const Priority &p,
1126                                                    uint16_t shardId,
1127                                                    bool force) {
1128     KVShard *shard = vbMap.shards[shardId];
1129     if (p == Priority::VBucketPersistHighPriority) {
1130         if (force || shard->setHighPriorityVbSnapshotFlag(true)) {
1131             ExTask task = new VBSnapshotTask(&engine, p, shardId, false);
1132             ExecutorPool::get()->schedule(task, WRITER_TASK_IDX);
1133         }
1134     } else {
1135         if (force || shard->setLowPriorityVbSnapshotFlag(true)) {
1136             ExTask task = new VBSnapshotTask(&engine, p, shardId, false);
1137             ExecutorPool::get()->schedule(task, WRITER_TASK_IDX);
1138         }
1139     }
1140 }
1141
1142 void EventuallyPersistentStore::scheduleVBStatePersist(const Priority &priority,
1143                                                        uint16_t vbid,
1144                                                        bool force) {
1145     bool inverse = false;
1146     if (force ||
1147         schedule_vbstate_persist[vbid].compare_exchange_strong(inverse, true)) {
1148         ExTask task = new VBStatePersistTask(&engine, priority, vbid, false);
1149         ExecutorPool::get()->schedule(task, WRITER_TASK_IDX);
1150     }
1151 }
1152
1153 bool EventuallyPersistentStore::completeVBucketDeletion(uint16_t vbid,
1154                                                         const void* cookie) {
1155     LockHolder lh(vbsetMutex);
1156
1157     hrtime_t start_time(gethrtime());
1158     RCPtr<VBucket> vb = vbMap.getBucket(vbid);
1159     if (!vb || vb->getState() == vbucket_state_dead ||
1160          vbMap.isBucketDeletion(vbid)) {
1161         lh.unlock();
1162         LockHolder vlh(vb_mutexes[vbid]);
1163         getRWUnderlying(vbid)->delVBucket(vbid);
1164         vbMap.setBucketDeletion(vbid, false);
1165         vbMap.setPersistenceSeqno(vbid, 0);
1166         ++stats.vbucketDeletions;
1167     }
1168
1169     hrtime_t spent(gethrtime() - start_time);
1170     hrtime_t wall_time = spent / 1000;
1171     BlockTimer::log(spent, "disk_vb_del", stats.timingLog);
1172     stats.diskVBDelHisto.add(wall_time);
1173     atomic_setIfBigger(stats.vbucketDelMaxWalltime, wall_time);
1174     stats.vbucketDelTotWalltime.fetch_add(wall_time);
1175     if (cookie) {
1176         engine.notifyIOComplete(cookie, ENGINE_SUCCESS);
1177     }
1178
1179     return true;
1180 }
1181
1182 void EventuallyPersistentStore::scheduleVBDeletion(RCPtr<VBucket> &vb,
1183                                                    const void* cookie,
1184                                                    double delay) {
1185     ExTask delTask = new VBucketMemoryDeletionTask(engine, vb, delay);
1186     ExecutorPool::get()->schedule(delTask, NONIO_TASK_IDX);
1187
1188     if (vbMap.setBucketDeletion(vb->getId(), true)) {
1189         ExTask task = new VBDeleteTask(&engine, vb->getId(), cookie,
1190                                        Priority::VBucketDeletionPriority);
1191         ExecutorPool::get()->schedule(task, WRITER_TASK_IDX);
1192     }
1193 }
1194
1195 ENGINE_ERROR_CODE EventuallyPersistentStore::deleteVBucket(uint16_t vbid,
1196                                                            const void* c) {
1197     // Lock to prevent a race condition between a failed update and add
1198     // (and delete).
1199     LockHolder lh(vbsetMutex);
1200
1201     RCPtr<VBucket> vb = vbMap.getBucket(vbid);
1202     if (!vb) {
1203         return ENGINE_NOT_MY_VBUCKET;
1204     }
1205
1206     engine.getDcpConnMap().vbucketStateChanged(vbid, vbucket_state_dead);
1207     vbMap.removeBucket(vbid);
1208     lh.unlock();
1209     scheduleVBDeletion(vb, c);
1210     if (c) {
1211         return ENGINE_EWOULDBLOCK;
1212     }
1213     return ENGINE_SUCCESS;
1214 }
1215
1216 ENGINE_ERROR_CODE EventuallyPersistentStore::compactDB(uint16_t vbid,
1217                                                        compaction_ctx c,
1218                                                        const void *cookie) {
1219     RCPtr<VBucket> vb = vbMap.getBucket(vbid);
1220     if (!vb) {
1221         return ENGINE_NOT_MY_VBUCKET;
1222     }
1223
1224     LockHolder lh(compactionLock);
1225     ExTask task = new CompactVBucketTask(&engine, Priority::CompactorPriority,
1226                                          vbid, c, cookie);
1227     compactionTasks.push_back(std::make_pair(vbid, task));
1228     if (compactionTasks.size() > 1) {
1229         if ((stats.diskQueueSize > compactionWriteQueueCap &&
1230             compactionTasks.size() > (vbMap.getNumShards() / 2)) ||
1231             engine.getWorkLoadPolicy().getWorkLoadPattern() == READ_HEAVY) {
1232             // Snooze a new compaction task.
1233             // We will wake it up when one of the existing compaction tasks is done.
1234             task->snooze(60);
1235         }
1236     }
1237
1238     ExecutorPool::get()->schedule(task, WRITER_TASK_IDX);
1239
1240     LOG(EXTENSION_LOG_DEBUG, "Scheduled compaction task %d on vbucket %d,"
1241         "purge_before_ts = %lld, purge_before_seq = %lld, dropdeletes = %d",
1242         task->getId(), vbid, c.purge_before_ts,
1243         c.purge_before_seq, c.drop_deletes);
1244
1245    return ENGINE_EWOULDBLOCK;
1246 }
1247
1248 class ExpiredItemsCallback : public Callback<compaction_ctx> {
1249     public:
1250         ExpiredItemsCallback(EventuallyPersistentStore *store, uint16_t vbid)
1251             : epstore(store), vbucket(vbid) { }
1252
1253         void callback(compaction_ctx &ctx) {
1254             std::list<expiredItemCtx>::iterator it;
1255             for (it  = ctx.expiredItems.begin();
1256                  it != ctx.expiredItems.end(); it++) {
1257                 if (epstore->compactionCanExpireItems()) {
1258                     epstore->deleteExpiredItem(vbucket, it->keyStr,
1259                                                ctx.curr_time,
1260                                                it->revSeqno);
1261                 }
1262             }
1263         }
1264
1265     private:
1266         EventuallyPersistentStore *epstore;
1267         uint16_t vbucket;
1268 };
1269
1270 bool EventuallyPersistentStore::compactVBucket(const uint16_t vbid,
1271                                                compaction_ctx *ctx,
1272                                                const void *cookie) {
1273     ENGINE_ERROR_CODE err = ENGINE_SUCCESS;
1274     RCPtr<VBucket> vb = vbMap.getBucket(vbid);
1275     if (vb) {
1276         LockHolder lh(vb_mutexes[vbid], true /*tryLock*/);
1277         if (!lh.islocked()) {
1278             return true; // Schedule a compaction task again.
1279         }
1280
1281         if (vb->getState() == vbucket_state_active) {
1282             // Set the current time ONLY for active vbuckets.
1283             ctx->curr_time = ep_real_time();
1284         } else {
1285             ctx->curr_time = 0;
1286         }
1287         ExpiredItemsCallback cb(this, vbid);
1288         KVStatsCallback kvcb(this);
1289         getRWUnderlying(vbid)->compactVBucket(vbid, ctx, cb, kvcb);
1290         vb->setPurgeSeqno(ctx->max_purged_seq);
1291     } else {
1292         err = ENGINE_NOT_MY_VBUCKET;
1293         engine.storeEngineSpecific(cookie, NULL);
1294         //Decrement session counter here, as memcached thread wouldn't
1295         //visit the engine interface in case of a NOT_MY_VB notification
1296         engine.decrementSessionCtr();
1297
1298     }
1299
1300     LockHolder lh(compactionLock);
1301     bool erased = false, woke = false;
1302     std::list<CompTaskEntry>::iterator it = compactionTasks.begin();
1303     while (it != compactionTasks.end()) {
1304         if ((*it).first == vbid) {
1305             it = compactionTasks.erase(it);
1306             erased = true;
1307         } else {
1308             ExTask &task = (*it).second;
1309             if (task->getState() == TASK_SNOOZED) {
1310                 ExecutorPool::get()->wake(task->getId());
1311                 woke = true;
1312             }
1313             ++it;
1314         }
1315         if (erased && woke) {
1316             break;
1317         }
1318     }
1319     lh.unlock();
1320
1321     if (cookie) {
1322         engine.notifyIOComplete(cookie, err);
1323     }
1324     --stats.pendingCompactions;
1325     return false;
1326 }
1327
1328 bool EventuallyPersistentStore::resetVBucket(uint16_t vbid) {
1329     LockHolder lh(vbsetMutex);
1330     bool rv(false);
1331
1332     RCPtr<VBucket> vb = vbMap.getBucket(vbid);
1333     if (vb) {
1334         vbucket_state_t vbstate = vb->getState();
1335
1336         vbMap.removeBucket(vbid);
1337         lh.unlock();
1338
1339         std::list<std::string> tap_cursors = vb->checkpointManager.
1340                                              getTAPCursorNames();
1341         // Delete and recreate the vbucket database file
1342         scheduleVBDeletion(vb, NULL, 0);
1343         setVBucketState(vbid, vbstate, false);
1344
1345         // Copy the all cursors from the old vbucket into the new vbucket
1346         RCPtr<VBucket> newvb = vbMap.getBucket(vbid);
1347         newvb->checkpointManager.resetTAPCursors(tap_cursors);
1348
1349         rv = true;
1350     }
1351     return rv;
1352 }
1353
1354 extern "C" {
1355
1356     typedef struct {
1357         EventuallyPersistentEngine* engine;
1358         std::map<std::string, std::string> smap;
1359     } snapshot_stats_t;
1360
1361     static void add_stat(const char *key, const uint16_t klen,
1362                          const char *val, const uint32_t vlen,
1363                          const void *cookie) {
1364         cb_assert(cookie);
1365         void *ptr = const_cast<void *>(cookie);
1366         snapshot_stats_t* snap = static_cast<snapshot_stats_t*>(ptr);
1367         ObjectRegistry::onSwitchThread(snap->engine);
1368
1369         std::string k(key, klen);
1370         std::string v(val, vlen);
1371         snap->smap.insert(std::pair<std::string, std::string>(k, v));
1372     }
1373 }
1374
1375 void EventuallyPersistentStore::snapshotStats() {
1376     snapshot_stats_t snap;
1377     snap.engine = &engine;
1378     std::map<std::string, std::string>  smap;
1379     bool rv = engine.getStats(&snap, NULL, 0, add_stat) == ENGINE_SUCCESS &&
1380               engine.getStats(&snap, "tap", 3, add_stat) == ENGINE_SUCCESS &&
1381               engine.getStats(&snap, "dcp", 3, add_stat) == ENGINE_SUCCESS;
1382
1383     if (rv && stats.isShutdown) {
1384         snap.smap["ep_force_shutdown"] = stats.forceShutdown ?
1385                                                               "true" : "false";
1386         std::stringstream ss;
1387         ss << ep_real_time();
1388         snap.smap["ep_shutdown_time"] = ss.str();
1389     }
1390     getOneRWUnderlying()->snapshotStats(snap.smap);
1391 }
1392
1393 void EventuallyPersistentStore::updateBGStats(const hrtime_t init,
1394                                               const hrtime_t start,
1395                                               const hrtime_t stop) {
1396     if (stop >= start && start >= init) {
1397         // skip the measurement if the counter wrapped...
1398         ++stats.bgNumOperations;
1399         hrtime_t w = (start - init) / 1000;
1400         BlockTimer::log(start - init, "bgwait", stats.timingLog);
1401         stats.bgWaitHisto.add(w);
1402         stats.bgWait.fetch_add(w);
1403         atomic_setIfLess(stats.bgMinWait, w);
1404         atomic_setIfBigger(stats.bgMaxWait, w);
1405
1406         hrtime_t l = (stop - start) / 1000;
1407         BlockTimer::log(stop - start, "bgload", stats.timingLog);
1408         stats.bgLoadHisto.add(l);
1409         stats.bgLoad.fetch_add(l);
1410         atomic_setIfLess(stats.bgMinLoad, l);
1411         atomic_setIfBigger(stats.bgMaxLoad, l);
1412     }
1413 }
1414
1415 void EventuallyPersistentStore::completeBGFetch(const std::string &key,
1416                                                 uint16_t vbucket,
1417                                                 uint64_t rowid,
1418                                                 const void *cookie,
1419                                                 hrtime_t init,
1420                                                 bool isMeta) {
1421     hrtime_t start(gethrtime());
1422     // Go find the data
1423     RememberingCallback<GetValue> gcb;
1424     if (isMeta) {
1425         gcb.val.setPartial();
1426         ++stats.bg_meta_fetched;
1427     } else {
1428         ++stats.bg_fetched;
1429     }
1430     getROUnderlying(vbucket)->get(key, rowid, vbucket, gcb);
1431     gcb.waitForValue();
1432     cb_assert(gcb.fired);
1433     ENGINE_ERROR_CODE status = gcb.val.getStatus();
1434
1435     // Lock to prevent a race condition between a fetch for restore and delete
1436     LockHolder lh(vbsetMutex);
1437
1438     RCPtr<VBucket> vb = getVBucket(vbucket);
1439     if (vb) {
1440         ReaderLockHolder rlh(vb->getStateLock());
1441         int bucket_num(0);
1442         LockHolder hlh = vb->ht.getLockedBucket(key, &bucket_num);
1443         StoredValue *v = fetchValidValue(vb, key, bucket_num, true);
1444         if (isMeta) {
1445             if ((v && v->unlocked_restoreMeta(gcb.val.getValue(),
1446                                               gcb.val.getStatus(), vb->ht))
1447                 || ENGINE_KEY_ENOENT == status) {
1448                 /* If ENGINE_KEY_ENOENT is the status from storage and the temp
1449                  key is removed from hash table by the time bgfetch returns
1450                  (in case multiple bgfetch is scheduled for a key), we still
1451                  need to return ENGINE_SUCCESS to the memcached worker thread,
1452                  so that the worker thread can visit the ep-engine and figure
1453                  out the correct flow */
1454                 status = ENGINE_SUCCESS;
1455             }
1456         } else {
1457             bool restore = false;
1458             if (v && v->isResident()) {
1459                 status = ENGINE_SUCCESS;
1460             } else {
1461                 switch (eviction_policy) {
1462                     case VALUE_ONLY:
1463                         if (v && !v->isResident() && !v->isDeleted()) {
1464                             restore = true;
1465                         }
1466                         break;
1467                     case FULL_EVICTION:
1468                         if (v) {
1469                             if (v->isTempInitialItem() ||
1470                                 (!v->isResident() && !v->isDeleted())) {
1471                                 restore = true;
1472                             }
1473                         }
1474                         break;
1475                     default:
1476                         throw std::logic_error("Unknown eviction policy");
1477                 }
1478             }
1479
1480             if (restore) {
1481                 if (gcb.val.getStatus() == ENGINE_SUCCESS) {
1482                     v->unlocked_restoreValue(gcb.val.getValue(), vb->ht);
1483                     cb_assert(v->isResident());
1484                     if (vb->getState() == vbucket_state_active &&
1485                         v->getExptime() != gcb.val.getValue()->getExptime() &&
1486                         v->getCas() == gcb.val.getValue()->getCas()) {
1487                         // MB-9306: It is possible that by the time bgfetcher
1488                         // returns, the item may have been updated and queued
1489                         // Hence test the CAS value to be the same first.
1490                         // exptime mutated, schedule it into new checkpoint
1491                         queueDirty(vb, v, &hlh);
1492                     }
1493                 } else if (gcb.val.getStatus() == ENGINE_KEY_ENOENT) {
1494                     v->setStoredValueState(
1495                                           StoredValue::state_non_existent_key);
1496                     if (eviction_policy == FULL_EVICTION) {
1497                         // For the full eviction, we should notify
1498                         // ENGINE_SUCCESS to the memcached worker thread, so
1499                         // that the worker thread can visit the ep-engine and
1500                         // figure out the correct error code.
1501                         status = ENGINE_SUCCESS;
1502                     }
1503                 } else {
1504                     // underlying kvstore couldn't fetch requested data
1505                     // log returned error and notify TMPFAIL to client
1506                     LOG(EXTENSION_LOG_WARNING,
1507                         "Warning: failed background fetch for vb=%d seq=%d "
1508                         "key=%s", vbucket, v->getBySeqno(), key.c_str());
1509                     status = ENGINE_TMPFAIL;
1510                 }
1511             }
1512         }
1513     } else {
1514         LOG(EXTENSION_LOG_INFO, "VBucket %d's file was deleted in the middle of"
1515             " a bg fetch for key %s\n", vbucket, key.c_str());
1516         status = ENGINE_NOT_MY_VBUCKET;
1517     }
1518
1519     lh.unlock();
1520
1521     hrtime_t stop = gethrtime();
1522     updateBGStats(init, start, stop);
1523     bgFetchQueue--;
1524
1525     delete gcb.val.getValue();
1526     engine.notifyIOComplete(cookie, status);
1527 }
1528
1529 void EventuallyPersistentStore::completeBGFetchMulti(uint16_t vbId,
1530                                  std::vector<bgfetched_item_t> &fetchedItems,
1531                                  hrtime_t startTime)
1532 {
1533     RCPtr<VBucket> vb = getVBucket(vbId);
1534     if (!vb) {
1535         std::vector<bgfetched_item_t>::iterator itemItr = fetchedItems.begin();
1536         for (; itemItr != fetchedItems.end(); ++itemItr) {
1537             engine.notifyIOComplete((*itemItr).second->cookie,
1538                                     ENGINE_NOT_MY_VBUCKET);
1539         }
1540         LOG(EXTENSION_LOG_WARNING,
1541             "EP Store completes %d of batched background fetch for "
1542             "for vBucket = %d that is already deleted\n",
1543             (int)fetchedItems.size(), vbId);
1544         return;
1545     }
1546
1547     std::vector<bgfetched_item_t>::iterator itemItr = fetchedItems.begin();
1548     for (; itemItr != fetchedItems.end(); ++itemItr) {
1549         VBucketBGFetchItem *bgitem = (*itemItr).second;
1550         ENGINE_ERROR_CODE status = bgitem->value.getStatus();
1551         Item *fetchedValue = bgitem->value.getValue();
1552         const std::string &key = (*itemItr).first;
1553         {   // locking scope
1554             ReaderLockHolder rlh(vb->getStateLock());
1555
1556             int bucket = 0;
1557             LockHolder blh = vb->ht.getLockedBucket(key, &bucket);
1558             StoredValue *v = fetchValidValue(vb, key, bucket, true);
1559             if (bgitem->metaDataOnly) {
1560                 if ((v && v->unlocked_restoreMeta(fetchedValue, status, vb->ht))
1561                     || ENGINE_KEY_ENOENT == status) {
1562                     /* If ENGINE_KEY_ENOENT is the status from storage and the temp
1563                      key is removed from hash table by the time bgfetch returns
1564                      (in case multiple bgfetch is scheduled for a key), we still
1565                      need to return ENGINE_SUCCESS to the memcached worker thread,
1566                      so that the worker thread can visit the ep-engine and figure
1567                      out the correct flow */
1568                     status = ENGINE_SUCCESS;
1569                 }
1570             } else {
1571                 bool restore = false;
1572                 if (v && v->isResident()) {
1573                     status = ENGINE_SUCCESS;
1574                 } else {
1575                     switch (eviction_policy) {
1576                         case VALUE_ONLY:
1577                             if (v && !v->isResident() && !v->isDeleted()) {
1578                                 restore = true;
1579                             }
1580                             break;
1581                         case FULL_EVICTION:
1582                             if (v) {
1583                                 if (v->isTempInitialItem() ||
1584                                     (!v->isResident() && !v->isDeleted())) {
1585                                     restore = true;
1586                                 }
1587                             }
1588                             break;
1589                         default:
1590                             throw std::logic_error("Unknown eviction policy");
1591                     }
1592                 }
1593
1594                 if (restore) {
1595                     if (status == ENGINE_SUCCESS) {
1596                         v->unlocked_restoreValue(fetchedValue, vb->ht);
1597                         cb_assert(v->isResident());
1598                         if (vb->getState() == vbucket_state_active &&
1599                             v->getExptime() != fetchedValue->getExptime() &&
1600                             v->getCas() == fetchedValue->getCas()) {
1601                             // MB-9306: It is possible that by the time
1602                             // bgfetcher returns, the item may have been
1603                             // updated and queued
1604                             // Hence test the CAS value to be the same first.
1605                             // exptime mutated, schedule it into new checkpoint
1606                             queueDirty(vb, v, &blh);
1607                         }
1608                     } else if (status == ENGINE_KEY_ENOENT) {
1609                         v->setStoredValueState(StoredValue::state_non_existent_key);
1610                         if (eviction_policy == FULL_EVICTION) {
1611                             // For the full eviction, we should notify
1612                             // ENGINE_SUCCESS to the memcached worker thread,
1613                             // so that the worker thread can visit the
1614                             // ep-engine and figure out the correct error
1615                             // code.
1616                             status = ENGINE_SUCCESS;
1617                         }
1618                     } else {
1619                         // underlying kvstore couldn't fetch requested data
1620                         // log returned error and notify TMPFAIL to client
1621                         LOG(EXTENSION_LOG_WARNING,
1622                             "Warning: failed background fetch for vb=%d "
1623                             "key=%s", vbId, key.c_str());
1624                         status = ENGINE_TMPFAIL;
1625                     }
1626                 }
1627             }
1628         } // locking scope ends
1629
1630         if (bgitem->metaDataOnly) {
1631             ++stats.bg_meta_fetched;
1632         } else {
1633             ++stats.bg_fetched;
1634         }
1635
1636         hrtime_t endTime = gethrtime();
1637         updateBGStats(bgitem->initTime, startTime, endTime);
1638         engine.notifyIOComplete(bgitem->cookie, status);
1639     }
1640
1641     LOG(EXTENSION_LOG_DEBUG,
1642         "EP Store completes %d of batched background fetch "
1643         "for vBucket = %d endTime = %lld\n",
1644         fetchedItems.size(), vbId, gethrtime()/1000000);
1645 }
1646
1647 void EventuallyPersistentStore::bgFetch(const std::string &key,
1648                                         uint16_t vbucket,
1649                                         uint64_t rowid,
1650                                         const void *cookie,
1651                                         bool isMeta) {
1652     std::stringstream ss;
1653
1654     if (multiBGFetchEnabled()) {
1655         RCPtr<VBucket> vb = getVBucket(vbucket);
1656         cb_assert(vb);
1657         KVShard *myShard = vbMap.getShard(vbucket);
1658
1659         // schedule to the current batch of background fetch of the given
1660         // vbucket
1661         VBucketBGFetchItem * fetchThis = new VBucketBGFetchItem(cookie,
1662                                                                 isMeta);
1663         vb->queueBGFetchItem(key, fetchThis, myShard->getBgFetcher());
1664         myShard->getBgFetcher()->notifyBGEvent();
1665         ss << "Queued a background fetch, now at "
1666            << vb->numPendingBGFetchItems() << std::endl;
1667         LOG(EXTENSION_LOG_DEBUG, "%s", ss.str().c_str());
1668     } else {
1669         bgFetchQueue++;
1670         stats.maxRemainingBgJobs = std::max(stats.maxRemainingBgJobs,
1671                                             bgFetchQueue.load());
1672         ExecutorPool* iom = ExecutorPool::get();
1673         ExTask task = new BGFetchTask(&engine, key, vbucket, rowid, cookie,
1674                                       isMeta,
1675                                       Priority::BgFetcherGetMetaPriority,
1676                                       bgFetchDelay, false);
1677         iom->schedule(task, READER_TASK_IDX);
1678         ss << "Queued a background fetch, now at " << bgFetchQueue.load()
1679            << std::endl;
1680         LOG(EXTENSION_LOG_DEBUG, "%s", ss.str().c_str());
1681     }
1682 }
1683
1684 GetValue EventuallyPersistentStore::getInternal(const std::string &key,
1685                                                 uint16_t vbucket,
1686                                                 const void *cookie,
1687                                                 bool queueBG,
1688                                                 bool honorStates,
1689                                                 vbucket_state_t allowedState,
1690                                                 bool trackReference) {
1691
1692     vbucket_state_t disallowedState = (allowedState == vbucket_state_active) ?
1693         vbucket_state_replica : vbucket_state_active;
1694     RCPtr<VBucket> vb = getVBucket(vbucket);
1695     if (!vb) {
1696         ++stats.numNotMyVBuckets;
1697         return GetValue(NULL, ENGINE_NOT_MY_VBUCKET);
1698     }
1699
1700     ReaderLockHolder rlh(vb->getStateLock());
1701     if (honorStates && vb->getState() == vbucket_state_dead) {
1702         ++stats.numNotMyVBuckets;
1703         return GetValue(NULL, ENGINE_NOT_MY_VBUCKET);
1704     } else if (honorStates && vb->getState() == disallowedState) {
1705         ++stats.numNotMyVBuckets;
1706         return GetValue(NULL, ENGINE_NOT_MY_VBUCKET);
1707     } else if (honorStates && vb->getState() == vbucket_state_pending) {
1708         if (vb->addPendingOp(cookie)) {
1709             return GetValue(NULL, ENGINE_EWOULDBLOCK);
1710         }
1711     }
1712
1713     int bucket_num(0);
1714     LockHolder lh = vb->ht.getLockedBucket(key, &bucket_num);
1715     StoredValue *v = fetchValidValue(vb, key, bucket_num, true,
1716                                      trackReference);
1717     if (v) {
1718         if (v->isDeleted() || v->isTempDeletedItem() ||
1719             v->isTempNonExistentItem()) {
1720             GetValue rv;
1721             return rv;
1722         }
1723         // If the value is not resident, wait for it...
1724         if (!v->isResident()) {
1725             if (queueBG) {
1726                 bgFetch(key, vbucket, v->getBySeqno(), cookie);
1727             }
1728             return GetValue(NULL, ENGINE_EWOULDBLOCK, v->getBySeqno(),
1729                             true, v->getNRUValue());
1730         }
1731
1732         GetValue rv(v->toItem(v->isLocked(ep_current_time()), vbucket),
1733                     ENGINE_SUCCESS, v->getBySeqno(), false, v->getNRUValue());
1734         return rv;
1735     } else {
1736         if (eviction_policy == VALUE_ONLY || diskFlushAll) {
1737             GetValue rv;
1738             return rv;
1739         }
1740         ENGINE_ERROR_CODE ec = ENGINE_EWOULDBLOCK;
1741         if (queueBG) { // Full eviction and need a bg fetch.
1742             ec = addTempItemForBgFetch(lh, bucket_num, key, vb,
1743                                        cookie, false);
1744         }
1745         return GetValue(NULL, ec, -1, true);
1746     }
1747 }
1748
1749 GetValue EventuallyPersistentStore::getRandomKey() {
1750     long max = vbMap.getSize();
1751
1752     long start = random() % max;
1753     long curr = start;
1754     Item *itm = NULL;
1755
1756     while (itm == NULL) {
1757         RCPtr<VBucket> vb = getVBucket(curr++);
1758         while (!vb || vb->getState() != vbucket_state_active) {
1759             if (curr == start) {
1760                 return GetValue(NULL, ENGINE_KEY_ENOENT);
1761             }
1762             if (curr == max) {
1763                 curr = 0;
1764             }
1765
1766             vb = getVBucket(curr++);
1767         }
1768
1769         if ((itm = vb->ht.getRandomKey(random())) != NULL) {
1770             GetValue rv(itm, ENGINE_SUCCESS);
1771             return rv;
1772         }
1773
1774         if (curr == max) {
1775             curr = 0;
1776         }
1777
1778         if (curr == start) {
1779             return GetValue(NULL, ENGINE_KEY_ENOENT);
1780         }
1781         // Search next vbucket
1782     }
1783
1784     return GetValue(NULL, ENGINE_KEY_ENOENT);
1785 }
1786
1787
1788 ENGINE_ERROR_CODE EventuallyPersistentStore::getMetaData(
1789                                                         const std::string &key,
1790                                                         uint16_t vbucket,
1791                                                         const void *cookie,
1792                                                         ItemMetaData &metadata,
1793                                                         uint32_t &deleted,
1794                                                         bool trackReferenced)
1795 {
1796     (void) cookie;
1797     RCPtr<VBucket> vb = getVBucket(vbucket);
1798     if (!vb) {
1799         ++stats.numNotMyVBuckets;
1800         return ENGINE_NOT_MY_VBUCKET;
1801     }
1802
1803     ReaderLockHolder rlh(vb->getStateLock());
1804     if (vb->getState() == vbucket_state_dead ||
1805         vb->getState() == vbucket_state_replica) {
1806         ++stats.numNotMyVBuckets;
1807         return ENGINE_NOT_MY_VBUCKET;
1808     }
1809
1810     int bucket_num(0);
1811     deleted = 0;
1812     LockHolder lh = vb->ht.getLockedBucket(key, &bucket_num);
1813     StoredValue *v = vb->ht.unlocked_find(key, bucket_num, true,
1814                                           trackReferenced);
1815
1816     if (v) {
1817         stats.numOpsGetMeta++;
1818
1819         if (v->isTempInitialItem()) { // Need bg meta fetch.
1820             bgFetch(key, vbucket, -1, cookie, true);
1821             return ENGINE_EWOULDBLOCK;
1822         } else if (v->isTempNonExistentItem()) {
1823             metadata.cas = v->getCas();
1824             return ENGINE_KEY_ENOENT;
1825         } else {
1826             if (v->isTempDeletedItem() || v->isDeleted() ||
1827                 v->isExpired(ep_real_time())) {
1828                 deleted |= GET_META_ITEM_DELETED_FLAG;
1829             }
1830
1831             if (v->isLocked(ep_current_time())) {
1832                 metadata.cas = static_cast<uint64_t>(-1);
1833             } else {
1834                 metadata.cas = v->getCas();
1835             }
1836             metadata.flags = v->getFlags();
1837             metadata.exptime = v->getExptime();
1838             metadata.revSeqno = v->getRevSeqno();
1839             return ENGINE_SUCCESS;
1840         }
1841     } else {
1842         // The key wasn't found. However, this may be because it was previously
1843         // deleted or evicted with the full eviction strategy.
1844         // So, add a temporary item corresponding to the key to the hash table
1845         // and schedule a background fetch for its metadata from the persistent
1846         // store. The item's state will be updated after the fetch completes.
1847         return addTempItemForBgFetch(lh, bucket_num, key, vb, cookie, true);
1848     }
1849 }
1850
1851 ENGINE_ERROR_CODE EventuallyPersistentStore::setWithMeta(const Item &itm,
1852                                                          uint64_t cas,
1853                                                          const void *cookie,
1854                                                          bool force,
1855                                                          bool allowExisting,
1856                                                          uint8_t nru,
1857                                                          bool genBySeqno,
1858                                                          bool isReplication)
1859 {
1860     RCPtr<VBucket> vb = getVBucket(itm.getVBucketId());
1861     if (!vb) {
1862         ++stats.numNotMyVBuckets;
1863         return ENGINE_NOT_MY_VBUCKET;
1864     }
1865
1866     ReaderLockHolder rlh(vb->getStateLock());
1867     if (vb->getState() == vbucket_state_dead) {
1868         ++stats.numNotMyVBuckets;
1869         return ENGINE_NOT_MY_VBUCKET;
1870     } else if (vb->getState() == vbucket_state_replica && !force) {
1871         ++stats.numNotMyVBuckets;
1872         return ENGINE_NOT_MY_VBUCKET;
1873     } else if (vb->getState() == vbucket_state_pending && !force) {
1874         if (vb->addPendingOp(cookie)) {
1875             return ENGINE_EWOULDBLOCK;
1876         }
1877     }
1878
1879     int bucket_num(0);
1880     LockHolder lh = vb->ht.getLockedBucket(itm.getKey(), &bucket_num);
1881     StoredValue *v = vb->ht.unlocked_find(itm.getKey(), bucket_num, true,
1882                                           false);
1883
1884     if (!force) {
1885         if (v)  {
1886             if (v->isTempInitialItem()) {
1887                 bgFetch(itm.getKey(), itm.getVBucketId(), -1, cookie, true);
1888                 return ENGINE_EWOULDBLOCK;
1889             }
1890             if (!conflictResolver->resolve(v, itm.getMetaData(), false)) {
1891                 ++stats.numOpsSetMetaResolutionFailed;
1892                 return ENGINE_KEY_EEXISTS;
1893             }
1894         } else {
1895             return addTempItemForBgFetch(lh, bucket_num, itm.getKey(), vb,
1896                                          cookie, true, isReplication);
1897         }
1898     }
1899
1900     if (v && v->isLocked(ep_current_time()) &&
1901         (vb->getState() == vbucket_state_replica ||
1902          vb->getState() == vbucket_state_pending)) {
1903         v->unlock();
1904     }
1905     mutation_type_t mtype = vb->ht.unlocked_set(v, itm, cas, allowExisting,
1906                                                 true, eviction_policy, nru,
1907                                                 isReplication);
1908
1909     ENGINE_ERROR_CODE ret = ENGINE_SUCCESS;
1910     switch (mtype) {
1911     case NOMEM:
1912         ret = ENGINE_ENOMEM;
1913         break;
1914     case INVALID_CAS:
1915     case IS_LOCKED:
1916         ret = ENGINE_KEY_EEXISTS;
1917         break;
1918     case INVALID_VBUCKET:
1919         ret = ENGINE_NOT_MY_VBUCKET;
1920         break;
1921     case WAS_DIRTY:
1922     case WAS_CLEAN:
1923         queueDirty(vb, v, &lh, false, true, genBySeqno);
1924         break;
1925     case NOT_FOUND:
1926         ret = ENGINE_KEY_ENOENT;
1927         break;
1928     case NEED_BG_FETCH:
1929         {            // CAS operation with non-resident item + full eviction.
1930             if (v) { // temp item is already created. Simply schedule a
1931                 lh.unlock(); // bg fetch job.
1932                 bgFetch(itm.getKey(), vb->getId(), -1, cookie, true);
1933                 return ENGINE_EWOULDBLOCK;
1934             }
1935             ret = addTempItemForBgFetch(lh, bucket_num, itm.getKey(), vb,
1936                                         cookie, true, isReplication);
1937         }
1938     }
1939
1940     return ret;
1941 }
1942
1943 GetValue EventuallyPersistentStore::getAndUpdateTtl(const std::string &key,
1944                                                     uint16_t vbucket,
1945                                                     const void *cookie,
1946                                                     time_t exptime)
1947 {
1948     RCPtr<VBucket> vb = getVBucket(vbucket);
1949     if (!vb) {
1950         ++stats.numNotMyVBuckets;
1951         return GetValue(NULL, ENGINE_NOT_MY_VBUCKET);
1952     }
1953
1954     ReaderLockHolder rlh(vb->getStateLock());
1955     if (vb->getState() == vbucket_state_dead) {
1956         ++stats.numNotMyVBuckets;
1957         return GetValue(NULL, ENGINE_NOT_MY_VBUCKET);
1958     } else if (vb->getState() == vbucket_state_replica) {
1959         ++stats.numNotMyVBuckets;
1960         return GetValue(NULL, ENGINE_NOT_MY_VBUCKET);
1961     } else if (vb->getState() == vbucket_state_pending) {
1962         if (vb->addPendingOp(cookie)) {
1963             return GetValue(NULL, ENGINE_EWOULDBLOCK);
1964         }
1965     }
1966
1967     int bucket_num(0);
1968     LockHolder lh = vb->ht.getLockedBucket(key, &bucket_num);
1969     StoredValue *v = fetchValidValue(vb, key, bucket_num, true);
1970
1971     if (v) {
1972         if (v->isDeleted() || v->isTempDeletedItem() ||
1973             v->isTempNonExistentItem()) {
1974             GetValue rv;
1975             return rv;
1976         }
1977
1978         if (!v->isResident()) {
1979             bgFetch(key, vbucket, v->getBySeqno(), cookie);
1980             return GetValue(NULL, ENGINE_EWOULDBLOCK, v->getBySeqno());
1981         }
1982         if (v->isLocked(ep_current_time())) {
1983             GetValue rv(NULL, ENGINE_KEY_EEXISTS, 0);
1984             return rv;
1985         }
1986
1987         bool exptime_mutated = exptime != v->getExptime() ? true : false;
1988         if (exptime_mutated) {
1989            v->markDirty();
1990            v->setExptime(exptime);
1991         }
1992
1993         GetValue rv(v->toItem(v->isLocked(ep_current_time()), vbucket),
1994                     ENGINE_SUCCESS, v->getBySeqno());
1995
1996         if (exptime_mutated) {
1997             if (vb->getState() == vbucket_state_active) {
1998                 // persist the item in the underlying storage for
1999                 // mutated exptime but only if VB is active.
2000                 queueDirty(vb, v, &lh);
2001             }
2002         }
2003         return rv;
2004     } else {
2005         if (eviction_policy == VALUE_ONLY) {
2006             GetValue rv;
2007             return rv;
2008         } else {
2009             ENGINE_ERROR_CODE ec = addTempItemForBgFetch(lh, bucket_num, key,
2010                                                          vb, cookie, false);
2011             return GetValue(NULL, ec, -1, true);
2012         }
2013     }
2014 }
2015
2016 ENGINE_ERROR_CODE
2017 EventuallyPersistentStore::statsVKey(const std::string &key,
2018                                      uint16_t vbucket,
2019                                      const void *cookie) {
2020     RCPtr<VBucket> vb = getVBucket(vbucket);
2021     if (!vb) {
2022         return ENGINE_NOT_MY_VBUCKET;
2023     }
2024
2025     int bucket_num(0);
2026     LockHolder lh = vb->ht.getLockedBucket(key, &bucket_num);
2027     StoredValue *v = fetchValidValue(vb, key, bucket_num, true);
2028
2029     if (v) {
2030         if (v->isDeleted() || v->isTempDeletedItem() ||
2031             v->isTempNonExistentItem()) {
2032             return ENGINE_KEY_ENOENT;
2033         }
2034         bgFetchQueue++;
2035         cb_assert(bgFetchQueue > 0);
2036         ExecutorPool* iom = ExecutorPool::get();
2037         ExTask task = new VKeyStatBGFetchTask(&engine, key, vbucket,
2038                                            v->getBySeqno(), cookie,
2039                                            Priority::VKeyStatBgFetcherPriority,
2040                                            bgFetchDelay, false);
2041         iom->schedule(task, READER_TASK_IDX);
2042         return ENGINE_EWOULDBLOCK;
2043     } else {
2044         if (eviction_policy == VALUE_ONLY) {
2045             return ENGINE_KEY_ENOENT;
2046         } else {
2047             add_type_t rv = vb->ht.unlocked_addTempItem(bucket_num, key,
2048                                                         eviction_policy);
2049             switch(rv) {
2050             case ADD_NOMEM:
2051                 return ENGINE_ENOMEM;
2052             case ADD_EXISTS:
2053             case ADD_UNDEL:
2054             case ADD_SUCCESS:
2055             case ADD_TMP_AND_BG_FETCH:
2056                 // Since the hashtable bucket is locked, we shouldn't get here
2057                 abort();
2058             case ADD_BG_FETCH:
2059                 {
2060                     ++bgFetchQueue;
2061                     cb_assert(bgFetchQueue > 0);
2062                     ExecutorPool* iom = ExecutorPool::get();
2063                     ExTask task = new VKeyStatBGFetchTask(&engine, key,
2064                                                           vbucket, -1, cookie,
2065                                            Priority::VKeyStatBgFetcherPriority,
2066                                                           bgFetchDelay, false);
2067                     iom->schedule(task, READER_TASK_IDX);
2068                 }
2069             }
2070             return ENGINE_EWOULDBLOCK;
2071         }
2072     }
2073 }
2074
2075 void EventuallyPersistentStore::completeStatsVKey(const void* cookie,
2076                                                   std::string &key,
2077                                                   uint16_t vbid,
2078                                                   uint64_t bySeqNum) {
2079     RememberingCallback<GetValue> gcb;
2080
2081     getROUnderlying(vbid)->get(key, bySeqNum, vbid, gcb);
2082     gcb.waitForValue();
2083     cb_assert(gcb.fired);
2084
2085     if (eviction_policy == FULL_EVICTION) {
2086         RCPtr<VBucket> vb = getVBucket(vbid);
2087         if (vb) {
2088             int bucket_num(0);
2089             LockHolder hlh = vb->ht.getLockedBucket(key, &bucket_num);
2090             StoredValue *v = fetchValidValue(vb, key, bucket_num, true);
2091             if (v && v->isTempInitialItem()) {
2092                 if (gcb.val.getStatus() == ENGINE_SUCCESS) {
2093                     v->unlocked_restoreValue(gcb.val.getValue(), vb->ht);
2094                     cb_assert(v->isResident());
2095                 } else if (gcb.val.getStatus() == ENGINE_KEY_ENOENT) {
2096                     v->setStoredValueState(
2097                                           StoredValue::state_non_existent_key);
2098                 } else {
2099                     // underlying kvstore couldn't fetch requested data
2100                     // log returned error and notify TMPFAIL to client
2101                     LOG(EXTENSION_LOG_WARNING,
2102                         "Warning: failed background fetch for vb=%d seq=%d "
2103                         "key=%s", vbid, v->getBySeqno(), key.c_str());
2104                 }
2105             }
2106         }
2107     }
2108
2109     if (gcb.val.getStatus() == ENGINE_SUCCESS) {
2110         engine.addLookupResult(cookie, gcb.val.getValue());
2111     } else {
2112         engine.addLookupResult(cookie, NULL);
2113     }
2114
2115     bgFetchQueue--;
2116     engine.notifyIOComplete(cookie, ENGINE_SUCCESS);
2117 }
2118
2119 bool EventuallyPersistentStore::getLocked(const std::string &key,
2120                                           uint16_t vbucket,
2121                                           Callback<GetValue> &cb,
2122                                           rel_time_t currentTime,
2123                                           uint32_t lockTimeout,
2124                                           const void *cookie) {
2125     RCPtr<VBucket> vb = getVBucket(vbucket);
2126     if (!vb || vb->getState() != vbucket_state_active) {
2127         ++stats.numNotMyVBuckets;
2128         GetValue rv(NULL, ENGINE_NOT_MY_VBUCKET);
2129         cb.callback(rv);
2130         return false;
2131     }
2132
2133     int bucket_num(0);
2134     LockHolder lh = vb->ht.getLockedBucket(key, &bucket_num);
2135     StoredValue *v = fetchValidValue(vb, key, bucket_num, true);
2136
2137     if (v) {
2138         if (v->isDeleted() || v->isTempNonExistentItem() ||
2139             v->isTempDeletedItem()) {
2140             GetValue rv;
2141             cb.callback(rv);
2142             return true;
2143         }
2144
2145         // if v is locked return error
2146         if (v->isLocked(currentTime)) {
2147             GetValue rv;
2148             cb.callback(rv);
2149             return false;
2150         }
2151
2152         // If the value is not resident, wait for it...
2153         if (!v->isResident()) {
2154             if (cookie) {
2155                 bgFetch(key, vbucket, v->getBySeqno(), cookie);
2156             }
2157             GetValue rv(NULL, ENGINE_EWOULDBLOCK, -1, true);
2158             cb.callback(rv);
2159             return false;
2160         }
2161
2162         // acquire lock and increment cas value
2163         v->lock(currentTime + lockTimeout);
2164
2165         Item *it = v->toItem(false, vbucket);
2166         it->setCas();
2167         v->setCas(it->getCas());
2168
2169         GetValue rv(it);
2170         cb.callback(rv);
2171         return true;
2172     } else {
2173         if (eviction_policy == VALUE_ONLY) {
2174             GetValue rv;
2175             cb.callback(rv);
2176             return true;
2177         } else {
2178             ENGINE_ERROR_CODE ec = addTempItemForBgFetch(lh, bucket_num, key,
2179                                                          vb, cookie, false);
2180             GetValue rv(NULL, ec, -1, true);
2181             cb.callback(rv);
2182             return false;
2183         }
2184     }
2185 }
2186
2187 ENGINE_ERROR_CODE
2188 EventuallyPersistentStore::unlockKey(const std::string &key,
2189                                      uint16_t vbucket,
2190                                      uint64_t cas,
2191                                      rel_time_t currentTime)
2192 {
2193
2194     RCPtr<VBucket> vb = getVBucket(vbucket);
2195     if (!vb || vb->getState() != vbucket_state_active) {
2196         ++stats.numNotMyVBuckets;
2197         return ENGINE_NOT_MY_VBUCKET;
2198     }
2199
2200     int bucket_num(0);
2201     LockHolder lh = vb->ht.getLockedBucket(key, &bucket_num);
2202     StoredValue *v = fetchValidValue(vb, key, bucket_num, true);
2203
2204     if (v) {
2205         if (v->isDeleted() || v->isTempNonExistentItem() ||
2206             v->isTempDeletedItem()) {
2207             return ENGINE_KEY_ENOENT;
2208         }
2209         if (v->isLocked(currentTime)) {
2210             if (v->getCas() == cas) {
2211                 v->unlock();
2212                 return ENGINE_SUCCESS;
2213             }
2214         }
2215         return ENGINE_TMPFAIL;
2216     } else {
2217         if (eviction_policy == VALUE_ONLY) {
2218             return ENGINE_KEY_ENOENT;
2219         } else {
2220             // With the full eviction, an item's lock is automatically
2221             // released when the item is evicted from memory. Therefore,
2222             // we simply return ENGINE_TMPFAIL when we receive unlockKey
2223             // for an item that is not in memocy cache. Note that we don't
2224             // spawn any bg fetch job to figure out if an item actually
2225             // exists in disk or not.
2226             return ENGINE_TMPFAIL;
2227         }
2228     }
2229 }
2230
2231
2232 ENGINE_ERROR_CODE EventuallyPersistentStore::getKeyStats(
2233                                             const std::string &key,
2234                                             uint16_t vbucket,
2235                                             const void *cookie,
2236                                             struct key_stats &kstats,
2237                                             bool bgfetch,
2238                                             bool wantsDeleted)
2239 {
2240     RCPtr<VBucket> vb = getVBucket(vbucket);
2241     if (!vb) {
2242         return ENGINE_NOT_MY_VBUCKET;
2243     }
2244
2245     int bucket_num(0);
2246     LockHolder lh = vb->ht.getLockedBucket(key, &bucket_num);
2247     StoredValue *v = fetchValidValue(vb, key, bucket_num, true);
2248
2249     if (v) {
2250         if ((v->isDeleted() && !wantsDeleted) ||
2251             v->isTempNonExistentItem() || v->isTempDeletedItem()) {
2252             return ENGINE_KEY_ENOENT;
2253         }
2254         if (eviction_policy == FULL_EVICTION &&
2255             v->isTempInitialItem() && bgfetch) {
2256             lh.unlock();
2257             bgFetch(key, vbucket, -1, cookie, true);
2258             return ENGINE_EWOULDBLOCK;
2259         }
2260         kstats.logically_deleted = v->isDeleted();
2261         kstats.dirty = v->isDirty();
2262         kstats.exptime = v->getExptime();
2263         kstats.flags = v->getFlags();
2264         kstats.cas = v->getCas();
2265         kstats.vb_state = vb->getState();
2266         return ENGINE_SUCCESS;
2267     } else {
2268         if (eviction_policy == VALUE_ONLY) {
2269             return ENGINE_KEY_ENOENT;
2270         } else {
2271             if (bgfetch) {
2272                 return addTempItemForBgFetch(lh, bucket_num, key, vb,
2273                                              cookie, true);
2274             } else {
2275                 return ENGINE_KEY_ENOENT;
2276             }
2277         }
2278     }
2279 }
2280
2281 std::string EventuallyPersistentStore::validateKey(const std::string &key,
2282                                                    uint16_t vbucket,
2283                                                    Item &diskItem) {
2284     int bucket_num(0);
2285     RCPtr<VBucket> vb = getVBucket(vbucket);
2286     LockHolder lh = vb->ht.getLockedBucket(key, &bucket_num);
2287     StoredValue *v = fetchValidValue(vb, key, bucket_num, true,
2288                                      false, true);
2289
2290     if (v) {
2291         if (v->isDeleted() || v->isTempNonExistentItem() ||
2292             v->isTempDeletedItem()) {
2293             return "item_deleted";
2294         }
2295
2296         if (diskItem.getFlags() != v->getFlags()) {
2297             return "flags_mismatch";
2298         } else if (v->isResident() && memcmp(diskItem.getData(),
2299                                              v->getValue()->getData(),
2300                                              diskItem.getNBytes())) {
2301             return "data_mismatch";
2302         } else {
2303             return "valid";
2304         }
2305     } else {
2306         return "item_deleted";
2307     }
2308
2309 }
2310
2311 ENGINE_ERROR_CODE EventuallyPersistentStore::deleteItem(const std::string &key,
2312                                                         uint64_t* cas,
2313                                                         uint16_t vbucket,
2314                                                         const void *cookie,
2315                                                         bool force,
2316                                                         ItemMetaData *itemMeta,
2317                                                         bool tapBackfill)
2318 {
2319     RCPtr<VBucket> vb = getVBucket(vbucket);
2320     if (!vb || (vb->getState() == vbucket_state_dead && !force)) {
2321         ++stats.numNotMyVBuckets;
2322         return ENGINE_NOT_MY_VBUCKET;
2323     } else if(vb->getState() == vbucket_state_replica && !force) {
2324         ++stats.numNotMyVBuckets;
2325         return ENGINE_NOT_MY_VBUCKET;
2326     } else if(vb->getState() == vbucket_state_pending && !force) {
2327         if (vb->addPendingOp(cookie)) {
2328             return ENGINE_EWOULDBLOCK;
2329         }
2330     }
2331
2332     int bucket_num(0);
2333     LockHolder lh = vb->ht.getLockedBucket(key, &bucket_num);
2334     StoredValue *v = vb->ht.unlocked_find(key, bucket_num, true, false);
2335     if (!v || v->isDeleted() || v->isTempItem()) {
2336         if (eviction_policy == VALUE_ONLY) {
2337             return ENGINE_KEY_ENOENT;
2338         } else { // Full eviction.
2339             if (!force) {
2340                 if (!v) { // Item might be evicted from cache.
2341                     return addTempItemForBgFetch(lh, bucket_num, key, vb,
2342                                                  cookie, true);
2343                 } else if (v->isTempInitialItem()) {
2344                     lh.unlock();
2345                     bgFetch(key, vbucket, -1, cookie, true);
2346                     return ENGINE_EWOULDBLOCK;
2347                 } else { // Non-existent or deleted key.
2348                     return ENGINE_KEY_ENOENT;
2349                 }
2350             } else {
2351                 if (!v) { // Item might be evicted from cache.
2352                     // Create a temp item and delete it below as it is a
2353                     // force deletion.
2354                     add_type_t rv = vb->ht.unlocked_addTempItem(bucket_num,
2355                                                               key,
2356                                                               eviction_policy);
2357                     if (rv == ADD_NOMEM) {
2358                         return ENGINE_ENOMEM;
2359                     }
2360                     v = vb->ht.unlocked_find(key, bucket_num, true, false);
2361                     v->setStoredValueState(StoredValue::state_deleted_key);
2362                 } else if (v->isTempInitialItem()) {
2363                     v->setStoredValueState(StoredValue::state_deleted_key);
2364                 } else { // Non-existent or deleted key.
2365                     return ENGINE_KEY_ENOENT;
2366                 }
2367             }
2368         }
2369     }
2370
2371     if (v && v->isLocked(ep_current_time()) &&
2372         (vb->getState() == vbucket_state_replica ||
2373          vb->getState() == vbucket_state_pending)) {
2374         v->unlock();
2375     }
2376     mutation_type_t delrv;
2377     delrv = vb->ht.unlocked_softDelete(v, *cas, eviction_policy);
2378
2379     if (itemMeta && v) {
2380         itemMeta->revSeqno = v->getRevSeqno();
2381         itemMeta->cas = v->getCas();
2382         itemMeta->flags = v->getFlags();
2383         itemMeta->exptime = v->getExptime();
2384     }
2385     *cas = v ? v->getCas() : 0;
2386
2387     ENGINE_ERROR_CODE ret = ENGINE_SUCCESS;
2388     switch (delrv) {
2389     case NOMEM:
2390         ret = ENGINE_ENOMEM;
2391         break;
2392     case INVALID_VBUCKET:
2393         ret = ENGINE_NOT_MY_VBUCKET;
2394         break;
2395     case INVALID_CAS:
2396         ret = ENGINE_KEY_EEXISTS;
2397         break;
2398     case IS_LOCKED:
2399         ret = ENGINE_TMPFAIL;
2400         break;
2401     case NOT_FOUND:
2402         ret = ENGINE_KEY_ENOENT;
2403         if (v) {
2404             queueDirty(vb, v, &lh, tapBackfill);
2405         }
2406         break;
2407     case WAS_DIRTY:
2408     case WAS_CLEAN:
2409         queueDirty(vb, v, &lh, tapBackfill);
2410         break;
2411     case NEED_BG_FETCH:
2412         // We already figured out if a bg fetch is requred for a full-evicted
2413         // item above.
2414         abort();
2415     }
2416     return ret;
2417 }
2418
2419 ENGINE_ERROR_CODE EventuallyPersistentStore::deleteWithMeta(
2420                                                         const std::string &key,
2421                                                         uint64_t* cas,
2422                                                         uint16_t vbucket,
2423                                                         const void *cookie,
2424                                                         bool force,
2425                                                         ItemMetaData *itemMeta,
2426                                                         bool tapBackfill,
2427                                                         bool genBySeqno,
2428                                                         uint64_t bySeqno,
2429                                                         bool isReplication)
2430 {
2431     RCPtr<VBucket> vb = getVBucket(vbucket);
2432     if (!vb || (vb->getState() == vbucket_state_dead && !force)) {
2433         ++stats.numNotMyVBuckets;
2434         return ENGINE_NOT_MY_VBUCKET;
2435     } else if(vb->getState() == vbucket_state_replica && !force) {
2436         ++stats.numNotMyVBuckets;
2437         return ENGINE_NOT_MY_VBUCKET;
2438     } else if(vb->getState() == vbucket_state_pending && !force) {
2439         if (vb->addPendingOp(cookie)) {
2440             return ENGINE_EWOULDBLOCK;
2441         }
2442     }
2443
2444     int bucket_num(0);
2445     LockHolder lh = vb->ht.getLockedBucket(key, &bucket_num);
2446     StoredValue *v = vb->ht.unlocked_find(key, bucket_num, true, false);
2447     if (!force) { // Need conflict resolution.
2448         if (v)  {
2449             if (v->isTempInitialItem()) {
2450                 bgFetch(key, vbucket, -1, cookie, true);
2451                 return ENGINE_EWOULDBLOCK;
2452             }
2453             if (!conflictResolver->resolve(v, *itemMeta, true)) {
2454                 ++stats.numOpsDelMetaResolutionFailed;
2455                 return ENGINE_KEY_EEXISTS;
2456             }
2457         } else {
2458             // Item is 1) deleted or not existent in the value eviction case OR
2459             // 2) deleted or evicted in the full eviction.
2460             return addTempItemForBgFetch(lh, bucket_num, key, vb,
2461                                          cookie, true, isReplication);
2462         }
2463     } else {
2464         if (!v) {
2465             // Create a temp item and delete it below as it is a force deletion
2466             add_type_t rv = vb->ht.unlocked_addTempItem(bucket_num, key,
2467                                                         eviction_policy,
2468                                                         isReplication);
2469             if (rv == ADD_NOMEM) {
2470                 return ENGINE_ENOMEM;
2471             }
2472             v = vb->ht.unlocked_find(key, bucket_num, true, false);
2473             v->setStoredValueState(StoredValue::state_deleted_key);
2474         } else if (v->isTempInitialItem()) {
2475             v->setStoredValueState(StoredValue::state_deleted_key);
2476         }
2477     }
2478
2479     if (v && v->isLocked(ep_current_time()) &&
2480         (vb->getState() == vbucket_state_replica ||
2481          vb->getState() == vbucket_state_pending)) {
2482         v->unlock();
2483     }
2484     mutation_type_t delrv;
2485     delrv = vb->ht.unlocked_softDelete(v, *cas, *itemMeta,
2486                                        eviction_policy, true);
2487     *cas = v ? v->getCas() : 0;
2488
2489     ENGINE_ERROR_CODE ret = ENGINE_SUCCESS;
2490     switch (delrv) {
2491     case NOMEM:
2492         ret = ENGINE_ENOMEM;
2493         break;
2494     case INVALID_VBUCKET:
2495         ret = ENGINE_NOT_MY_VBUCKET;
2496         break;
2497     case INVALID_CAS:
2498         ret = ENGINE_KEY_EEXISTS;
2499         break;
2500     case IS_LOCKED:
2501         ret = ENGINE_TMPFAIL;
2502         break;
2503     case NOT_FOUND:
2504         ret = ENGINE_KEY_ENOENT;
2505         break;
2506     case WAS_DIRTY:
2507     case WAS_CLEAN:
2508         if (!genBySeqno) {
2509             v->setBySeqno(bySeqno);
2510         }
2511         queueDirty(vb, v, &lh, tapBackfill, true, genBySeqno);
2512         break;
2513     case NEED_BG_FETCH:
2514         lh.unlock();
2515         bgFetch(key, vbucket, -1, cookie, true);
2516         ret = ENGINE_EWOULDBLOCK;
2517     }
2518
2519     return ret;
2520 }
2521
2522 void EventuallyPersistentStore::reset() {
2523     std::vector<int> buckets = vbMap.getBuckets();
2524     std::vector<int>::iterator it;
2525     for (it = buckets.begin(); it != buckets.end(); ++it) {
2526         RCPtr<VBucket> vb = getVBucket(*it);
2527         if (vb) {
2528             LockHolder lh(vb_mutexes[vb->getId()]);
2529             vb->ht.clear();
2530             vb->checkpointManager.clear(vb->getState());
2531             vb->resetStats();
2532             vb->setCurrentSnapshot(0, 0);
2533         }
2534     }
2535
2536     bool inverse = false;
2537     if (diskFlushAll.compare_exchange_strong(inverse, true)) {
2538         ++stats.diskQueueSize;
2539         // wake up (notify) one flusher is good enough for diskFlushAll
2540         vbMap.shards[EP_PRIMARY_SHARD]->getFlusher()->notifyFlushEvent();
2541     }
2542 }
2543
2544 /**
2545  * Callback invoked after persisting an item from memory to disk.
2546  *
2547  * This class exists to create a closure around a few variables within
2548  * EventuallyPersistentStore::flushOne so that an object can be
2549  * requeued in case of failure to store in the underlying layer.
2550  */
2551 class PersistenceCallback : public Callback<mutation_result>,
2552                             public Callback<int> {
2553 public:
2554
2555     PersistenceCallback(const queued_item &qi, RCPtr<VBucket> &vb,
2556                         EventuallyPersistentStore *st, EPStats *s, uint64_t c)
2557         : queuedItem(qi), vbucket(vb), store(st), stats(s), cas(c) {
2558         cb_assert(vb);
2559         cb_assert(s);
2560     }
2561
2562     // This callback is invoked for set only.
2563     void callback(mutation_result &value) {
2564         if (value.first == 1) {
2565             int bucket_num(0);
2566             LockHolder lh = vbucket->ht.getLockedBucket(queuedItem->getKey(),
2567                                                         &bucket_num);
2568             StoredValue *v = store->fetchValidValue(vbucket,
2569                                                     queuedItem->getKey(),
2570                                                     bucket_num, true, false);
2571             if (v) {
2572                 if (v->getCas() == cas) {
2573                     // mark this item clean only if current and stored cas
2574                     // value match
2575                     v->markClean();
2576                 }
2577                 if (v->isNewCacheItem()) {
2578                     if (value.second) {
2579                         // Insert in value-only or full eviction mode.
2580                         ++vbucket->opsCreate;
2581                         vbucket->incrMetaDataDisk(*queuedItem);
2582                     } else { // Update in full eviction mode.
2583                         --vbucket->ht.numTotalItems;
2584                         ++vbucket->opsUpdate;
2585                     }
2586                     v->setNewCacheItem(false);
2587                 } else { // Update in value-only or full eviction mode.
2588                     ++vbucket->opsUpdate;
2589                 }
2590             }
2591
2592             vbucket->doStatsForFlushing(*queuedItem, queuedItem->size());
2593             stats->decrDiskQueueSize(1);
2594             stats->totalPersisted++;
2595         } else {
2596             // If the return was 0 here, we're in a bad state because
2597             // we do not know the rowid of this object.
2598             if (value.first == 0) {
2599                 int bucket_num(0);
2600                 LockHolder lh = vbucket->ht.getLockedBucket(
2601                                            queuedItem->getKey(), &bucket_num);
2602                 StoredValue *v = store->fetchValidValue(vbucket,
2603                                                         queuedItem->getKey(),
2604                                                         bucket_num, true,
2605                                                         false);
2606                 if (v) {
2607                     std::stringstream ss;
2608                     ss << "Persisting ``" << queuedItem->getKey() << "'' on vb"
2609                        << queuedItem->getVBucketId() << " (rowid="
2610                        << v->getBySeqno() << ") returned 0 updates\n";
2611                     LOG(EXTENSION_LOG_WARNING, "%s", ss.str().c_str());
2612                 } else {
2613                     LOG(EXTENSION_LOG_WARNING,
2614                         "Error persisting now missing ``%s'' from vb%d",
2615                         queuedItem->getKey().c_str(),
2616                         queuedItem->getVBucketId());
2617                 }
2618
2619                 vbucket->doStatsForFlushing(*queuedItem, queuedItem->size());
2620                 stats->decrDiskQueueSize(1);
2621             } else {
2622                 std::stringstream ss;
2623                 ss <<
2624                 "Fatal error in persisting SET ``" <<
2625                 queuedItem->getKey() << "'' on vb "
2626                    << queuedItem->getVBucketId() << "!!! Requeue it...\n";
2627                 LOG(EXTENSION_LOG_WARNING, "%s", ss.str().c_str());
2628                 redirty();
2629             }
2630         }
2631     }
2632
2633     // This callback is invoked for deletions only.
2634     //
2635     // The boolean indicates whether the underlying storage
2636     // successfully deleted the item.
2637     void callback(int &value) {
2638         // > 1 would be bad.  We were only trying to delete one row.
2639         cb_assert(value < 2);
2640         // -1 means fail
2641         // 1 means we deleted one row
2642         // 0 means we did not delete a row, but did not fail (did not exist)
2643         if (value >= 0) {
2644             // We have succesfully removed an item from the disk, we
2645             // may now remove it from the hash table.
2646             int bucket_num(0);
2647             LockHolder lh = vbucket->ht.getLockedBucket(queuedItem->getKey(),
2648                                                         &bucket_num);
2649             StoredValue *v = store->fetchValidValue(vbucket,
2650                                                     queuedItem->getKey(),
2651                                                     bucket_num, true, false);
2652             if (v && v->isDeleted()) {
2653                 bool newCacheItem = v->isNewCacheItem();
2654                 bool deleted = vbucket->ht.unlocked_del(queuedItem->getKey(),
2655                                                         bucket_num);
2656                 cb_assert(deleted);
2657                 if (newCacheItem && value > 0) {
2658                     // Need to decrement the item counter again for an item that
2659                     // exists on DB file, but not in memory (i.e., full eviction),
2660                     // because we created the temp item in memory and incremented
2661                     // the item counter when a deletion is pushed in the queue.
2662                     --vbucket->ht.numTotalItems;
2663                 }
2664             }
2665
2666             if (value > 0) {
2667                 ++stats->totalPersisted;
2668                 ++vbucket->opsDelete;
2669             }
2670             vbucket->doStatsForFlushing(*queuedItem, queuedItem->size());
2671             stats->decrDiskQueueSize(1);
2672             vbucket->decrMetaDataDisk(*queuedItem);
2673         } else {
2674             std::stringstream ss;
2675             ss << "Fatal error in persisting DELETE ``" <<
2676             queuedItem->getKey() << "'' on vb "
2677                << queuedItem->getVBucketId() << "!!! Requeue it...\n";
2678             LOG(EXTENSION_LOG_WARNING, "%s", ss.str().c_str());
2679             redirty();
2680         }
2681     }
2682
2683 private:
2684
2685     void redirty() {
2686         if (store->vbMap.isBucketDeletion(vbucket->getId())) {
2687             vbucket->doStatsForFlushing(*queuedItem, queuedItem->size());
2688             stats->decrDiskQueueSize(1);
2689             return;
2690         }
2691         ++stats->flushFailed;
2692         store->invokeOnLockedStoredValue(queuedItem->getKey(),
2693                                          queuedItem->getVBucketId(),
2694                                          &StoredValue::reDirty);
2695         vbucket->rejectQueue.push(queuedItem);
2696     }
2697
2698     const queued_item queuedItem;
2699     RCPtr<VBucket> &vbucket;
2700     EventuallyPersistentStore *store;
2701     EPStats *stats;
2702     uint64_t cas;
2703     DISALLOW_COPY_AND_ASSIGN(PersistenceCallback);
2704 };
2705
2706 void EventuallyPersistentStore::flushOneDeleteAll() {
2707     for (size_t i = 0; i < vbMap.getSize(); ++i) {
2708         RCPtr<VBucket> vb = getVBucket(i);
2709         if (vb) {
2710             LockHolder lh(vb_mutexes[vb->getId()]);
2711             getRWUnderlying(vb->getId())->reset(i);
2712         }
2713     }
2714
2715     bool inverse = true;
2716     diskFlushAll.compare_exchange_strong(inverse, false);
2717     stats.decrDiskQueueSize(1);
2718 }
2719
2720 int EventuallyPersistentStore::flushVBucket(uint16_t vbid) {
2721     KVShard *shard = vbMap.getShard(vbid);
2722     if (diskFlushAll) {
2723         if (shard->getId() == EP_PRIMARY_SHARD) {
2724             flushOneDeleteAll();
2725         } else {
2726             // disk flush is pending just return
2727             return 0;
2728         }
2729     }
2730
2731     if (vbMap.isBucketCreation(vbid)) {
2732         return RETRY_FLUSH_VBUCKET;
2733     }
2734
2735     int items_flushed = 0;
2736     rel_time_t flush_start = ep_current_time();
2737
2738     RCPtr<VBucket> vb = vbMap.getBucket(vbid);
2739     if (vb) {
2740         LockHolder lh(vb_mutexes[vbid], true /*tryLock*/);
2741         if (!lh.islocked()) { // Try another bucket if this one is locked
2742             return RETRY_FLUSH_VBUCKET; // to avoid blocking flusher
2743         }
2744
2745         KVStatsCallback cb(this);
2746         std::vector<queued_item> items;
2747         KVStore *rwUnderlying = getRWUnderlying(vbid);
2748
2749         while (!vb->rejectQueue.empty()) {
2750             items.push_back(vb->rejectQueue.front());
2751             vb->rejectQueue.pop();
2752         }
2753
2754         LockHolder slh = vb->getSnapshotLock();
2755         uint64_t snapStart;
2756         uint64_t snapEnd;
2757         vb->getCurrentSnapshot_UNLOCKED(snapStart, snapEnd);
2758
2759         vb->getBackfillItems(items);
2760         vb->checkpointManager.getAllItemsForPersistence(items);
2761         slh.unlock();
2762
2763         if (!items.empty()) {
2764             while (!rwUnderlying->begin()) {
2765                 ++stats.beginFailed;
2766                 LOG(EXTENSION_LOG_WARNING, "Failed to start a transaction!!! "
2767                     "Retry in 1 sec ...");
2768                 sleep(1);
2769             }
2770             rwUnderlying->optimizeWrites(items);
2771
2772             Item *prev = NULL;
2773             uint64_t maxSeqno = 0;
2774             std::list<PersistenceCallback*> pcbs;
2775             std::vector<queued_item>::iterator it = items.begin();
2776             for(; it != items.end(); ++it) {
2777                 if ((*it)->getOperation() != queue_op_set &&
2778                     (*it)->getOperation() != queue_op_del) {
2779                     continue;
2780                 } else if (!prev || prev->getKey() != (*it)->getKey()) {
2781                     prev = (*it).get();
2782                     ++items_flushed;
2783                     PersistenceCallback *cb = flushOneDelOrSet(*it, vb);
2784                     if (cb) {
2785                         pcbs.push_back(cb);
2786                     }
2787                     maxSeqno = std::max(maxSeqno, (uint64_t)(*it)->getBySeqno());
2788                     ++stats.flusher_todo;
2789                 } else {
2790                     stats.decrDiskQueueSize(1);
2791                     vb->doStatsForFlushing(*(*it), (*it)->size());
2792                 }
2793             }
2794
2795             BlockTimer timer(&stats.diskCommitHisto, "disk_commit",
2796                              stats.timingLog);
2797             hrtime_t start = gethrtime();
2798
2799             if (vb->getState() == vbucket_state_active) {
2800                 snapStart = maxSeqno;
2801                 snapEnd = maxSeqno;
2802             }
2803
2804             while (!rwUnderlying->commit(&cb, snapStart, snapEnd)) {
2805                 ++stats.commitFailed;
2806                 LOG(EXTENSION_LOG_WARNING, "Flusher commit failed!!! Retry in "
2807                     "1 sec...\n");
2808                 sleep(1);
2809
2810             }
2811
2812             if (vb->rejectQueue.empty()) {
2813                 uint64_t highSeqno = rwUnderlying->getLastPersistedSeqno(vbid);
2814                 if (highSeqno > 0 &&
2815                     highSeqno != vbMap.getPersistenceSeqno(vbid)) {
2816                     vbMap.setPersistenceSeqno(vbid, highSeqno);
2817                 }
2818             }
2819
2820             while (!pcbs.empty()) {
2821                 delete pcbs.front();
2822                 pcbs.pop_front();
2823             }
2824
2825             ++stats.flusherCommits;
2826             hrtime_t end = gethrtime();
2827             uint64_t commit_time = (end - start) / 1000000;
2828             uint64_t trans_time = (end - flush_start) / 1000000;
2829
2830             lastTransTimePerItem.store((items_flushed == 0) ? 0 :
2831                                        static_cast<double>(trans_time) /
2832                                        static_cast<double>(items_flushed));
2833             stats.commit_time.store(commit_time);
2834             stats.cumulativeCommitTime.fetch_add(commit_time);
2835             stats.cumulativeFlushTime.fetch_add(ep_current_time()
2836                                                 - flush_start);
2837             stats.flusher_todo.store(0);
2838         }
2839
2840         rwUnderlying->pendingTasks();
2841
2842         if (vb->checkpointManager.getNumCheckpoints() > 1) {
2843             wakeUpCheckpointRemover();
2844         }
2845
2846         if (vb->rejectQueue.empty()) {
2847             vb->checkpointManager.itemsPersisted();
2848             uint64_t seqno = vbMap.getPersistenceSeqno(vbid);
2849             uint64_t chkid = vb->checkpointManager.getPersistenceCursorPreChkId();
2850             vb->notifyOnPersistence(engine, seqno, true);
2851             vb->notifyOnPersistence(engine, chkid, false);
2852             if (chkid > 0 && chkid != vbMap.getPersistenceCheckpointId(vbid)) {
2853                 vbMap.setPersistenceCheckpointId(vbid, chkid);
2854             }
2855         }
2856     }
2857
2858     return items_flushed;
2859 }
2860
2861 PersistenceCallback*
2862 EventuallyPersistentStore::flushOneDelOrSet(const queued_item &qi,
2863                                             RCPtr<VBucket> &vb) {
2864
2865     if (!vb) {
2866         stats.decrDiskQueueSize(1);
2867         return NULL;
2868     }
2869
2870     int64_t bySeqno = qi->getBySeqno();
2871     bool deleted = qi->isDeleted();
2872     rel_time_t queued(qi->getQueuedTime());
2873
2874     int dirtyAge = ep_current_time() - queued;
2875     stats.dirtyAgeHisto.add(dirtyAge * 1000000);
2876     stats.dirtyAge.store(dirtyAge);
2877     stats.dirtyAgeHighWat.store(std::max(stats.dirtyAge.load(),
2878                                          stats.dirtyAgeHighWat.load()));
2879
2880     // Wait until the vbucket database is created by the vbucket state
2881     // snapshot task.
2882     if (vbMap.isBucketCreation(qi->getVBucketId()) ||
2883         vbMap.isBucketDeletion(qi->getVBucketId())) {
2884         vb->rejectQueue.push(qi);
2885         ++vb->opsReject;
2886         return NULL;
2887     }
2888
2889     KVStore *rwUnderlying = getRWUnderlying(qi->getVBucketId());
2890     if (!deleted) {
2891         // TODO: Need to separate disk_insert from disk_update because
2892         // bySeqno doesn't give us that information.
2893         BlockTimer timer(bySeqno == -1 ?
2894                          &stats.diskInsertHisto : &stats.diskUpdateHisto,
2895                          bySeqno == -1 ? "disk_insert" : "disk_update",
2896                          stats.timingLog);
2897         PersistenceCallback *cb =
2898             new PersistenceCallback(qi, vb, this, &stats, qi->getCas());
2899         rwUnderlying->set(*qi, *cb);
2900         return cb;
2901     } else {
2902         BlockTimer timer(&stats.diskDelHisto, "disk_delete",
2903                          stats.timingLog);
2904         PersistenceCallback *cb =
2905             new PersistenceCallback(qi, vb, this, &stats, 0);
2906         rwUnderlying->del(*qi, *cb);
2907         return cb;
2908     }
2909 }
2910
2911 void EventuallyPersistentStore::queueDirty(RCPtr<VBucket> &vb,
2912                                            StoredValue* v,
2913                                            LockHolder *plh,
2914                                            bool tapBackfill,
2915                                            bool notifyReplicator,
2916                                            bool genBySeqno) {
2917     if (vb) {
2918         queued_item qi(v->toItem(false, vb->getId()));
2919         bool rv;
2920         if (genBySeqno) {
2921             LockHolder slh = vb->getSnapshotLock();
2922             rv = tapBackfill ? vb->queueBackfillItem(qi, genBySeqno) :
2923                                vb->checkpointManager.queueDirty(vb, qi,
2924                                                                 genBySeqno);
2925             v->setBySeqno(qi->getBySeqno());
2926             vb->setCurrentSnapshot_UNLOCKED(qi->getBySeqno(), qi->getBySeqno());
2927         } else {
2928             rv = tapBackfill ? vb->queueBackfillItem(qi, genBySeqno) :
2929                                vb->checkpointManager.queueDirty(vb, qi,
2930                                                                 genBySeqno);
2931             v->setBySeqno(qi->getBySeqno());
2932         }
2933
2934         if (plh) {
2935             plh->unlock();
2936         }
2937
2938         if (rv) {
2939             KVShard* shard = vbMap.getShard(vb->getId());
2940             shard->getFlusher()->notifyFlushEvent();
2941
2942         }
2943         if (!tapBackfill && notifyReplicator) {
2944             engine.getTapConnMap().notifyVBConnections(vb->getId());
2945             engine.getDcpConnMap().notifyVBConnections(vb->getId(),
2946                                                        qi->getBySeqno());
2947         }
2948     }
2949 }
2950
2951 std::vector<vbucket_state *> EventuallyPersistentStore::loadVBucketState()
2952 {
2953     return getOneROUnderlying()->listPersistedVbuckets();
2954 }
2955
2956 void EventuallyPersistentStore::warmupCompleted() {
2957     // Run the vbucket state snapshot job once after the warmup
2958     scheduleVBSnapshot(Priority::VBucketPersistHighPriority);
2959
2960     if (engine.getConfiguration().getAlogPath().length() > 0) {
2961
2962         if (engine.getConfiguration().isAccessScannerEnabled()) {
2963             LockHolder lh(accessScanner.mutex);
2964             accessScanner.enabled = true;
2965             lh.unlock();
2966             LOG(EXTENSION_LOG_WARNING, "Access Scanner task enabled");
2967             size_t smin = engine.getConfiguration().getAlogSleepTime();
2968             setAccessScannerSleeptime(smin);
2969         } else {
2970             LockHolder lh(accessScanner.mutex);
2971             accessScanner.enabled = false;
2972             LOG(EXTENSION_LOG_WARNING, "Access Scanner task disabled");
2973         }
2974
2975         Configuration &config = engine.getConfiguration();
2976         config.addValueChangedListener("access_scanner_enabled",
2977                                        new EPStoreValueChangeListener(*this));
2978         config.addValueChangedListener("alog_sleep_time",
2979                                        new EPStoreValueChangeListener(*this));
2980         config.addValueChangedListener("alog_task_time",
2981                                        new EPStoreValueChangeListener(*this));
2982     }
2983
2984     // "0" sleep_time means that the first snapshot task will be executed
2985     // right after warmup. Subsequent snapshot tasks will be scheduled every
2986     // 60 sec by default.
2987     ExecutorPool *iom = ExecutorPool::get();
2988     ExTask task = new StatSnap(&engine, Priority::StatSnapPriority, 0, false);
2989     statsSnapshotTaskId = iom->schedule(task, WRITER_TASK_IDX);
2990 }
2991
2992 bool EventuallyPersistentStore::maybeEnableTraffic()
2993 {
2994     // @todo rename.. skal vaere isTrafficDisabled elns
2995     double memoryUsed = static_cast<double>(stats.getTotalMemoryUsed());
2996     double maxSize = static_cast<double>(stats.getMaxDataSize());
2997
2998     if (memoryUsed  >= stats.mem_low_wat) {
2999         LOG(EXTENSION_LOG_WARNING,
3000             "Total memory use reached to the low water mark, stop warmup");
3001         return true;
3002     } else if (memoryUsed > (maxSize * stats.warmupMemUsedCap)) {
3003         LOG(EXTENSION_LOG_WARNING,
3004                 "Enough MB of data loaded to enable traffic");
3005         return true;
3006     } else if (eviction_policy == VALUE_ONLY &&
3007                stats.warmedUpValues >=
3008                                (stats.warmedUpKeys * stats.warmupNumReadCap)) {
3009         // Let ep-engine think we're done with the warmup phase
3010         // (we should refactor this into "enableTraffic")
3011         LOG(EXTENSION_LOG_WARNING,
3012             "Enough number of items loaded to enable traffic");
3013         return true;
3014     } else if (eviction_policy == FULL_EVICTION &&
3015                stats.warmedUpValues >=
3016                             (warmupTask->getEstimatedItemCount() *
3017                              stats.warmupNumReadCap)) {
3018         // In case of FULL EVICTION, warmed up keys always matches the number
3019         // of warmed up values, therefore for honoring the min_item threshold
3020         // in this scenario, we can consider warmup's estimated item count.
3021         LOG(EXTENSION_LOG_WARNING,
3022             "Enough number of items loaded to enable traffic");
3023         return true;
3024     }
3025     return false;
3026 }
3027
3028 bool EventuallyPersistentStore::isWarmingUp() {
3029     return !warmupTask->isComplete();
3030 }
3031
3032 void EventuallyPersistentStore::stopWarmup(void)
3033 {
3034     // forcefully stop current warmup task
3035     if (isWarmingUp()) {
3036         LOG(EXTENSION_LOG_WARNING, "Stopping warmup while engine is loading "
3037             "data from underlying storage, shutdown = %s\n",
3038             stats.isShutdown ? "yes" : "no");
3039         warmupTask->stop();
3040     }
3041 }
3042
3043 bool EventuallyPersistentStore::isMemoryUsageTooHigh() {
3044     double memoryUsed = static_cast<double>(stats.getTotalMemoryUsed());
3045     double maxSize = static_cast<double>(stats.getMaxDataSize());
3046     return memoryUsed > (maxSize * backfillMemoryThreshold);
3047 }
3048
3049 void EventuallyPersistentStore::setBackfillMemoryThreshold(
3050                                                 double threshold) {
3051     backfillMemoryThreshold = threshold;
3052 }
3053
3054 void EventuallyPersistentStore::setExpiryPagerSleeptime(size_t val) {
3055     LockHolder lh(expiryPager.mutex);
3056
3057     if (expiryPager.sleeptime != 0) {
3058         ExecutorPool::get()->cancel(expiryPager.task);
3059     }
3060
3061     expiryPager.sleeptime = val;
3062     if (val != 0) {
3063         ExTask expTask = new ExpiredItemPager(&engine, stats,
3064                                                 expiryPager.sleeptime);
3065         expiryPager.task = ExecutorPool::get()->schedule(expTask,
3066                                                         NONIO_TASK_IDX);
3067     }
3068 }
3069
3070 void EventuallyPersistentStore::enableAccessScannerTask() {
3071     LockHolder lh(accessScanner.mutex);
3072     if (!accessScanner.enabled) {
3073         accessScanner.enabled = true;
3074
3075         if (accessScanner.sleeptime != 0) {
3076             ExecutorPool::get()->cancel(accessScanner.task);
3077         }
3078
3079         size_t alogSleepTime = engine.getConfiguration().getAlogSleepTime();
3080         accessScanner.sleeptime = alogSleepTime * 60;
3081         if (accessScanner.sleeptime != 0) {
3082             ExTask task = new AccessScanner(*this, stats,
3083                                             Priority::AccessScannerPriority,
3084                                             accessScanner.sleeptime);
3085             accessScanner.task = ExecutorPool::get()->schedule(task,
3086                                                                AUXIO_TASK_IDX);
3087             struct timeval tv;
3088             gettimeofday(&tv, NULL);
3089             advance_tv(tv, accessScanner.sleeptime);
3090             stats.alogTime.store(tv.tv_sec);
3091         } else {
3092             LOG(EXTENSION_LOG_WARNING, "Did not enable access scanner task, "
3093                                        "as alog_sleep_time is set to zero!");
3094         }
3095     } else {
3096         LOG(EXTENSION_LOG_DEBUG, "Access scanner already enabled!");
3097     }
3098 }
3099
3100 void EventuallyPersistentStore::disableAccessScannerTask() {
3101     LockHolder lh(accessScanner.mutex);
3102     if (accessScanner.enabled) {
3103         ExecutorPool::get()->cancel(accessScanner.task);
3104         accessScanner.sleeptime = 0;
3105         accessScanner.enabled = false;
3106     } else {
3107         LOG(EXTENSION_LOG_DEBUG, "Access scanner already disabled!");
3108     }
3109 }
3110
3111 void EventuallyPersistentStore::setAccessScannerSleeptime(size_t val) {
3112     LockHolder lh(accessScanner.mutex);
3113
3114     if (accessScanner.enabled) {
3115         if (accessScanner.sleeptime != 0) {
3116             ExecutorPool::get()->cancel(accessScanner.task);
3117         }
3118
3119         // store sleeptime in seconds
3120         accessScanner.sleeptime = val * 60;
3121         if (accessScanner.sleeptime != 0) {
3122             ExTask task = new AccessScanner(*this, stats,
3123                                             Priority::AccessScannerPriority,
3124                                             accessScanner.sleeptime);
3125             accessScanner.task = ExecutorPool::get()->schedule(task,
3126                                                                AUXIO_TASK_IDX);
3127
3128             struct timeval tv;
3129             gettimeofday(&tv, NULL);
3130             advance_tv(tv, accessScanner.sleeptime);
3131             stats.alogTime.store(tv.tv_sec);
3132         }
3133     }
3134 }
3135
3136 void EventuallyPersistentStore::resetAccessScannerStartTime() {
3137     LockHolder lh(accessScanner.mutex);
3138
3139     if (accessScanner.enabled) {
3140         if (accessScanner.sleeptime != 0) {
3141             ExecutorPool::get()->cancel(accessScanner.task);
3142             // re-schedule task according to the new task start hour
3143             ExTask task = new AccessScanner(*this, stats,
3144                                             Priority::AccessScannerPriority,
3145                                             accessScanner.sleeptime);
3146             accessScanner.task = ExecutorPool::get()->schedule(task,
3147                                                                AUXIO_TASK_IDX);
3148
3149             struct timeval tv;
3150             gettimeofday(&tv, NULL);
3151             advance_tv(tv, accessScanner.sleeptime);
3152             stats.alogTime.store(tv.tv_sec);
3153         }
3154     }
3155 }
3156
3157 void EventuallyPersistentStore::visit(VBucketVisitor &visitor)
3158 {
3159     size_t maxSize = vbMap.getSize();
3160     cb_assert(maxSize <= std::numeric_limits<uint16_t>::max());
3161     for (size_t i = 0; i < maxSize; ++i) {
3162         uint16_t vbid = static_cast<uint16_t>(i);
3163         RCPtr<VBucket> vb = vbMap.getBucket(vbid);
3164         if (vb) {
3165             bool wantData = visitor.visitBucket(vb);
3166             // We could've lost this along the way.
3167             if (wantData) {
3168                 vb->ht.visit(visitor);
3169             }
3170         }
3171     }
3172     visitor.complete();
3173 }
3174
3175 VBCBAdaptor::VBCBAdaptor(EventuallyPersistentStore *s,
3176                          shared_ptr<VBucketVisitor> v,
3177                          const char *l, const Priority &p, double sleep) :
3178     GlobalTask(&s->getEPEngine(), p, 0, false), store(s),
3179     visitor(v), label(l), sleepTime(sleep), currentvb(0)
3180 {
3181     const VBucketFilter &vbFilter = visitor->getVBucketFilter();
3182     size_t maxSize = store->vbMap.getSize();
3183     cb_assert(maxSize <= std::numeric_limits<uint16_t>::max());
3184     for (size_t i = 0; i < maxSize; ++i) {
3185         uint16_t vbid = static_cast<uint16_t>(i);
3186         RCPtr<VBucket> vb = store->vbMap.getBucket(vbid);
3187         if (vb && vbFilter(vbid)) {
3188             vbList.push(vbid);
3189         }
3190     }
3191 }
3192
3193 bool VBCBAdaptor::run(void) {
3194     if (!vbList.empty()) {
3195         currentvb.store(vbList.front());
3196         RCPtr<VBucket> vb = store->vbMap.getBucket(currentvb);
3197         if (vb) {
3198             if (visitor->pauseVisitor()) {
3199                 snooze(sleepTime);
3200                 return true;
3201             }
3202             if (visitor->visitBucket(vb)) {
3203                 vb->ht.visit(*visitor);
3204             }
3205         }
3206         vbList.pop();
3207     }
3208
3209     bool isdone = vbList.empty();
3210     if (isdone) {
3211         visitor->complete();
3212     }
3213     return !isdone;
3214 }
3215
3216 VBucketVisitorTask::VBucketVisitorTask(EventuallyPersistentStore *s,
3217                                        shared_ptr<VBucketVisitor> v,
3218                                        uint16_t sh, const char *l,
3219                                        double sleep, bool shutdown):
3220     GlobalTask(&(s->getEPEngine()), Priority::AccessScannerPriority,
3221                0, shutdown),
3222     store(s), visitor(v), label(l), sleepTime(sleep), currentvb(0),
3223     shardID(sh)
3224 {
3225     const VBucketFilter &vbFilter = visitor->getVBucketFilter();
3226     std::vector<int> vbs = store->vbMap.getShard(shardID)->getVBuckets();
3227     cb_assert(vbs.size() <= std::numeric_limits<uint16_t>::max());
3228     std::vector<int>::iterator it;
3229     for (it = vbs.begin(); it != vbs.end(); ++it) {
3230         uint16_t vbid = static_cast<uint16_t>(*it);
3231         RCPtr<VBucket> vb = store->vbMap.getBucket(vbid);
3232         if (vb && vbFilter(vbid)) {
3233             vbList.push(vbid);
3234         }
3235     }
3236 }
3237
3238 bool VBucketVisitorTask::run() {
3239     if (!vbList.empty()) {
3240         currentvb = vbList.front();
3241         RCPtr<VBucket> vb = store->vbMap.getBucket(currentvb);
3242         if (vb) {
3243             if (visitor->pauseVisitor()) {
3244                 snooze(sleepTime);
3245                 return true;
3246             }
3247             if (visitor->visitBucket(vb)) {
3248                 vb->ht.visit(*visitor);
3249             }
3250         }
3251         vbList.pop();
3252     }
3253
3254     bool isDone = vbList.empty();
3255     if (isDone) {
3256         visitor->complete();
3257     }
3258     return !isDone;
3259 }
3260
3261 void EventuallyPersistentStore::resetUnderlyingStats(void)
3262 {
3263     for (size_t i = 0; i < vbMap.numShards; i++) {
3264         KVShard *shard = vbMap.shards[i];
3265         shard->getRWUnderlying()->resetStats();
3266         shard->getROUnderlying()->resetStats();
3267     }
3268
3269     for (size_t i = 0; i < MAX_TYPE_ID; i++) {
3270         stats.schedulingHisto[i].reset();
3271         stats.taskRuntimeHisto[i].reset();
3272     }
3273 }
3274
3275 void EventuallyPersistentStore::addKVStoreStats(ADD_STAT add_stat,
3276                                                 const void* cookie) {
3277     for (size_t i = 0; i < vbMap.numShards; i++) {
3278         std::stringstream rwPrefix;
3279         std::stringstream roPrefix;
3280         rwPrefix << "rw_" << i;
3281         roPrefix << "ro_" << i;
3282         vbMap.shards[i]->getRWUnderlying()->addStats(rwPrefix.str(), add_stat,
3283                                                      cookie);
3284         vbMap.shards[i]->getROUnderlying()->addStats(roPrefix.str(), add_stat,
3285                                                      cookie);
3286     }
3287 }
3288
3289 void EventuallyPersistentStore::addKVStoreTimingStats(ADD_STAT add_stat,
3290                                                       const void* cookie) {
3291     for (size_t i = 0; i < vbMap.numShards; i++) {
3292         std::stringstream rwPrefix;
3293         std::stringstream roPrefix;
3294         rwPrefix << "rw_" << i;
3295         roPrefix << "ro_" << i;
3296         vbMap.shards[i]->getRWUnderlying()->addTimingStats(rwPrefix.str(),
3297                                                            add_stat,
3298                                                            cookie);
3299         vbMap.shards[i]->getROUnderlying()->addTimingStats(roPrefix.str(),
3300                                                            add_stat,
3301                                                            cookie);
3302     }
3303 }
3304
3305 KVStore *EventuallyPersistentStore::getOneROUnderlying(void) {
3306     return vbMap.getShard(EP_PRIMARY_SHARD)->getROUnderlying();
3307 }
3308
3309 KVStore *EventuallyPersistentStore::getOneRWUnderlying(void) {
3310     return vbMap.getShard(EP_PRIMARY_SHARD)->getRWUnderlying();
3311 }
3312
3313 ENGINE_ERROR_CODE
3314 EventuallyPersistentStore::rollback(uint16_t vbid,
3315                                     uint64_t rollbackSeqno) {
3316     LockHolder lh(vb_mutexes[vbid], true /*tryLock*/);
3317     if (!lh.islocked()) {
3318         return ENGINE_TMPFAIL; // Reschedule a vbucket rollback task.
3319     }
3320
3321     if (rollbackSeqno != 0) {
3322         shared_ptr<RollbackCB> cb(new RollbackCB(engine));
3323         KVStore* rwUnderlying = vbMap.getShard(vbid)->getRWUnderlying();
3324         RollbackResult result = rwUnderlying->rollback(vbid, rollbackSeqno, cb);
3325
3326         if (result.success) {
3327             RCPtr<VBucket> vb = vbMap.getBucket(vbid);
3328             vb->failovers->pruneEntries(result.highSeqno);
3329             vb->checkpointManager.clear(vb->getState());
3330             vb->checkpointManager.setBySeqno(result.highSeqno);
3331             vb->setCurrentSnapshot(result.snapStartSeqno, result.snapEndSeqno);
3332             return ENGINE_SUCCESS;
3333         }
3334     }
3335
3336     if (resetVBucket(vbid)) {
3337         return ENGINE_SUCCESS;
3338     }
3339     return ENGINE_NOT_MY_VBUCKET;
3340 }