Merge remote-tracking branch 'couchbase/3.0.x' into sherlock
[ep-engine.git] / src / ep.cc
1 /* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2 /*
3  *     Copyright 2010 Couchbase, Inc
4  *
5  *   Licensed under the Apache License, Version 2.0 (the "License");
6  *   you may not use this file except in compliance with the License.
7  *   You may obtain a copy of the License at
8  *
9  *       http://www.apache.org/licenses/LICENSE-2.0
10  *
11  *   Unless required by applicable law or agreed to in writing, software
12  *   distributed under the License is distributed on an "AS IS" BASIS,
13  *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  *   See the License for the specific language governing permissions and
15  *   limitations under the License.
16  */
17
18 #include "config.h"
19
20 #include <string.h>
21 #include <time.h>
22
23 #include <fstream>
24 #include <functional>
25 #include <iostream>
26 #include <map>
27 #include <sstream>
28 #include <string>
29 #include <utility>
30 #include <vector>
31
32 #include "access_scanner.h"
33 #include "checkpoint_remover.h"
34 #include "conflict_resolution.h"
35 #include "defragmenter.h"
36 #include "ep.h"
37 #include "ep_engine.h"
38 #include "failover-table.h"
39 #include "flusher.h"
40 #include "htresizer.h"
41 #include "kvshard.h"
42 #include "kvstore.h"
43 #include "locks.h"
44 #include "mutation_log.h"
45 #include "warmup.h"
46 #include "connmap.h"
47 #include "tapthrottle.h"
48
49 class StatsValueChangeListener : public ValueChangedListener {
50 public:
51     StatsValueChangeListener(EPStats &st, EventuallyPersistentStore &str)
52         : stats(st), store(str) {
53         // EMPTY
54     }
55
56     virtual void sizeValueChanged(const std::string &key, size_t value) {
57         if (key.compare("max_size") == 0) {
58             stats.setMaxDataSize(value);
59             store.getEPEngine().getDcpConnMap(). \
60                                      updateMaxActiveSnoozingBackfills(value);
61             size_t low_wat = static_cast<size_t>
62                              (static_cast<double>(value) * 0.75);
63             size_t high_wat = static_cast<size_t>(
64                               static_cast<double>(value) * 0.85);
65             stats.mem_low_wat.store(low_wat);
66             stats.mem_high_wat.store(high_wat);
67         } else if (key.compare("mem_low_wat") == 0) {
68             stats.mem_low_wat.store(value);
69         } else if (key.compare("mem_high_wat") == 0) {
70             stats.mem_high_wat.store(value);
71         } else if (key.compare("tap_throttle_threshold") == 0) {
72             stats.tapThrottleThreshold.store(
73                                           static_cast<double>(value) / 100.0);
74         } else if (key.compare("warmup_min_memory_threshold") == 0) {
75             stats.warmupMemUsedCap.store(static_cast<double>(value) / 100.0);
76         } else if (key.compare("warmup_min_items_threshold") == 0) {
77             stats.warmupNumReadCap.store(static_cast<double>(value) / 100.0);
78         } else {
79             LOG(EXTENSION_LOG_WARNING,
80                 "Failed to change value for unknown variable, %s\n",
81                 key.c_str());
82         }
83     }
84
85 private:
86     EPStats &stats;
87     EventuallyPersistentStore &store;
88 };
89
90 /**
91  * A configuration value changed listener that responds to ep-engine
92  * parameter changes by invoking engine-specific methods on
93  * configuration change events.
94  */
95 class EPStoreValueChangeListener : public ValueChangedListener {
96 public:
97     EPStoreValueChangeListener(EventuallyPersistentStore &st) : store(st) {
98     }
99
100     virtual void sizeValueChanged(const std::string &key, size_t value) {
101         if (key.compare("bg_fetch_delay") == 0) {
102             store.setBGFetchDelay(static_cast<uint32_t>(value));
103         } else if (key.compare("compaction_write_queue_cap") == 0) {
104             store.setCompactionWriteQueueCap(value);
105         } else if (key.compare("exp_pager_stime") == 0) {
106             store.setExpiryPagerSleeptime(value);
107         } else if (key.compare("alog_sleep_time") == 0) {
108             store.setAccessScannerSleeptime(value);
109         } else if (key.compare("alog_task_time") == 0) {
110             store.resetAccessScannerStartTime();
111         } else if (key.compare("mutation_mem_threshold") == 0) {
112             double mem_threshold = static_cast<double>(value) / 100;
113             StoredValue::setMutationMemoryThreshold(mem_threshold);
114         } else if (key.compare("backfill_mem_threshold") == 0) {
115             double backfill_threshold = static_cast<double>(value) / 100;
116             store.setBackfillMemoryThreshold(backfill_threshold);
117         } else if (key.compare("compaction_exp_mem_threshold") == 0) {
118             store.setCompactionExpMemThreshold(value);
119         } else if (key.compare("tap_throttle_queue_cap") == 0) {
120             store.getEPEngine().getTapThrottle().setQueueCap(value);
121         } else if (key.compare("tap_throttle_cap_pcnt") == 0) {
122             store.getEPEngine().getTapThrottle().setCapPercent(value);
123         } else {
124             LOG(EXTENSION_LOG_WARNING,
125                 "Failed to change value for unknown variable, %s\n",
126                 key.c_str());
127         }
128     }
129
130     virtual void booleanValueChanged(const std::string &key, bool value) {
131         if (key.compare("access_scanner_enabled") == 0) {
132             if (value) {
133                 store.enableAccessScannerTask();
134             } else {
135                 store.disableAccessScannerTask();
136             }
137         } else if (key.compare("bfilter_enabled") == 0) {
138             store.setAllBloomFilters(value);
139         }
140     }
141
142     virtual void floatValueChanged(const std::string &key, float value) {
143         if (key.compare("bfilter_residency_threshold") == 0) {
144             store.setBfiltersResidencyThreshold(value);
145         }
146     }
147
148 private:
149     EventuallyPersistentStore &store;
150 };
151
152 class VBucketMemoryDeletionTask : public GlobalTask {
153 public:
154     VBucketMemoryDeletionTask(EventuallyPersistentEngine &eng,
155                               RCPtr<VBucket> &vb, double delay) :
156                               GlobalTask(&eng,
157                               TaskId::VBucketMemoryDeletionTask, delay, true),
158                               e(eng), vbucket(vb), vbid(vb->getId()) { }
159
160     std::string getDescription() {
161         std::stringstream ss;
162         ss << "Removing (dead) vbucket " << vbid << " from memory";
163         return ss.str();
164     }
165
166     bool run(void) {
167         vbucket->notifyAllPendingConnsFailed(e);
168         vbucket->ht.clear();
169         vbucket.reset();
170         return false;
171     }
172
173 private:
174     EventuallyPersistentEngine &e;
175     RCPtr<VBucket> vbucket;
176     uint16_t vbid;
177 };
178
179 class PendingOpsNotification : public GlobalTask {
180 public:
181     PendingOpsNotification(EventuallyPersistentEngine &e, RCPtr<VBucket> &vb) :
182         GlobalTask(&e, TaskId::PendingOpsNotification, 0, false),
183         engine(e), vbucket(vb) { }
184
185     std::string getDescription() {
186         std::stringstream ss;
187         ss << "Notify pending operations for vbucket " << vbucket->getId();
188         return ss.str();
189     }
190
191     bool run(void) {
192         vbucket->fireAllOps(engine);
193         return false;
194     }
195
196 private:
197     EventuallyPersistentEngine &engine;
198     RCPtr<VBucket> vbucket;
199 };
200
201 EventuallyPersistentStore::EventuallyPersistentStore(
202     EventuallyPersistentEngine &theEngine) :
203     engine(theEngine), stats(engine.getEpStats()),
204     vbMap(theEngine.getConfiguration(), *this),
205     defragmenterTask(NULL),
206     bgFetchQueue(0),
207     diskFlushAll(false), bgFetchDelay(0),
208     backfillMemoryThreshold(0.95),
209     statsSnapshotTaskId(0), lastTransTimePerItem(0)
210 {
211     cachedResidentRatio.activeRatio.store(0);
212     cachedResidentRatio.replicaRatio.store(0);
213
214     Configuration &config = engine.getConfiguration();
215     MutationLog *shardlog;
216     for (uint16_t i = 0; i < config.getMaxNumShards(); i++) {
217         std::stringstream s;
218         s << i;
219         shardlog = new MutationLog(engine.getConfiguration().getAlogPath() +
220                                  "." + s.str(),
221                                  engine.getConfiguration().getAlogBlockSize());
222         accessLog.push_back(shardlog);
223     }
224
225     storageProperties = new StorageProperties(true, true, true, true);
226
227     stats.schedulingHisto = new Histogram<hrtime_t>[GlobalTask::allTaskIds.size()];
228     stats.taskRuntimeHisto = new Histogram<hrtime_t>[GlobalTask::allTaskIds.size()];
229
230     for (size_t i = 0; i < GlobalTask::allTaskIds.size(); i++) {
231         stats.schedulingHisto[i].reset();
232         stats.taskRuntimeHisto[i].reset();
233     }
234
235     ExecutorPool::get()->registerBucket(ObjectRegistry::getCurrentEngine());
236
237     size_t num_vbs = config.getMaxVbuckets();
238     vb_mutexes = new Mutex[num_vbs];
239     schedule_vbstate_persist = new AtomicValue<bool>[num_vbs];
240     for (size_t i = 0; i < num_vbs; ++i) {
241         schedule_vbstate_persist[i] = false;
242     }
243
244     stats.memOverhead = sizeof(EventuallyPersistentStore);
245
246     if (config.getConflictResolutionType().compare("seqno") == 0) {
247         conflictResolver = new ConflictResolution();
248     }
249
250     stats.setMaxDataSize(config.getMaxSize());
251     config.addValueChangedListener("max_size",
252                                    new StatsValueChangeListener(stats, *this));
253     getEPEngine().getDcpConnMap().updateMaxActiveSnoozingBackfills(config.getMaxSize());
254
255     stats.mem_low_wat.store(config.getMemLowWat());
256     config.addValueChangedListener("mem_low_wat",
257                                    new StatsValueChangeListener(stats, *this));
258
259     stats.mem_high_wat.store(config.getMemHighWat());
260     config.addValueChangedListener("mem_high_wat",
261                                    new StatsValueChangeListener(stats, *this));
262
263     stats.tapThrottleThreshold.store(static_cast<double>
264                                     (config.getTapThrottleThreshold())
265                                      / 100.0);
266     config.addValueChangedListener("tap_throttle_threshold",
267                                    new StatsValueChangeListener(stats, *this));
268
269     stats.tapThrottleWriteQueueCap.store(config.getTapThrottleQueueCap());
270     config.addValueChangedListener("tap_throttle_queue_cap",
271                                    new EPStoreValueChangeListener(*this));
272     config.addValueChangedListener("tap_throttle_cap_pcnt",
273                                    new EPStoreValueChangeListener(*this));
274
275     setBGFetchDelay(config.getBgFetchDelay());
276     config.addValueChangedListener("bg_fetch_delay",
277                                    new EPStoreValueChangeListener(*this));
278
279     stats.warmupMemUsedCap.store(static_cast<double>
280                                (config.getWarmupMinMemoryThreshold()) / 100.0);
281     config.addValueChangedListener("warmup_min_memory_threshold",
282                                    new StatsValueChangeListener(stats, *this));
283     stats.warmupNumReadCap.store(static_cast<double>
284                                 (config.getWarmupMinItemsThreshold()) / 100.0);
285     config.addValueChangedListener("warmup_min_items_threshold",
286                                    new StatsValueChangeListener(stats, *this));
287
288     double mem_threshold = static_cast<double>
289                                       (config.getMutationMemThreshold()) / 100;
290     StoredValue::setMutationMemoryThreshold(mem_threshold);
291     config.addValueChangedListener("mutation_mem_threshold",
292                                    new EPStoreValueChangeListener(*this));
293
294     double backfill_threshold = static_cast<double>
295                                       (config.getBackfillMemThreshold()) / 100;
296     setBackfillMemoryThreshold(backfill_threshold);
297     config.addValueChangedListener("backfill_mem_threshold",
298                                    new EPStoreValueChangeListener(*this));
299
300     config.addValueChangedListener("bfilter_enabled",
301                                    new EPStoreValueChangeListener(*this));
302
303     bfilterResidencyThreshold = config.getBfilterResidencyThreshold();
304     config.addValueChangedListener("bfilter_residency_threshold",
305                                    new EPStoreValueChangeListener(*this));
306
307     compactionExpMemThreshold = config.getCompactionExpMemThreshold();
308     config.addValueChangedListener("compaction_exp_mem_threshold",
309                                    new EPStoreValueChangeListener(*this));
310
311     compactionWriteQueueCap = config.getCompactionWriteQueueCap();
312     config.addValueChangedListener("compaction_write_queue_cap",
313                                    new EPStoreValueChangeListener(*this));
314
315     const std::string &policy = config.getItemEvictionPolicy();
316     if (policy.compare("value_only") == 0) {
317         eviction_policy = VALUE_ONLY;
318     } else {
319         eviction_policy = FULL_EVICTION;
320     }
321
322     warmupTask = new Warmup(this);
323 }
324
325 bool EventuallyPersistentStore::initialize() {
326     // We should nuke everything unless we want warmup
327     Configuration &config = engine.getConfiguration();
328     if (!config.isWarmup()) {
329         reset();
330     }
331
332     if (!startFlusher()) {
333         LOG(EXTENSION_LOG_WARNING,
334             "FATAL: Failed to create and start flushers");
335         return false;
336     }
337     if (!startBgFetcher()) {
338         LOG(EXTENSION_LOG_WARNING,
339            "FATAL: Failed to create and start bgfetchers");
340         return false;
341     }
342
343     warmupTask->start();
344
345     if (config.isFailpartialwarmup() && stats.warmOOM > 0) {
346         LOG(EXTENSION_LOG_WARNING,
347             "Warmup failed to load %d records due to OOM, exiting.\n",
348             static_cast<unsigned int>(stats.warmOOM));
349         return false;
350     }
351
352     itmpTask = new ItemPager(&engine, stats);
353     ExecutorPool::get()->schedule(itmpTask, NONIO_TASK_IDX);
354
355     size_t expiryPagerSleeptime = config.getExpPagerStime();
356     setExpiryPagerSleeptime(expiryPagerSleeptime);
357     config.addValueChangedListener("exp_pager_stime",
358                                    new EPStoreValueChangeListener(*this));
359
360     ExTask htrTask = new HashtableResizerTask(this, 10);
361     ExecutorPool::get()->schedule(htrTask, NONIO_TASK_IDX);
362
363     size_t checkpointRemoverInterval = config.getChkRemoverStime();
364     chkTask = new ClosedUnrefCheckpointRemoverTask(&engine, stats,
365                                                    checkpointRemoverInterval);
366     ExecutorPool::get()->schedule(chkTask, NONIO_TASK_IDX);
367
368     ExTask vbSnapshotTask = new DaemonVBSnapshotTask(&engine);
369     ExecutorPool::get()->schedule(vbSnapshotTask, WRITER_TASK_IDX);
370
371     ExTask workloadMonitorTask = new WorkLoadMonitor(&engine, false);
372     ExecutorPool::get()->schedule(workloadMonitorTask, NONIO_TASK_IDX);
373
374 #if HAVE_JEMALLOC
375     /* Only create the defragmenter task if we have an underlying memory
376      * allocator which can facilitate defragmenting memory.
377      */
378     defragmenterTask = new DefragmenterTask(&engine, stats);
379     ExecutorPool::get()->schedule(defragmenterTask, NONIO_TASK_IDX);
380 #endif
381
382     return true;
383 }
384
/**
 * Tear down the store.
 *
 * Order: stop warmup and the bg fetchers, stop this engine's NONIO task
 * group, cancel the stats-snapshot and access-scanner tasks, stop the
 * flushers, unregister the bucket from the ExecutorPool, and only then free
 * what the constructor allocated. stats.forceShutdown is passed to the
 * task-group stop and the bucket unregistration.
 * NOTE(review): presumably the frees come last so that no still-running
 * task touches freed state - confirm before reordering.
 */
EventuallyPersistentStore::~EventuallyPersistentStore() {
    stopWarmup();
    stopBgFetcher();
    ExecutorPool::get()->stopTaskGroup(&engine, NONIO_TASK_IDX,
                                       stats.forceShutdown);

    ExecutorPool::get()->cancel(statsSnapshotTaskId);
    // accessScanner.task is cancelled under accessScanner.mutex.
    LockHolder lh(accessScanner.mutex);
    ExecutorPool::get()->cancel(accessScanner.task);
    lh.unlock();

    stopFlusher();
    ExecutorPool::get()->unregisterBucket(ObjectRegistry::getCurrentEngine(),
                                          stats.forceShutdown);

    // Arrays allocated with new[] in the constructor.
    delete [] vb_mutexes;
    delete [] schedule_vbstate_persist;
    delete [] stats.schedulingHisto;
    delete [] stats.taskRuntimeHisto;
    // Relies on the constructor having initialised conflictResolver
    // (delete of NULL is a no-op).
    delete conflictResolver;
    delete warmupTask;
    delete storageProperties;
    defragmenterTask.reset();

    // One MutationLog per shard, created in the constructor.
    std::vector<MutationLog*>::iterator it;
    for (it = accessLog.begin(); it != accessLog.end(); it++) {
        delete *it;
    }
}
414
415 const Flusher* EventuallyPersistentStore::getFlusher(uint16_t shardId) {
416     return vbMap.getShard(shardId)->getFlusher();
417 }
418
419 Warmup* EventuallyPersistentStore::getWarmup(void) const {
420     return warmupTask;
421 }
422
423 bool EventuallyPersistentStore::startFlusher() {
424     for (uint16_t i = 0; i < vbMap.numShards; ++i) {
425         Flusher *flusher = vbMap.shards[i]->getFlusher();
426         flusher->start();
427     }
428     return true;
429 }
430
431 void EventuallyPersistentStore::stopFlusher() {
432     for (uint16_t i = 0; i < vbMap.numShards; i++) {
433         Flusher *flusher = vbMap.shards[i]->getFlusher();
434         LOG(EXTENSION_LOG_WARNING, "Attempting to stop the flusher for "
435             "shard:%" PRIu16, i);
436         bool rv = flusher->stop(stats.forceShutdown);
437         if (rv && !stats.forceShutdown) {
438             flusher->wait();
439         }
440     }
441 }
442
443 bool EventuallyPersistentStore::pauseFlusher() {
444     bool rv = true;
445     for (uint16_t i = 0; i < vbMap.numShards; i++) {
446         Flusher *flusher = vbMap.shards[i]->getFlusher();
447         if (!flusher->pause()) {
448             LOG(EXTENSION_LOG_WARNING, "Attempted to pause flusher in state "
449                 "[%s], shard = %d", flusher->stateName(), i);
450             rv = false;
451         }
452     }
453     return rv;
454 }
455
456 bool EventuallyPersistentStore::resumeFlusher() {
457     bool rv = true;
458     for (uint16_t i = 0; i < vbMap.numShards; i++) {
459         Flusher *flusher = vbMap.shards[i]->getFlusher();
460         if (!flusher->resume()) {
461             LOG(EXTENSION_LOG_WARNING,
462                     "Warning: attempted to resume flusher in state [%s], "
463                     "shard = %d", flusher->stateName(), i);
464             rv = false;
465         }
466     }
467     return rv;
468 }
469
470 void EventuallyPersistentStore::wakeUpFlusher() {
471     if (stats.diskQueueSize.load() == 0) {
472         for (uint16_t i = 0; i < vbMap.numShards; i++) {
473             Flusher *flusher = vbMap.shards[i]->getFlusher();
474             flusher->wake();
475         }
476     }
477 }
478
479 bool EventuallyPersistentStore::startBgFetcher() {
480     for (uint16_t i = 0; i < vbMap.numShards; i++) {
481         BgFetcher *bgfetcher = vbMap.shards[i]->getBgFetcher();
482         if (bgfetcher == NULL) {
483             LOG(EXTENSION_LOG_WARNING,
484                 "Failed to start bg fetcher for shard %d", i);
485             return false;
486         }
487         bgfetcher->start();
488     }
489     return true;
490 }
491
492 void EventuallyPersistentStore::stopBgFetcher() {
493     for (uint16_t i = 0; i < vbMap.numShards; i++) {
494         BgFetcher *bgfetcher = vbMap.shards[i]->getBgFetcher();
495         if (multiBGFetchEnabled() && bgfetcher->pendingJob()) {
496             LOG(EXTENSION_LOG_WARNING,
497                 "Shutting down engine while there are still pending data "
498                 "read for shard %d from database storage", i);
499         }
500         LOG(EXTENSION_LOG_WARNING, "Stopping bg fetcher for shard:%" PRIu16, i);
501         bgfetcher->stop();
502     }
503 }
504
/**
 * Delete the expired item @p key from vbucket @p vbid.
 *
 * Does nothing unless the vbucket exists and is active; the vbucket state
 * read-lock is held for the whole operation. Once the vbucket is known to
 * be active, the expiration stat is incremented before the key is even
 * looked up.
 *
 * @param vbid      vbucket holding the key
 * @param key       the key to expire
 * @param startTime reference time for the isExpired() check
 * @param revSeqno  rev seqno given to the temp deleted item created in
 *                  full-eviction mode when the key is not in memory
 */
void
EventuallyPersistentStore::deleteExpiredItem(uint16_t vbid, std::string &key,
                                             time_t startTime,
                                             uint64_t revSeqno) {
    RCPtr<VBucket> vb = getVBucket(vbid);
    if (vb) {
        // Obtain reader access to the VB state change lock so that
        // the VB can't switch state whilst we're processing
        ReaderLockHolder rlh(vb->getStateLock());
        if (vb->getState() == vbucket_state_active) {
            int bucket_num(0);
            incExpirationStat(vb);
            LockHolder lh = vb->ht.getLockedBucket(key, &bucket_num);
            StoredValue *v = vb->ht.unlocked_find(key, bucket_num, true, false);
            if (v) {
                if (v->isTempNonExistentItem() || v->isTempDeletedItem()) {
                    // This is a temporary item whose background fetch for metadata
                    // has completed.
                    bool deleted = vb->ht.unlocked_del(key, bucket_num);
                    cb_assert(deleted);
                } else if (v->isExpired(startTime) && !v->isDeleted()) {
                    // Live value expired as of startTime: soft-delete it and
                    // queue the deletion for persistence.
                    vb->ht.unlocked_softDelete(v, 0, getItemEvictionPolicy());
                    queueDirty(vb, v, &lh, NULL, false);
                }
            } else {
                if (eviction_policy == FULL_EVICTION) {
                    // Create a temp item and delete and push it
                    // into the checkpoint queue, only if the bloomfilter
                    // predicts that the item may exist on disk.
                    if (vb->maybeKeyExistsInFilter(key)) {
                        add_type_t rv = vb->ht.unlocked_addTempItem(bucket_num, key,
                                                                    eviction_policy);
                        // Out of memory: give up silently on this key.
                        if (rv == ADD_NOMEM) {
                            return;
                        }
                        // NOTE(review): assumes the bucket lock guarantees the
                        // temp item just added is found here (v non-NULL).
                        v = vb->ht.unlocked_find(key, bucket_num, true, false);
                        v->setStoredValueState(StoredValue::state_deleted_key);
                        v->setRevSeqno(revSeqno);
                        vb->ht.unlocked_softDelete(v, 0, eviction_policy);
                        queueDirty(vb, v, &lh, NULL, false);
                    }
                }
            }
        }
    }
}
551
552 void
553 EventuallyPersistentStore::deleteExpiredItems(std::list<std::pair<uint16_t,
554                                                         std::string> > &keys) {
555     std::list<std::pair<uint16_t, std::string> >::iterator it;
556     time_t startTime = ep_real_time();
557     for (it = keys.begin(); it != keys.end(); it++) {
558         deleteExpiredItem(it->first, it->second, startTime, 0);
559     }
560 }
561
562 StoredValue *EventuallyPersistentStore::fetchValidValue(RCPtr<VBucket> &vb,
563                                                         const std::string &key,
564                                                         int bucket_num,
565                                                         bool wantDeleted,
566                                                         bool trackReference,
567                                                         bool queueExpired) {
568     StoredValue *v = vb->ht.unlocked_find(key, bucket_num, wantDeleted,
569                                           trackReference);
570     if (v && !v->isDeleted() && !v->isTempItem()) {
571         // In the deleted case, we ignore expiration time.
572         if (v->isExpired(ep_real_time())) {
573             if (vb->getState() != vbucket_state_active) {
574                 return wantDeleted ? v : NULL;
575             }
576
577             // queueDirty only allowed on active VB
578             if (queueExpired && vb->getState() == vbucket_state_active) {
579                 incExpirationStat(vb, false);
580                 vb->ht.unlocked_softDelete(v, 0, eviction_policy);
581                 queueDirty(vb, v, NULL, NULL, false, true);
582             }
583             return wantDeleted ? v : NULL;
584         }
585     }
586     return v;
587 }
588
589 bool EventuallyPersistentStore::isMetaDataResident(RCPtr<VBucket> &vb,
590                                                    const std::string &key) {
591
592     cb_assert(vb);
593     int bucket_num(0);
594     LockHolder lh = vb->ht.getLockedBucket(key, &bucket_num);
595     StoredValue *v = vb->ht.unlocked_find(key, bucket_num, false, false);
596
597     if (v && !v->isTempItem()) {
598         return true;
599     } else {
600         return false;
601     }
602 }
603
604 protocol_binary_response_status EventuallyPersistentStore::evictKey(
605                                                         const std::string &key,
606                                                         uint16_t vbucket,
607                                                         const char **msg,
608                                                         size_t *msg_size,
609                                                         bool force) {
610     RCPtr<VBucket> vb = getVBucket(vbucket);
611     if (!vb || (vb->getState() != vbucket_state_active && !force)) {
612         return PROTOCOL_BINARY_RESPONSE_NOT_MY_VBUCKET;
613     }
614
615     int bucket_num(0);
616     LockHolder lh = vb->ht.getLockedBucket(key, &bucket_num);
617     StoredValue *v = fetchValidValue(vb, key, bucket_num, force, false);
618
619     protocol_binary_response_status rv(PROTOCOL_BINARY_RESPONSE_SUCCESS);
620
621     *msg_size = 0;
622     if (v) {
623         if (force)  {
624             v->markClean();
625         }
626         if (v->isResident()) {
627             if (vb->ht.unlocked_ejectItem(v, eviction_policy)) {
628                 *msg = "Ejected.";
629
630                 // Add key to bloom filter incase of full eviction mode
631                 if (getItemEvictionPolicy() == FULL_EVICTION) {
632                     vb->addToFilter(key);
633                 }
634             } else {
635                 *msg = "Can't eject: Dirty object.";
636                 rv = PROTOCOL_BINARY_RESPONSE_KEY_EEXISTS;
637             }
638         } else {
639             *msg = "Already ejected.";
640         }
641     } else {
642         if (eviction_policy == VALUE_ONLY) {
643             *msg = "Not found.";
644             rv = PROTOCOL_BINARY_RESPONSE_KEY_ENOENT;
645         } else {
646             *msg = "Already ejected.";
647         }
648     }
649
650     return rv;
651 }
652
/**
 * Insert a temporary placeholder for @p key into hash bucket @p bucket_num
 * and schedule a background fetch for it.
 *
 * The caller must hold @p lock on that hash bucket. On the bg-fetch path the
 * lock is released before bgFetch() is called.
 *
 * @param metadataOnly forwarded to bgFetch()
 * @param isReplication forwarded to unlocked_addTempItem()
 * @return ENGINE_ENOMEM if the temp item could not be added, otherwise
 *         ENGINE_EWOULDBLOCK. Any other add result aborts the process.
 */
ENGINE_ERROR_CODE EventuallyPersistentStore::addTempItemForBgFetch(
                                                        LockHolder &lock,
                                                        int bucket_num,
                                                        const std::string &key,
                                                        RCPtr<VBucket> &vb,
                                                        const void *cookie,
                                                        bool metadataOnly,
                                                        bool isReplication) {

    add_type_t rv = vb->ht.unlocked_addTempItem(bucket_num, key,
                                                eviction_policy,
                                                isReplication);
    switch(rv) {
        case ADD_NOMEM:
            return ENGINE_ENOMEM;
        case ADD_EXISTS:
        case ADD_UNDEL:
        case ADD_SUCCESS:
        case ADD_TMP_AND_BG_FETCH:
            // Since the hashtable bucket is locked, we shouldn't get here
            abort();
        case ADD_BG_FETCH:
            // Release the bucket lock before scheduling the fetch.
            lock.unlock();
            bgFetch(key, vb->getId(), cookie, metadataOnly);
    }
    return ENGINE_EWOULDBLOCK;
}
680
/**
 * Store @p itm in its vbucket (unconditional set, or CAS set when the
 * item's CAS is non-zero).
 *
 * @param cookie connection cookie, used to park the op on a pending vbucket
 *               or for a background fetch
 * @param force  allow writes to replica and pending vbuckets (skips both the
 *               NOT_MY_VBUCKET rejection and the pending-op queue)
 * @param nru    passed through to HashTable::unlocked_set()
 * @return ENGINE_SUCCESS on success (itm's CAS and by-seqno are updated in
 *         place); ENGINE_ENOMEM; ENGINE_KEY_EEXISTS on CAS mismatch or a
 *         locked item; ENGINE_KEY_ENOENT for a CAS set of a missing key;
 *         ENGINE_EWOULDBLOCK when the op was parked on a pending vbucket or
 *         a bg fetch was scheduled; ENGINE_NOT_MY_VBUCKET otherwise.
 */
ENGINE_ERROR_CODE EventuallyPersistentStore::set(const Item &itm,
                                                 const void *cookie,
                                                 bool force,
                                                 uint8_t nru) {

    RCPtr<VBucket> vb = getVBucket(itm.getVBucketId());
    if (!vb) {
        ++stats.numNotMyVBuckets;
        return ENGINE_NOT_MY_VBUCKET;
    }

    // Obtain read-lock on VB state to ensure VB state changes are interlocked
    // with this set
    ReaderLockHolder rlh(vb->getStateLock());
    if (vb->getState() == vbucket_state_dead) {
        ++stats.numNotMyVBuckets;
        return ENGINE_NOT_MY_VBUCKET;
    } else if (vb->getState() == vbucket_state_replica && !force) {
        ++stats.numNotMyVBuckets;
        return ENGINE_NOT_MY_VBUCKET;
    } else if (vb->getState() == vbucket_state_pending && !force) {
        // If the op cannot be parked (addPendingOp() returns false) the set
        // proceeds immediately.
        if (vb->addPendingOp(cookie)) {
            return ENGINE_EWOULDBLOCK;
        }
    }

    bool cas_op = (itm.getCas() != 0);
    int bucket_num(0);
    LockHolder lh = vb->ht.getLockedBucket(itm.getKey(), &bucket_num);
    StoredValue *v = vb->ht.unlocked_find(itm.getKey(), bucket_num, true,
                                          false);
    // On replica/pending vbuckets a locked item is unlocked so the
    // set is not rejected as IS_LOCKED.
    if (v && v->isLocked(ep_current_time()) &&
        (vb->getState() == vbucket_state_replica ||
         vb->getState() == vbucket_state_pending)) {
        v->unlock();
    }

    bool maybeKeyExists = true;
    // Check Bloomfilter's prediction if in full eviction policy
    // and for a CAS operation only.
    if (eviction_policy == FULL_EVICTION && itm.getCas() != 0) {
        // Check Bloomfilter's prediction
        if (!vb->maybeKeyExistsInFilter(itm.getKey())) {
            maybeKeyExists = false;
        }
    }

    mutation_type_t mtype = vb->ht.unlocked_set(v, itm, itm.getCas(), true, false,
                                                eviction_policy, nru,
                                                maybeKeyExists);

    // The caller's Item is updated in place with the new CAS and seqno.
    Item& it = const_cast<Item&>(itm);
    uint64_t seqno = 0;
    ENGINE_ERROR_CODE ret = ENGINE_SUCCESS;
    switch (mtype) {
    case NOMEM:
        ret = ENGINE_ENOMEM;
        break;
    case INVALID_CAS:
    case IS_LOCKED:
        ret = ENGINE_KEY_EEXISTS;
        break;
    case NOT_FOUND:
        if (cas_op) {
            ret = ENGINE_KEY_ENOENT;
            break;
        }
        // FALLTHROUGH
    case WAS_DIRTY:
        // Even if the item was dirty, push it into the vbucket's open
        // checkpoint.
    case WAS_CLEAN:
        // NOTE(review): assumes unlocked_set() left v pointing at the stored
        // value (including the NOT_FOUND fall-through) - confirm.
        it.setCas(vb->nextHLCCas());
        v->setCas(it.getCas());
        queueDirty(vb, v, &lh, &seqno);
        it.setBySeqno(seqno);
        break;
    case NEED_BG_FETCH:
    {   // CAS operation with non-resident item + full eviction.
        if (v) {
            // temp item is already created. Simply schedule a bg fetch job
            lh.unlock();
            bgFetch(itm.getKey(), vb->getId(), cookie, true);
            return ENGINE_EWOULDBLOCK;
        }
        ret = addTempItemForBgFetch(lh, bucket_num, itm.getKey(), vb,
                                    cookie, true);
        break;
    }
    case INVALID_VBUCKET:
        ret = ENGINE_NOT_MY_VBUCKET;
        break;
    }

    return ret;
}
777
778 ENGINE_ERROR_CODE EventuallyPersistentStore::add(const Item &itm,
779                                                  const void *cookie)
780 {
781     RCPtr<VBucket> vb = getVBucket(itm.getVBucketId());
782     if (!vb) {
783         ++stats.numNotMyVBuckets;
784         return ENGINE_NOT_MY_VBUCKET;
785     }
786
787     // Obtain read-lock on VB state to ensure VB state changes are interlocked
788     // with this add
789     ReaderLockHolder rlh(vb->getStateLock());
790     if (vb->getState() == vbucket_state_dead ||
791         vb->getState() == vbucket_state_replica) {
792         ++stats.numNotMyVBuckets;
793         return ENGINE_NOT_MY_VBUCKET;
794     } else if(vb->getState() == vbucket_state_pending) {
795         if (vb->addPendingOp(cookie)) {
796             return ENGINE_EWOULDBLOCK;
797         }
798     }
799
800     if (itm.getCas() != 0) {
801         // Adding with a cas value doesn't make sense..
802         return ENGINE_NOT_STORED;
803     }
804
805     int bucket_num(0);
806     LockHolder lh = vb->ht.getLockedBucket(itm.getKey(), &bucket_num);
807     StoredValue *v = vb->ht.unlocked_find(itm.getKey(), bucket_num, true,
808                                           false);
809
810     bool maybeKeyExists = true;
811     if (eviction_policy == FULL_EVICTION) {
812         // Check bloomfilter's prediction
813         if (!vb->maybeKeyExistsInFilter(itm.getKey())) {
814             maybeKeyExists = false;
815         }
816     }
817
818     add_type_t atype = vb->ht.unlocked_add(bucket_num, v, itm,
819                                            eviction_policy,
820                                            true, true,
821                                            maybeKeyExists);
822
823     Item& it = const_cast<Item&>(itm);
824     uint64_t seqno = 0;
825     switch (atype) {
826     case ADD_NOMEM:
827         return ENGINE_ENOMEM;
828     case ADD_EXISTS:
829         return ENGINE_NOT_STORED;
830     case ADD_TMP_AND_BG_FETCH:
831         return addTempItemForBgFetch(lh, bucket_num, it.getKey(), vb,
832                                      cookie, true);
833     case ADD_BG_FETCH:
834         lh.unlock();
835         bgFetch(it.getKey(), vb->getId(), cookie, true);
836         return ENGINE_EWOULDBLOCK;
837     case ADD_SUCCESS:
838     case ADD_UNDEL:
839         it.setCas(vb->nextHLCCas());
840         v->setCas(it.getCas());
841         queueDirty(vb, v, &lh, &seqno);
842         it.setBySeqno(seqno);
843         break;
844     }
845
846     return ENGINE_SUCCESS;
847 }
848
849 ENGINE_ERROR_CODE EventuallyPersistentStore::replace(const Item &itm,
850                                                      const void *cookie) {
851     RCPtr<VBucket> vb = getVBucket(itm.getVBucketId());
852     if (!vb) {
853         ++stats.numNotMyVBuckets;
854         return ENGINE_NOT_MY_VBUCKET;
855     }
856
857     // Obtain read-lock on VB state to ensure VB state changes are interlocked
858     // with this replace
859     ReaderLockHolder rlh(vb->getStateLock());
860     if (vb->getState() == vbucket_state_dead ||
861         vb->getState() == vbucket_state_replica) {
862         ++stats.numNotMyVBuckets;
863         return ENGINE_NOT_MY_VBUCKET;
864     } else if (vb->getState() == vbucket_state_pending) {
865         if (vb->addPendingOp(cookie)) {
866             return ENGINE_EWOULDBLOCK;
867         }
868     }
869
870     int bucket_num(0);
871     LockHolder lh = vb->ht.getLockedBucket(itm.getKey(), &bucket_num);
872     StoredValue *v = vb->ht.unlocked_find(itm.getKey(), bucket_num, true,
873                                           false);
874     if (v) {
875         if (v->isDeleted() || v->isTempDeletedItem() ||
876             v->isTempNonExistentItem()) {
877             return ENGINE_KEY_ENOENT;
878         }
879
880         mutation_type_t mtype;
881         if (eviction_policy == FULL_EVICTION && v->isTempInitialItem()) {
882             mtype = NEED_BG_FETCH;
883         } else {
884             mtype = vb->ht.unlocked_set(v, itm, 0, true, false, eviction_policy,
885                                         0xff);
886         }
887
888         Item& it = const_cast<Item&>(itm);
889         uint64_t seqno = 0;
890         ENGINE_ERROR_CODE ret = ENGINE_SUCCESS;
891         switch (mtype) {
892             case NOMEM:
893                 ret = ENGINE_ENOMEM;
894                 break;
895             case IS_LOCKED:
896                 ret = ENGINE_KEY_EEXISTS;
897                 break;
898             case INVALID_CAS:
899             case NOT_FOUND:
900                 ret = ENGINE_NOT_STORED;
901                 break;
902                 // FALLTHROUGH
903             case WAS_DIRTY:
904                 // Even if the item was dirty, push it into the vbucket's open
905                 // checkpoint.
906             case WAS_CLEAN:
907                 it.setCas(vb->nextHLCCas());
908                 v->setCas(it.getCas());
909                 queueDirty(vb, v, &lh, &seqno);
910                 it.setBySeqno(seqno);
911                 break;
912             case NEED_BG_FETCH:
913             {
914                 // temp item is already created. Simply schedule a bg fetch job
915                 lh.unlock();
916                 bgFetch(it.getKey(), vb->getId(), cookie, true);
917                 ret = ENGINE_EWOULDBLOCK;
918                 break;
919             }
920             case INVALID_VBUCKET:
921                 ret = ENGINE_NOT_MY_VBUCKET;
922                 break;
923         }
924
925         return ret;
926     } else {
927         if (eviction_policy == VALUE_ONLY) {
928             return ENGINE_KEY_ENOENT;
929         }
930
931         if (vb->maybeKeyExistsInFilter(itm.getKey())) {
932             return addTempItemForBgFetch(lh, bucket_num, itm.getKey(), vb,
933                                          cookie, false);
934         } else {
935             // As bloomfilter predicted that item surely doesn't exist
936             // on disk, return ENOENT for replace().
937             return ENGINE_KEY_ENOENT;
938         }
939     }
940 }
941
942 ENGINE_ERROR_CODE EventuallyPersistentStore::addTAPBackfillItem(
943                                                         const Item &itm,
944                                                         uint8_t nru,
945                                                         bool genBySeqno,
946                                                         ExtendedMetaData *emd) {
947
948     RCPtr<VBucket> vb = getVBucket(itm.getVBucketId());
949     if (!vb) {
950         ++stats.numNotMyVBuckets;
951         return ENGINE_NOT_MY_VBUCKET;
952     }
953
954     // Obtain read-lock on VB state to ensure VB state changes are interlocked
955     // with this add-tapbackfill
956     ReaderLockHolder rlh(vb->getStateLock());
957     if (vb->getState() == vbucket_state_dead ||
958         vb->getState() == vbucket_state_active) {
959         ++stats.numNotMyVBuckets;
960         return ENGINE_NOT_MY_VBUCKET;
961     }
962
963     //check for the incoming item's CAS validity
964     if (!Item::isValidCas(itm.getCas())) {
965         return ENGINE_KEY_EEXISTS;
966     }
967
968     int bucket_num(0);
969     LockHolder lh = vb->ht.getLockedBucket(itm.getKey(), &bucket_num);
970     StoredValue *v = vb->ht.unlocked_find(itm.getKey(), bucket_num, true,
971                                           false);
972
973     // Note that this function is only called on replica or pending vbuckets.
974     if (v && v->isLocked(ep_current_time())) {
975         v->unlock();
976     }
977     mutation_type_t mtype = vb->ht.unlocked_set(v, itm, 0, true, true,
978                                                 eviction_policy, nru);
979
980     ENGINE_ERROR_CODE ret = ENGINE_SUCCESS;
981     switch (mtype) {
982     case NOMEM:
983         ret = ENGINE_ENOMEM;
984         break;
985     case INVALID_CAS:
986     case IS_LOCKED:
987         ret = ENGINE_KEY_EEXISTS;
988         break;
989     case WAS_DIRTY:
990         // FALLTHROUGH, to ensure the bySeqno for the hashTable item is
991         // set correctly, and also the sequence numbers are ordered correctly.
992         // (MB-14003)
993     case NOT_FOUND:
994         // FALLTHROUGH
995     case WAS_CLEAN:
996         /* set the conflict resolution mode from the extended meta data *
997          * Given that the mode is already set, we don't need to set the *
998          * conflict resolution mode in queueDirty */
999         if (emd) {
1000             v->setConflictResMode(
1001                  static_cast<enum conflict_resolution_mode>(
1002                                       emd->getConflictResMode()));
1003         }
1004         vb->setMaxCas(v->getCas());
1005         queueDirty(vb, v, &lh, NULL,true, true, genBySeqno, false);
1006         break;
1007     case INVALID_VBUCKET:
1008         ret = ENGINE_NOT_MY_VBUCKET;
1009         break;
1010     case NEED_BG_FETCH:
1011         // SET on a non-active vbucket should not require a bg_metadata_fetch.
1012         abort();
1013     }
1014
1015     // Update drift counter for vbucket upon a success only
1016     if (ret == ENGINE_SUCCESS && emd) {
1017         vb->setDriftCounter(emd->getAdjustedTime());
1018     }
1019
1020     return ret;
1021 }
1022
1023 class KVStatsCallback : public Callback<kvstats_ctx> {
1024     public:
1025         KVStatsCallback(EventuallyPersistentStore *store)
1026             : epstore(store) { }
1027
1028         void callback(kvstats_ctx &ctx) {
1029             RCPtr<VBucket> vb = epstore->getVBucket(ctx.vbucket);
1030             if (vb) {
1031                 vb->fileSpaceUsed = ctx.fileSpaceUsed;
1032                 vb->fileSize = ctx.fileSize;
1033             }
1034         }
1035
1036     private:
1037         EventuallyPersistentStore *epstore;
1038 };
1039
1040 void EventuallyPersistentStore::snapshotVBuckets(VBSnapshotTask::Priority prio,
1041                                                  uint16_t shardId) {
1042
1043     class VBucketStateVisitor : public VBucketVisitor {
1044     public:
1045         VBucketStateVisitor(VBucketMap &vb_map, uint16_t sid)
1046             : vbuckets(vb_map), shardId(sid) { }
1047         bool visitBucket(RCPtr<VBucket> &vb) {
1048             if (vbuckets.getShard(vb->getId())->getId() == shardId) {
1049                 snapshot_range_t range;
1050                 vb->getPersistedSnapshot(range);
1051                 std::string failovers = vb->failovers->toJSON();
1052                 uint64_t chkId = vbuckets.getPersistenceCheckpointId(vb->getId());
1053
1054                 vbucket_state vb_state(vb->getState(), chkId, 0,
1055                                        vb->getHighSeqno(), vb->getPurgeSeqno(),
1056                                        range.start, range.end, vb->getMaxCas(),
1057                                        vb->getDriftCounter() ,failovers);
1058                 states.insert(std::pair<uint16_t, vbucket_state>(vb->getId(), vb_state));
1059             }
1060             return false;
1061         }
1062
1063         void visit(StoredValue*) {
1064             cb_assert(false); // this does not happen
1065         }
1066
1067         std::map<uint16_t, vbucket_state> states;
1068
1069     private:
1070         VBucketMap &vbuckets;
1071         uint16_t shardId;
1072     };
1073
1074     KVShard *shard = vbMap.shards[shardId];
1075     if (prio == VBSnapshotTask::Priority::LOW) {
1076         shard->setLowPriorityVbSnapshotFlag(false);
1077     } else {
1078         shard->setHighPriorityVbSnapshotFlag(false);
1079     }
1080
1081     KVStatsCallback kvcb(this);
1082     VBucketStateVisitor v(vbMap, shard->getId());
1083     visit(v);
1084     hrtime_t start = gethrtime();
1085
1086     bool success = true;
1087     vbucket_map_t::reverse_iterator iter = v.states.rbegin();
1088     for (; iter != v.states.rend(); ++iter) {
1089         LockHolder lh(vb_mutexes[iter->first], true /*tryLock*/);
1090         if (!lh.islocked()) {
1091             continue;
1092         }
1093         KVStore *rwUnderlying = getRWUnderlying(iter->first);
1094         if (!rwUnderlying->snapshotVBucket(iter->first, iter->second,
1095                     &kvcb)) {
1096             LOG(EXTENSION_LOG_WARNING,
1097                     "VBucket snapshot task failed!!! Rescheduling");
1098             success = false;
1099             break;
1100         }
1101
1102         if (prio == VBSnapshotTask::Priority::HIGH) {
1103             if (vbMap.setBucketCreation(iter->first, false)) {
1104                 LOG(EXTENSION_LOG_INFO, "VBucket %d created", iter->first);
1105             }
1106         }
1107     }
1108
1109     if (!success) {
1110         scheduleVBSnapshot(prio, shard->getId());
1111     } else {
1112         stats.snapshotVbucketHisto.add((gethrtime() - start) / 1000);
1113     }
1114 }
1115
1116 bool EventuallyPersistentStore::persistVBState(uint16_t vbid) {
1117     schedule_vbstate_persist[vbid] = false;
1118
1119     RCPtr<VBucket> vb = getVBucket(vbid);
1120     if (!vb) {
1121         LOG(EXTENSION_LOG_WARNING,
1122             "VBucket %d not exist!!! vb_state persistence task failed!!!", vbid);
1123         return false;
1124     }
1125
1126     bool inverse = false;
1127     LockHolder lh(vb_mutexes[vbid], true /*tryLock*/);
1128     if (!lh.islocked()) {
1129         if (schedule_vbstate_persist[vbid].compare_exchange_strong(inverse,
1130                                                                    true)) {
1131             return true;
1132         } else {
1133             return false;
1134         }
1135     }
1136
1137     const hrtime_t start = gethrtime();
1138
1139     KVStatsCallback kvcb(this);
1140     uint64_t chkId = vbMap.getPersistenceCheckpointId(vbid);
1141     std::string failovers = vb->failovers->toJSON();
1142
1143     snapshot_range_t range;
1144     vb->getPersistedSnapshot(range);
1145     vbucket_state vb_state(vb->getState(), chkId, 0, vb->getHighSeqno(),
1146                            vb->getPurgeSeqno(), range.start, range.end,
1147                            vb->getMaxCas(), vb->getDriftCounter(),
1148                            failovers);
1149
1150     KVStore *rwUnderlying = getRWUnderlying(vbid);
1151     if (rwUnderlying->snapshotVBucket(vbid, vb_state, &kvcb)) {
1152         stats.persistVBStateHisto.add((gethrtime() - start) / 1000);
1153         if (vbMap.setBucketCreation(vbid, false)) {
1154             LOG(EXTENSION_LOG_INFO, "VBucket %d created", vbid);
1155         }
1156     } else {
1157         LOG(EXTENSION_LOG_WARNING,
1158             "VBucket %d: vb_state persistence task failed!!! Rescheduling", vbid);
1159
1160         if (schedule_vbstate_persist[vbid].compare_exchange_strong(inverse,
1161                                                                    true)) {
1162             return true;
1163         } else {
1164             return false;
1165         }
1166     }
1167     return false;
1168 }
1169
1170 ENGINE_ERROR_CODE EventuallyPersistentStore::setVBucketState(uint16_t vbid,
1171                                                            vbucket_state_t to,
1172                                                            bool transfer,
1173                                                            bool notify_dcp) {
1174     // Lock to prevent a race condition between a failed update and add.
1175     LockHolder lh(vbsetMutex);
1176     RCPtr<VBucket> vb = vbMap.getBucket(vbid);
1177     if (vb && to == vb->getState()) {
1178         return ENGINE_SUCCESS;
1179     }
1180
1181     if (vb) {
1182         vbucket_state_t oldstate = vb->getState();
1183         if (oldstate != to && notify_dcp) {
1184             engine.getDcpConnMap().vbucketStateChanged(vbid, to);
1185         }
1186
1187         vb->setState(to, engine.getServerApi());
1188
1189         if (to == vbucket_state_active && oldstate == vbucket_state_replica) {
1190             /**
1191              * Update snapshot range when vbucket goes from being a replica
1192              * to active, to maintain the correct snapshot sequence numbers
1193              * even in a failover scenario.
1194              */
1195             vb->checkpointManager.resetSnapshotRange();
1196         }
1197
1198         if (to == vbucket_state_active && !transfer) {
1199             snapshot_range_t range;
1200             vb->getPersistedSnapshot(range);
1201             if (range.end == vbMap.getPersistenceSeqno(vbid)) {
1202                 vb->failovers->createEntry(range.end);
1203             } else {
1204                 vb->failovers->createEntry(range.start);
1205             }
1206         }
1207
1208         lh.unlock();
1209         if (oldstate == vbucket_state_pending &&
1210             to == vbucket_state_active) {
1211             ExTask notifyTask = new PendingOpsNotification(engine, vb);
1212             ExecutorPool::get()->schedule(notifyTask, NONIO_TASK_IDX);
1213         }
1214         scheduleVBStatePersist(VBStatePersistTask::Priority::LOW, vbid);
1215     } else {
1216         FailoverTable* ft = new FailoverTable(engine.getMaxFailoverEntries());
1217         KVShard* shard = vbMap.getShard(vbid);
1218         shared_ptr<Callback<uint16_t> > cb(new NotifyFlusherCB(shard));
1219         RCPtr<VBucket> newvb(new VBucket(vbid, to, stats,
1220                                          engine.getCheckpointConfig(),
1221                                          shard, 0, 0, 0, ft, cb));
1222         // The first checkpoint for active vbucket should start with id 2.
1223         uint64_t start_chk_id = (to == vbucket_state_active) ? 2 : 0;
1224         newvb->checkpointManager.setOpenCheckpointId(start_chk_id);
1225         if (vbMap.addBucket(newvb) == ENGINE_ERANGE) {
1226             lh.unlock();
1227             return ENGINE_ERANGE;
1228         }
1229         vbMap.setPersistenceCheckpointId(vbid, 0);
1230         vbMap.setPersistenceSeqno(vbid, 0);
1231         vbMap.setBucketCreation(vbid, true);
1232         lh.unlock();
1233         scheduleVBStatePersist(VBStatePersistTask::Priority::HIGH, vbid);
1234     }
1235     return ENGINE_SUCCESS;
1236 }
1237
1238 bool EventuallyPersistentStore::scheduleVBSnapshot(VBSnapshotTask::Priority prio) {
1239     KVShard *shard = NULL;
1240     if (prio == VBSnapshotTask::Priority::HIGH) {
1241         for (size_t i = 0; i < vbMap.numShards; ++i) {
1242             shard = vbMap.shards[i];
1243             if (shard->setHighPriorityVbSnapshotFlag(true)) {
1244                 ExTask task = new VBSnapshotTaskHigh(&engine, i, true);
1245                 ExecutorPool::get()->schedule(task, WRITER_TASK_IDX);
1246             }
1247         }
1248     } else {
1249         for (size_t i = 0; i < vbMap.numShards; ++i) {
1250             shard = vbMap.shards[i];
1251             if (shard->setLowPriorityVbSnapshotFlag(true)) {
1252                 ExTask task = new VBSnapshotTaskLow(&engine, i, true);
1253                 ExecutorPool::get()->schedule(task, WRITER_TASK_IDX);
1254             }
1255         }
1256     }
1257     if (stats.isShutdown) {
1258         return false;
1259     }
1260     return true;
1261 }
1262
1263 void EventuallyPersistentStore::scheduleVBSnapshot(VBSnapshotTask::Priority prio,
1264                                                    uint16_t shardId,
1265                                                    bool force) {
1266     KVShard *shard = vbMap.shards[shardId];
1267     if (prio == VBSnapshotTask::Priority::HIGH) {
1268         if (force || shard->setHighPriorityVbSnapshotFlag(true)) {
1269             ExTask task = new VBSnapshotTaskHigh(&engine, shardId, true);
1270             ExecutorPool::get()->schedule(task, WRITER_TASK_IDX);
1271         }
1272     } else {
1273         if (force || shard->setLowPriorityVbSnapshotFlag(true)) {
1274             ExTask task = new VBSnapshotTaskLow(&engine, shardId, true);
1275             ExecutorPool::get()->schedule(task, WRITER_TASK_IDX);
1276         }
1277     }
1278 }
1279
1280 void EventuallyPersistentStore::scheduleVBStatePersist(VBStatePersistTask::Priority priority,
1281                                                        uint16_t vbid,
1282                                                        bool force) {
1283     bool inverse = false;
1284     if (force ||
1285         schedule_vbstate_persist[vbid].compare_exchange_strong(inverse, true)) {
1286         if (priority == VBStatePersistTask::Priority::HIGH) {
1287             ExecutorPool::get()->schedule(new VBStatePersistTaskHigh(&engine, vbid, true), WRITER_TASK_IDX);
1288         } else {
1289             ExecutorPool::get()->schedule(new VBStatePersistTaskLow(&engine, vbid, true), WRITER_TASK_IDX);
1290         }
1291     }
1292 }
1293
1294 bool EventuallyPersistentStore::completeVBucketDeletion(uint16_t vbid,
1295                                                         const void* cookie) {
1296     LockHolder lh(vbsetMutex);
1297
1298     hrtime_t start_time(gethrtime());
1299     RCPtr<VBucket> vb = vbMap.getBucket(vbid);
1300     if (!vb || vb->getState() == vbucket_state_dead ||
1301          vbMap.isBucketDeletion(vbid)) {
1302         lh.unlock();
1303         LockHolder vlh(vb_mutexes[vbid]);
1304         getRWUnderlying(vbid)->delVBucket(vbid);
1305         vbMap.setBucketDeletion(vbid, false);
1306         vbMap.setPersistenceSeqno(vbid, 0);
1307         ++stats.vbucketDeletions;
1308     }
1309
1310     hrtime_t spent(gethrtime() - start_time);
1311     hrtime_t wall_time = spent / 1000;
1312     BlockTimer::log(spent, "disk_vb_del", stats.timingLog);
1313     stats.diskVBDelHisto.add(wall_time);
1314     atomic_setIfBigger(stats.vbucketDelMaxWalltime, wall_time);
1315     stats.vbucketDelTotWalltime.fetch_add(wall_time);
1316     if (cookie) {
1317         engine.notifyIOComplete(cookie, ENGINE_SUCCESS);
1318     }
1319
1320     return true;
1321 }
1322
1323 void EventuallyPersistentStore::scheduleVBDeletion(RCPtr<VBucket> &vb,
1324                                                    const void* cookie,
1325                                                    double delay) {
1326     ExTask delTask = new VBucketMemoryDeletionTask(engine, vb, delay);
1327     ExecutorPool::get()->schedule(delTask, NONIO_TASK_IDX);
1328
1329     if (vbMap.setBucketDeletion(vb->getId(), true)) {
1330         ExTask task = new VBDeleteTask(&engine, vb->getId(), cookie);
1331         ExecutorPool::get()->schedule(task, WRITER_TASK_IDX);
1332     }
1333 }
1334
1335 ENGINE_ERROR_CODE EventuallyPersistentStore::deleteVBucket(uint16_t vbid,
1336                                                            const void* c) {
1337     // Lock to prevent a race condition between a failed update and add
1338     // (and delete).
1339     LockHolder lh(vbsetMutex);
1340
1341     RCPtr<VBucket> vb = vbMap.getBucket(vbid);
1342     if (!vb) {
1343         return ENGINE_NOT_MY_VBUCKET;
1344     }
1345
1346     engine.getDcpConnMap().vbucketStateChanged(vbid, vbucket_state_dead);
1347     vbMap.removeBucket(vbid);
1348     lh.unlock();
1349     scheduleVBDeletion(vb, c);
1350     if (c) {
1351         return ENGINE_EWOULDBLOCK;
1352     }
1353     return ENGINE_SUCCESS;
1354 }
1355
1356 ENGINE_ERROR_CODE EventuallyPersistentStore::compactDB(uint16_t vbid,
1357                                                        compaction_ctx c,
1358                                                        const void *cookie) {
1359     RCPtr<VBucket> vb = vbMap.getBucket(vbid);
1360     if (!vb) {
1361         return ENGINE_NOT_MY_VBUCKET;
1362     }
1363
1364     LockHolder lh(compactionLock);
1365     ExTask task = new CompactVBucketTask(&engine, vbid, c, cookie);
1366     compactionTasks.push_back(std::make_pair(vbid, task));
1367     if (compactionTasks.size() > 1) {
1368         if ((stats.diskQueueSize > compactionWriteQueueCap &&
1369             compactionTasks.size() > (vbMap.getNumShards() / 2)) ||
1370             engine.getWorkLoadPolicy().getWorkLoadPattern() == READ_HEAVY) {
1371             // Snooze a new compaction task.
1372             // We will wake it up when one of the existing compaction tasks is done.
1373             task->snooze(60);
1374         }
1375     }
1376
1377     ExecutorPool::get()->schedule(task, WRITER_TASK_IDX);
1378
1379     LOG(EXTENSION_LOG_DEBUG, "Scheduled compaction task %d on vbucket %d,"
1380         "purge_before_ts = %lld, purge_before_seq = %lld, dropdeletes = %d",
1381         task->getId(), vbid, c.purge_before_ts,
1382         c.purge_before_seq, c.drop_deletes);
1383
1384    return ENGINE_EWOULDBLOCK;
1385 }
1386
1387 class ExpiredItemsCallback : public Callback<compaction_ctx> {
1388     public:
1389         ExpiredItemsCallback(EventuallyPersistentStore *store, uint16_t vbid)
1390             : epstore(store), vbucket(vbid) { }
1391
1392         void callback(compaction_ctx &ctx) {
1393             std::list<expiredItemCtx>::iterator it;
1394             for (it  = ctx.expiredItems.begin();
1395                  it != ctx.expiredItems.end(); it++) {
1396                 if (epstore->compactionCanExpireItems()) {
1397                     epstore->deleteExpiredItem(vbucket, it->keyStr,
1398                                                ctx.curr_time,
1399                                                it->revSeqno);
1400                 }
1401             }
1402         }
1403
1404     private:
1405         EventuallyPersistentStore *epstore;
1406         uint16_t vbucket;
1407 };
1408
/**
 * Compact vbucket `vbid` (presumably invoked from the CompactVBucketTask
 * queued by compactDB).
 *
 * If bloom filters are enabled, a temporary filter is sized from the
 * vbucket's counts and a BfilterCB is attached to ctx->bfcb. The underlying
 * RW store then compacts the vbucket. On success the temporary filter
 * replaces the live one (or the filter is cleared if filters are disabled);
 * on failure the filter is cleared. The vbucket's purge seqno is updated
 * from ctx->max_purged_seq.
 *
 * Afterwards, whether or not the vbucket existed, compactionTasks entries
 * for `vbid` are erased and snoozed compaction tasks are woken. The scan
 * stops once at least one entry has been erased and one task woken.
 * `cookie` (if any) is then notified and stats.pendingCompactions is
 * decremented.
 *
 * @param vbid   vbucket to compact
 * @param ctx    compaction context; curr_time and bfcb are filled in here
 * @param cookie connection to notify on completion (may be NULL)
 * @return true if the vbucket mutex was busy and compaction should be
 *         retried later; false otherwise
 */
bool EventuallyPersistentStore::compactVBucket(const uint16_t vbid,
                                               compaction_ctx *ctx,
                                               const void *cookie) {
    ENGINE_ERROR_CODE err = ENGINE_SUCCESS;
    RCPtr<VBucket> vb = vbMap.getBucket(vbid);
    if (vb) {
        // vb_mutexes[vbid] is also taken by the snapshot, state-persist and
        // disk-deletion paths; don't block on it here.
        LockHolder lh(vb_mutexes[vbid], true /*tryLock*/);
        if (!lh.islocked()) {
            return true; // Schedule a compaction task again.
        }

        Configuration &config = getEPEngine().getConfiguration();
        if (config.isBfilterEnabled()) {
            // Size a fresh (temporary) bloom filter for this compaction; it
            // never drops below the configured bfilter key count.
            size_t initial_estimation = config.getBfilterKeyCount();
            size_t estimated_count;
            size_t num_deletes =
                    getROUnderlying(vbid)->getNumPersistedDeletes(vbid);
            if (eviction_policy == VALUE_ONLY) {
                /**
                 * VALUE-ONLY EVICTION POLICY
                 * Obtain number of persisted deletes from underlying kvstore.
                 * Bloomfilter's estimated_key_count = 1.25 * deletes
                 */

                estimated_count = round(1.25 * num_deletes);
                ctx->bfcb = new BfilterCB(this, vbid, false);
            } else {
                /**
                 * FULL EVICTION POLICY
                 * First determine if the resident ratio of vbucket is less than
                 * the threshold from configuration.
                 */

                bool residentRatioAlert = vb->isResidentRatioUnderThreshold(
                                                getBfiltersResidencyThreshold(),
                                                eviction_policy);
                ctx->bfcb = new BfilterCB(this, vbid, residentRatioAlert);

                /**
                 * Based on resident ratio against threshold, estimate count.
                 *
                 * 1. If resident ratio is greater than the threshold:
                 * Obtain number of persisted deletes from underlying kvstore.
                 * Obtain number of non-resident-items for vbucket.
                 * Bloomfilter's estimated_key_count =
                 *                              1.25 * (deletes + non-resident)
                 *
                 * 2. Otherwise:
                 * Obtain number of items for vbucket.
                 * Bloomfilter's estimated_key_count =
                 *                              1.25 * (num_items)
                 */

                if (residentRatioAlert) {
                    estimated_count = round(1.25 *
                                            vb->getNumItems(eviction_policy));
                } else {
                    estimated_count = round(1.25 * (num_deletes +
                                vb->getNumNonResidentItems(eviction_policy)));
                }
            }
            if (estimated_count < initial_estimation) {
                estimated_count = initial_estimation;
            }
            vb->initTempFilter(estimated_count, config.getBfilterFpProb());
        }

        // NOTE(review): curr_time == 0 presumably tells the store not to
        // expire items on non-active vbuckets -- confirm in KVStore.
        if (vb->getState() == vbucket_state_active) {
            // Set the current time ONLY for active vbuckets.
            ctx->curr_time = ep_real_time();
        } else {
            ctx->curr_time = 0;
        }
        ExpiredItemsCallback cb(this, vbid);
        KVStatsCallback kvcb(this);
        if (getRWUnderlying(vbid)->compactVBucket(vbid, ctx, cb, kvcb)) {
            // Compaction succeeded: promote the temporary filter built
            // above, or drop any filter if filters are disabled.
            if (config.isBfilterEnabled()) {
                vb->swapFilter();
            } else {
                vb->clearFilter();
            }
        } else {
            LOG(EXTENSION_LOG_WARNING, "Compaction: Not successful for vb %u, "
                    "clearing bloom filter, if any.", vb->getId());
            vb->clearFilter();
        }
        vb->setPurgeSeqno(ctx->max_purged_seq);
    } else {
        err = ENGINE_NOT_MY_VBUCKET;
        engine.storeEngineSpecific(cookie, NULL);
        //Decrement session counter here, as memcached thread wouldn't
        //visit the engine interface in case of a NOT_MY_VB notification
        engine.decrementSessionCtr();

    }

    // Remove this vbucket's entry from the queued-compaction list and wake
    // tasks that compactDB snoozed; stop once both have happened.
    LockHolder lh(compactionLock);
    bool erased = false, woke = false;
    std::list<CompTaskEntry>::iterator it = compactionTasks.begin();
    while (it != compactionTasks.end()) {
        if ((*it).first == vbid) {
            it = compactionTasks.erase(it);
            erased = true;
        } else {
            ExTask &task = (*it).second;
            if (task->getState() == TASK_SNOOZED) {
                ExecutorPool::get()->wake(task->getId());
                woke = true;
            }
            ++it;
        }
        if (erased && woke) {
            break;
        }
    }
    lh.unlock();

    if (cookie) {
        engine.notifyIOComplete(cookie, err);
    }
    --stats.pendingCompactions;
    return false;
}
1532
1533 bool EventuallyPersistentStore::resetVBucket(uint16_t vbid) {
1534     LockHolder lh(vbsetMutex);
1535     bool rv(false);
1536
1537     RCPtr<VBucket> vb = vbMap.getBucket(vbid);
1538     if (vb) {
1539         vbucket_state_t vbstate = vb->getState();
1540
1541         vbMap.removeBucket(vbid);
1542         lh.unlock();
1543
1544         std::list<std::string> tap_cursors = vb->checkpointManager.
1545                                              getCursorNames();
1546         // Delete and recreate the vbucket database file
1547         scheduleVBDeletion(vb, NULL, 0);
1548         setVBucketState(vbid, vbstate, false);
1549
1550         // Copy the all cursors from the old vbucket into the new vbucket
1551         RCPtr<VBucket> newvb = vbMap.getBucket(vbid);
1552         newvb->checkpointManager.resetCursors(tap_cursors);
1553
1554         rv = true;
1555     }
1556     return rv;
1557 }
1558
1559 extern "C" {
1560
1561     typedef struct {
1562         EventuallyPersistentEngine* engine;
1563         std::map<std::string, std::string> smap;
1564     } snapshot_stats_t;
1565
1566     static void add_stat(const char *key, const uint16_t klen,
1567                          const char *val, const uint32_t vlen,
1568                          const void *cookie) {
1569         cb_assert(cookie);
1570         void *ptr = const_cast<void *>(cookie);
1571         snapshot_stats_t* snap = static_cast<snapshot_stats_t*>(ptr);
1572         ObjectRegistry::onSwitchThread(snap->engine);
1573
1574         std::string k(key, klen);
1575         std::string v(val, vlen);
1576         snap->smap.insert(std::pair<std::string, std::string>(k, v));
1577     }
1578 }
1579
1580 void EventuallyPersistentStore::snapshotStats() {
1581     snapshot_stats_t snap;
1582     snap.engine = &engine;
1583     std::map<std::string, std::string>  smap;
1584     bool rv = engine.getStats(&snap, NULL, 0, add_stat) == ENGINE_SUCCESS &&
1585               engine.getStats(&snap, "tap", 3, add_stat) == ENGINE_SUCCESS &&
1586               engine.getStats(&snap, "dcp", 3, add_stat) == ENGINE_SUCCESS;
1587
1588     if (rv && stats.isShutdown) {
1589         snap.smap["ep_force_shutdown"] = stats.forceShutdown ?
1590                                                               "true" : "false";
1591         std::stringstream ss;
1592         ss << ep_real_time();
1593         snap.smap["ep_shutdown_time"] = ss.str();
1594     }
1595     getOneRWUnderlying()->snapshotStats(snap.smap);
1596 }
1597
1598 void EventuallyPersistentStore::updateBGStats(const hrtime_t init,
1599                                               const hrtime_t start,
1600                                               const hrtime_t stop) {
1601     if (stop >= start && start >= init) {
1602         // skip the measurement if the counter wrapped...
1603         ++stats.bgNumOperations;
1604         hrtime_t w = (start - init) / 1000;
1605         BlockTimer::log(start - init, "bgwait", stats.timingLog);
1606         stats.bgWaitHisto.add(w);
1607         stats.bgWait.fetch_add(w);
1608         atomic_setIfLess(stats.bgMinWait, w);
1609         atomic_setIfBigger(stats.bgMaxWait, w);
1610
1611         hrtime_t l = (stop - start) / 1000;
1612         BlockTimer::log(stop - start, "bgload", stats.timingLog);
1613         stats.bgLoadHisto.add(l);
1614         stats.bgLoad.fetch_add(l);
1615         atomic_setIfLess(stats.bgMinLoad, l);
1616         atomic_setIfBigger(stats.bgMaxLoad, l);
1617     }
1618 }
1619
/**
 * Perform and complete a single (non-batched) background fetch.
 *
 * Reads key from the vbucket's read-only KVStore (metadata only when isMeta
 * is set), applies the result to the in-memory StoredValue while holding
 * vbsetMutex, the vbucket state lock (read) and the hash bucket lock, then
 * updates the bg fetch stats and notifies cookie with the resulting status.
 *
 * @param key     key to fetch
 * @param vbucket vbucket the key belongs to
 * @param cookie  connection waiting for the result
 * @param init    time the fetch was requested (used for the bgwait stats)
 * @param isMeta  fetch and restore metadata only
 */
void EventuallyPersistentStore::completeBGFetch(const std::string &key,
                                                uint16_t vbucket,
                                                const void *cookie,
                                                hrtime_t init,
                                                bool isMeta) {
    hrtime_t start(gethrtime());
    // Go find the data
    RememberingCallback<GetValue> gcb;
    if (isMeta) {
        // Ask the KVStore for a partial (metadata-only) read.
        gcb.val.setPartial();
        ++stats.bg_meta_fetched;
    } else {
        ++stats.bg_fetched;
    }
    getROUnderlying(vbucket)->get(key, vbucket, gcb);
    gcb.waitForValue();
    cb_assert(gcb.fired);
    ENGINE_ERROR_CODE status = gcb.val.getStatus();

    // Lock to prevent a race condition between a fetch for restore and delete
    LockHolder lh(vbsetMutex);

    RCPtr<VBucket> vb = getVBucket(vbucket);
    if (vb) {
        ReaderLockHolder rlh(vb->getStateLock());
        int bucket_num(0);
        LockHolder hlh = vb->ht.getLockedBucket(key, &bucket_num);
        StoredValue *v = fetchValidValue(vb, key, bucket_num, true);
        if (isMeta) {
            if ((v && v->unlocked_restoreMeta(gcb.val.getValue(),
                                              gcb.val.getStatus(), vb->ht))
                || ENGINE_KEY_ENOENT == status) {
                /* If ENGINE_KEY_ENOENT is the status from storage and the temp
                 key is removed from hash table by the time bgfetch returns
                 (in case multiple bgfetch is scheduled for a key), we still
                 need to return ENGINE_SUCCESS to the memcached worker thread,
                 so that the worker thread can visit the ep-engine and figure
                 out the correct flow */
                status = ENGINE_SUCCESS;
            }
        } else {
            // Decide whether the fetched value should be written back into
            // the in-memory StoredValue.
            bool restore = false;
            if (v && v->isResident()) {
                // Already resident (e.g. restored by another fetch or a
                // newer mutation); nothing to restore.
                status = ENGINE_SUCCESS;
            } else {
                switch (eviction_policy) {
                    case VALUE_ONLY:
                        if (v && !v->isResident() && !v->isDeleted()) {
                            restore = true;
                        }
                        break;
                    case FULL_EVICTION:
                        // Under full eviction a temp "initial" placeholder
                        // may stand in for the key.
                        if (v) {
                            if (v->isTempInitialItem() ||
                                (!v->isResident() && !v->isDeleted())) {
                                restore = true;
                            }
                        }
                        break;
                    default:
                        throw std::logic_error("Unknown eviction policy");
                }
            }

            if (restore) {
                if (gcb.val.getStatus() == ENGINE_SUCCESS) {
                    v->unlocked_restoreValue(gcb.val.getValue(), vb->ht);
                    cb_assert(v->isResident());
                    if (vb->getState() == vbucket_state_active &&
                        v->getExptime() != gcb.val.getValue()->getExptime() &&
                        v->getCas() == gcb.val.getValue()->getCas()) {
                        // MB-9306: It is possible that by the time bgfetcher
                        // returns, the item may have been updated and queued
                        // Hence test the CAS value to be the same first.
                        // exptime mutated, schedule it into new checkpoint
                        queueDirty(vb, v, &hlh, NULL);
                    }
                } else if (gcb.val.getStatus() == ENGINE_KEY_ENOENT) {
                    v->setStoredValueState(
                                          StoredValue::state_non_existent_key);
                    if (eviction_policy == FULL_EVICTION) {
                        // For the full eviction, we should notify
                        // ENGINE_SUCCESS to the memcached worker thread, so
                        // that the worker thread can visit the ep-engine and
                        // figure out the correct error code.
                        status = ENGINE_SUCCESS;
                    }
                } else {
                    // underlying kvstore couldn't fetch requested data
                    // log returned error and notify TMPFAIL to client
                    LOG(EXTENSION_LOG_WARNING,
                        "Warning: failed background fetch for vb=%d "
                        "seq=%" PRId64 " key=%s", vbucket, v->getBySeqno(),
                        key.c_str());
                    status = ENGINE_TMPFAIL;
                }
            }
        }
    } else {
        LOG(EXTENSION_LOG_INFO, "VBucket %d's file was deleted in the middle of"
            " a bg fetch for key %s\n", vbucket, key.c_str());
        status = ENGINE_NOT_MY_VBUCKET;
    }

    lh.unlock();

    hrtime_t stop = gethrtime();
    updateBGStats(init, start, stop);
    // Balances the bgFetchQueue++ done when a single bg fetch is queued.
    bgFetchQueue--;

    // Free the Item read from disk; presumably the restore calls above
    // copied whatever they kept (TODO confirm ownership).
    delete gcb.val.getValue();
    engine.notifyIOComplete(cookie, status);
}
1733
1734 void EventuallyPersistentStore::completeBGFetchMulti(uint16_t vbId,
1735                                  std::vector<bgfetched_item_t> &fetchedItems,
1736                                  hrtime_t startTime)
1737 {
1738     RCPtr<VBucket> vb = getVBucket(vbId);
1739     if (!vb) {
1740         std::vector<bgfetched_item_t>::iterator itemItr = fetchedItems.begin();
1741         for (; itemItr != fetchedItems.end(); ++itemItr) {
1742             engine.notifyIOComplete((*itemItr).second->cookie,
1743                                     ENGINE_NOT_MY_VBUCKET);
1744         }
1745         LOG(EXTENSION_LOG_WARNING,
1746             "EP Store completes %d of batched background fetch for "
1747             "for vBucket = %d that is already deleted\n",
1748             (int)fetchedItems.size(), vbId);
1749         return;
1750     }
1751
1752     std::vector<bgfetched_item_t>::iterator itemItr = fetchedItems.begin();
1753     for (; itemItr != fetchedItems.end(); ++itemItr) {
1754         VBucketBGFetchItem *bgitem = (*itemItr).second;
1755         ENGINE_ERROR_CODE status = bgitem->value.getStatus();
1756         Item *fetchedValue = bgitem->value.getValue();
1757         const std::string &key = (*itemItr).first;
1758         {   // locking scope
1759             ReaderLockHolder rlh(vb->getStateLock());
1760
1761             int bucket = 0;
1762             LockHolder blh = vb->ht.getLockedBucket(key, &bucket);
1763             StoredValue *v = fetchValidValue(vb, key, bucket, true);
1764             if (bgitem->metaDataOnly) {
1765                 if ((v && v->unlocked_restoreMeta(fetchedValue, status, vb->ht))
1766                     || ENGINE_KEY_ENOENT == status) {
1767                     /* If ENGINE_KEY_ENOENT is the status from storage and the temp
1768                      key is removed from hash table by the time bgfetch returns
1769                      (in case multiple bgfetch is scheduled for a key), we still
1770                      need to return ENGINE_SUCCESS to the memcached worker thread,
1771                      so that the worker thread can visit the ep-engine and figure
1772                      out the correct flow */
1773                     status = ENGINE_SUCCESS;
1774                 }
1775             } else {
1776                 bool restore = false;
1777                 if (v && v->isResident()) {
1778                     status = ENGINE_SUCCESS;
1779                 } else {
1780                     switch (eviction_policy) {
1781                         case VALUE_ONLY:
1782                             if (v && !v->isResident() && !v->isDeleted()) {
1783                                 restore = true;
1784                             }
1785                             break;
1786                         case FULL_EVICTION:
1787                             if (v) {
1788                                 if (v->isTempInitialItem() ||
1789                                     (!v->isResident() && !v->isDeleted())) {
1790                                     restore = true;
1791                                 }
1792                             }
1793                             break;
1794                         default:
1795                             throw std::logic_error("Unknown eviction policy");
1796                     }
1797                 }
1798
1799                 if (restore) {
1800                     if (status == ENGINE_SUCCESS) {
1801                         v->unlocked_restoreValue(fetchedValue, vb->ht);
1802                         cb_assert(v->isResident());
1803                         ReaderLockHolder(vb->getStateLock());
1804                         if (vb->getState() == vbucket_state_active &&
1805                             v->getExptime() != fetchedValue->getExptime() &&
1806                             v->getCas() == fetchedValue->getCas()) {
1807                             // MB-9306: It is possible that by the time
1808                             // bgfetcher returns, the item may have been
1809                             // updated and queued
1810                             // Hence test the CAS value to be the same first.
1811                             // exptime mutated, schedule it into new checkpoint
1812                             queueDirty(vb, v, &blh, NULL);
1813                         }
1814                     } else if (status == ENGINE_KEY_ENOENT) {
1815                         v->setStoredValueState(StoredValue::state_non_existent_key);
1816                         if (eviction_policy == FULL_EVICTION) {
1817                             // For the full eviction, we should notify
1818                             // ENGINE_SUCCESS to the memcached worker thread,
1819                             // so that the worker thread can visit the
1820                             // ep-engine and figure out the correct error
1821                             // code.
1822                             status = ENGINE_SUCCESS;
1823                         }
1824                     } else {
1825                         // underlying kvstore couldn't fetch requested data
1826                         // log returned error and notify TMPFAIL to client
1827                         LOG(EXTENSION_LOG_WARNING,
1828                             "Warning: failed background fetch for vb=%d "
1829                             "key=%s", vbId, key.c_str());
1830                         status = ENGINE_TMPFAIL;
1831                     }
1832                 }
1833             }
1834         } // locking scope ends
1835
1836         if (bgitem->metaDataOnly) {
1837             ++stats.bg_meta_fetched;
1838         } else {
1839             ++stats.bg_fetched;
1840         }
1841
1842         hrtime_t endTime = gethrtime();
1843         updateBGStats(bgitem->initTime, startTime, endTime);
1844         engine.notifyIOComplete(bgitem->cookie, status);
1845     }
1846
1847     LOG(EXTENSION_LOG_DEBUG,
1848         "EP Store completes %d of batched background fetch "
1849         "for vBucket = %d endTime = %lld\n",
1850         fetchedItems.size(), vbId, gethrtime()/1000000);
1851 }
1852
1853 void EventuallyPersistentStore::bgFetch(const std::string &key,
1854                                         uint16_t vbucket,
1855                                         const void *cookie,
1856                                         bool isMeta) {
1857     std::stringstream ss;
1858
1859     if (multiBGFetchEnabled()) {
1860         RCPtr<VBucket> vb = getVBucket(vbucket);
1861         cb_assert(vb);
1862         KVShard *myShard = vbMap.getShard(vbucket);
1863
1864         // schedule to the current batch of background fetch of the given
1865         // vbucket
1866         VBucketBGFetchItem * fetchThis = new VBucketBGFetchItem(cookie,
1867                                                                 isMeta);
1868         vb->queueBGFetchItem(key, fetchThis, myShard->getBgFetcher());
1869         myShard->getBgFetcher()->notifyBGEvent();
1870         ss << "Queued a background fetch, now at "
1871            << vb->numPendingBGFetchItems() << std::endl;
1872         LOG(EXTENSION_LOG_DEBUG, "%s", ss.str().c_str());
1873     } else {
1874         bgFetchQueue++;
1875         stats.maxRemainingBgJobs = std::max(stats.maxRemainingBgJobs,
1876                                             bgFetchQueue.load());
1877         ExecutorPool* iom = ExecutorPool::get();
1878         ExTask task = new SingleBGFetcherTask(&engine, key, vbucket, cookie,
1879                                               isMeta, bgFetchDelay, false);
1880         iom->schedule(task, READER_TASK_IDX);
1881         ss << "Queued a background fetch, now at " << bgFetchQueue.load()
1882            << std::endl;
1883         LOG(EXTENSION_LOG_DEBUG, "%s", ss.str().c_str());
1884     }
1885 }
1886
/**
 * Look up key in vbucket and return a copy of its item.
 *
 * @param key            key to look up
 * @param vbucket        vbucket the key belongs to
 * @param cookie         connection; used for pending ops and bg fetches
 * @param queueBG        schedule a background fetch when the value (or,
 *                       under full eviction, the key) is not in memory
 * @param honorStates    apply the vbucket-state checks below
 * @param allowedState   if vbucket_state_active, replica vbuckets are
 *                       rejected; otherwise active vbuckets are rejected
 *                       (only when honorStates is set)
 * @param trackReference passed through to fetchValidValue()
 * @param deleteTempItem remove a temp deleted/non-existent item for key
 * @param hideLockedCAS  hide the CAS of a locked item (via toItem)
 *
 * @return ENGINE_SUCCESS with the item; a default-constructed GetValue
 *         (presumably ENGINE_KEY_ENOENT) when the key is deleted, only known
 *         as a temp deleted/non-existent item, or absent and not worth
 *         fetching; ENGINE_EWOULDBLOCK when a bg fetch is needed or the op
 *         was parked on a pending vbucket; ENGINE_NOT_MY_VBUCKET when the
 *         vbucket is missing, dead or in the disallowed state.
 */
GetValue EventuallyPersistentStore::getInternal(const std::string &key,
                                                uint16_t vbucket,
                                                const void *cookie,
                                                bool queueBG,
                                                bool honorStates,
                                                vbucket_state_t allowedState,
                                                bool trackReference,
                                                bool deleteTempItem,
                                                bool hideLockedCAS) {

    vbucket_state_t disallowedState = (allowedState == vbucket_state_active) ?
        vbucket_state_replica : vbucket_state_active;
    RCPtr<VBucket> vb = getVBucket(vbucket);
    if (!vb) {
        ++stats.numNotMyVBuckets;
        return GetValue(NULL, ENGINE_NOT_MY_VBUCKET);
    }

    // Held for the rest of the function so the checks below stay valid.
    ReaderLockHolder rlh(vb->getStateLock());
    if (honorStates && vb->getState() == vbucket_state_dead) {
        ++stats.numNotMyVBuckets;
        return GetValue(NULL, ENGINE_NOT_MY_VBUCKET);
    } else if (honorStates && vb->getState() == disallowedState) {
        ++stats.numNotMyVBuckets;
        return GetValue(NULL, ENGINE_NOT_MY_VBUCKET);
    } else if (honorStates && vb->getState() == vbucket_state_pending) {
        // Park the request; if it could not be parked, fall through and
        // serve it.
        if (vb->addPendingOp(cookie)) {
            return GetValue(NULL, ENGINE_EWOULDBLOCK);
        }
    }

    int bucket_num(0);
    LockHolder lh = vb->ht.getLockedBucket(key, &bucket_num);
    StoredValue *v = fetchValidValue(vb, key, bucket_num, true,
                                     trackReference);
    if (v) {
        if (v->isDeleted()) {
            GetValue rv;
            return rv;
        }
        if (v->isTempDeletedItem() || v->isTempNonExistentItem()) {
            // Delete a temp non-existent item to ensure that
            // if the get were issued over an item that doesn't
            // exist, then we dont preserve a temp item.
            if (deleteTempItem) {
                vb->ht.unlocked_del(key, bucket_num);
            }
            GetValue rv;
            return rv;
        }

        // If the value is not resident, wait for it...
        if (!v->isResident()) {
            if (queueBG) {
                bgFetch(key, vbucket, cookie);
            }
            return GetValue(NULL, ENGINE_EWOULDBLOCK, v->getBySeqno(),
                            true, v->getNRUValue());
        }

        // Should we hide (return -1) for the items' CAS?
        const bool hide_cas = hideLockedCAS &&
                              v->isLocked(ep_current_time());
        GetValue rv(v->toItem(hide_cas, vbucket), ENGINE_SUCCESS,
                    v->getBySeqno(), false, v->getNRUValue());
        return rv;
    } else {
        // Not in memory. Under value-only eviction (or while a disk flush-all
        // is in progress) the key is treated as absent.
        if (eviction_policy == VALUE_ONLY || diskFlushAll) {
            GetValue rv;
            return rv;
        }

        if (vb->maybeKeyExistsInFilter(key)) {
            // Note: EWOULDBLOCK is returned even when queueBG is false and no
            // fetch is scheduled.
            ENGINE_ERROR_CODE ec = ENGINE_EWOULDBLOCK;
            if (queueBG) { // Full eviction and need a bg fetch.
                ec = addTempItemForBgFetch(lh, bucket_num, key, vb,
                                           cookie, false);
            }
            return GetValue(NULL, ec, -1, true);
        } else {
            // As bloomfilter predicted that item surely doesn't exist
            // on disk, return ENONET, for getInternal().
            GetValue rv;
            return rv;
        }
    }
}
1974
1975 GetValue EventuallyPersistentStore::getRandomKey() {
1976     long max = vbMap.getSize();
1977
1978     long start = random() % max;
1979     long curr = start;
1980     Item *itm = NULL;
1981
1982     while (itm == NULL) {
1983         RCPtr<VBucket> vb = getVBucket(curr++);
1984         while (!vb || vb->getState() != vbucket_state_active) {
1985             if (curr == start) {
1986                 return GetValue(NULL, ENGINE_KEY_ENOENT);
1987             }
1988             if (curr == max) {
1989                 curr = 0;
1990             }
1991
1992             vb = getVBucket(curr++);
1993         }
1994
1995         if ((itm = vb->ht.getRandomKey(random())) != NULL) {
1996             GetValue rv(itm, ENGINE_SUCCESS);
1997             return rv;
1998         }
1999
2000         if (curr == max) {
2001             curr = 0;
2002         }
2003
2004         if (curr == start) {
2005             return GetValue(NULL, ENGINE_KEY_ENOENT);
2006         }
2007         // Search next vbucket
2008     }
2009
2010     return GetValue(NULL, ENGINE_KEY_ENOENT);
2011 }
2012
2013
2014 ENGINE_ERROR_CODE EventuallyPersistentStore::getMetaData(
2015                                                         const std::string &key,
2016                                                         uint16_t vbucket,
2017                                                         const void *cookie,
2018                                                         ItemMetaData &metadata,
2019                                                         uint32_t &deleted,
2020                                                         uint8_t &confResMode,
2021                                                         bool trackReferenced)
2022 {
2023     (void) cookie;
2024     RCPtr<VBucket> vb = getVBucket(vbucket);
2025     if (!vb) {
2026         ++stats.numNotMyVBuckets;
2027         return ENGINE_NOT_MY_VBUCKET;
2028     }
2029
2030     ReaderLockHolder rlh(vb->getStateLock());
2031     if (vb->getState() == vbucket_state_dead ||
2032         vb->getState() == vbucket_state_replica) {
2033         ++stats.numNotMyVBuckets;
2034         return ENGINE_NOT_MY_VBUCKET;
2035     }
2036
2037     int bucket_num(0);
2038     deleted = 0;
2039     LockHolder lh = vb->ht.getLockedBucket(key, &bucket_num);
2040     StoredValue *v = vb->ht.unlocked_find(key, bucket_num, true,
2041                                           trackReferenced);
2042
2043     if (v) {
2044         stats.numOpsGetMeta++;
2045
2046         if (v->isTempInitialItem()) { // Need bg meta fetch.
2047             bgFetch(key, vbucket, cookie, true);
2048             return ENGINE_EWOULDBLOCK;
2049         } else if (v->isTempNonExistentItem()) {
2050             metadata.cas = v->getCas();
2051             return ENGINE_KEY_ENOENT;
2052         } else {
2053             if (v->isTempDeletedItem() || v->isDeleted() ||
2054                 v->isExpired(ep_real_time())) {
2055                 deleted |= GET_META_ITEM_DELETED_FLAG;
2056             }
2057
2058             if (v->isLocked(ep_current_time())) {
2059                 metadata.cas = static_cast<uint64_t>(-1);
2060             } else {
2061                 metadata.cas = v->getCas();
2062             }
2063             metadata.flags = v->getFlags();
2064             metadata.exptime = v->getExptime();
2065             metadata.revSeqno = v->getRevSeqno();
2066             confResMode = v->getConflictResMode();
2067             return ENGINE_SUCCESS;
2068         }
2069     } else {
2070         // The key wasn't found. However, this may be because it was previously
2071         // deleted or evicted with the full eviction strategy.
2072         // So, add a temporary item corresponding to the key to the hash table
2073         // and schedule a background fetch for its metadata from the persistent
2074         // store. The item's state will be updated after the fetch completes.
2075         //
2076         // Schedule this bgFetch only if the key is predicted to be may-be
2077         // existent on disk by the bloomfilter.
2078
2079         if (vb->maybeKeyExistsInFilter(key)) {
2080             return addTempItemForBgFetch(lh, bucket_num, key, vb, cookie, true);
2081         } else {
2082             return ENGINE_KEY_ENOENT;
2083         }
2084     }
2085 }
2086
/**
 * Store itm using the metadata (CAS, revision seqno, ...) it carries.
 *
 * @param itm           item to store; itm.getVBucketId() selects the vbucket
 * @param cas           passed to HashTable::unlocked_set
 * @param seqno         passed to queueDirty
 * @param cookie        connection; used for pending ops and bg fetches
 * @param force         bypass the replica/pending state checks, the
 *                      temp-item bg fetch and conflict resolution
 * @param allowExisting passed to HashTable::unlocked_set
 * @param nru           NRU value passed to HashTable::unlocked_set
 * @param genBySeqno    passed to queueDirty
 * @param emd           optional extended metadata: conflict resolution mode
 *                      and adjusted time for the vbucket's drift counter
 * @param isReplication passed to unlocked_set / addTempItemForBgFetch
 *
 * @return ENGINE_SUCCESS, or an error / ENGINE_EWOULDBLOCK as mapped from
 *         the state checks, conflict resolution and unlocked_set's result.
 */
ENGINE_ERROR_CODE EventuallyPersistentStore::setWithMeta(
                                                     const Item &itm,
                                                     uint64_t cas,
                                                     uint64_t *seqno,
                                                     const void *cookie,
                                                     bool force,
                                                     bool allowExisting,
                                                     uint8_t nru,
                                                     bool genBySeqno,
                                                     ExtendedMetaData *emd,
                                                     bool isReplication)
{
    RCPtr<VBucket> vb = getVBucket(itm.getVBucketId());
    if (!vb) {
        ++stats.numNotMyVBuckets;
        return ENGINE_NOT_MY_VBUCKET;
    }

    ReaderLockHolder rlh(vb->getStateLock());
    if (vb->getState() == vbucket_state_dead) {
        ++stats.numNotMyVBuckets;
        return ENGINE_NOT_MY_VBUCKET;
    } else if (vb->getState() == vbucket_state_replica && !force) {
        ++stats.numNotMyVBuckets;
        return ENGINE_NOT_MY_VBUCKET;
    } else if (vb->getState() == vbucket_state_pending && !force) {
        if (vb->addPendingOp(cookie)) {
            return ENGINE_EWOULDBLOCK;
        }
    }

    //check for the incoming item's CAS validity
    if (!Item::isValidCas(itm.getCas())) {
        return ENGINE_KEY_EEXISTS;
    }

    int bucket_num(0);
    LockHolder lh = vb->ht.getLockedBucket(itm.getKey(), &bucket_num);
    StoredValue *v = vb->ht.unlocked_find(itm.getKey(), bucket_num, true,
                                          false);

    // Passed to unlocked_set; false when the bloom filter rules out the key
    // being on disk.
    bool maybeKeyExists = true;
    if (!force) {
        if (v)  {
            // Metadata for the existing key is still on disk: fetch it
            // before conflict resolution can run.
            if (v->isTempInitialItem()) {
                bgFetch(itm.getKey(), itm.getVBucketId(), cookie, true);
                return ENGINE_EWOULDBLOCK;
            }

            enum conflict_resolution_mode confResMode = revision_seqno;
            if (emd) {
                confResMode = static_cast<enum conflict_resolution_mode>(
                                                       emd->getConflictResMode());
            }

            if (!conflictResolver->resolve(vb, v, itm.getMetaData(), false,
                                           confResMode)) {
                ++stats.numOpsSetMetaResolutionFailed;
                return ENGINE_KEY_EEXISTS;
            }
        } else {
            // Not in memory but possibly on disk: fetch its metadata first
            // so conflict resolution can be performed on retry.
            if (vb->maybeKeyExistsInFilter(itm.getKey())) {
                return addTempItemForBgFetch(lh, bucket_num, itm.getKey(), vb,
                                             cookie, true, isReplication);
            } else {
                maybeKeyExists = false;
            }
        }
    } else {
        if (eviction_policy == FULL_EVICTION) {
            // Check Bloomfilter's prediction
            if (!vb->maybeKeyExistsInFilter(itm.getKey())) {
                maybeKeyExists = false;
            }
        }
    }

    // On replica/pending vbuckets a locked item is unlocked before the set.
    if (v && v->isLocked(ep_current_time()) &&
        (vb->getState() == vbucket_state_replica ||
         vb->getState() == vbucket_state_pending)) {
        v->unlock();
    }

    mutation_type_t mtype = vb->ht.unlocked_set(v, itm, cas, allowExisting,
                                                true, eviction_policy, nru,
                                                maybeKeyExists, isReplication);

    ENGINE_ERROR_CODE ret = ENGINE_SUCCESS;
    switch (mtype) {
    case NOMEM:
        ret = ENGINE_ENOMEM;
        break;
    case INVALID_CAS:
    case IS_LOCKED:
        ret = ENGINE_KEY_EEXISTS;
        break;
    case INVALID_VBUCKET:
        ret = ENGINE_NOT_MY_VBUCKET;
        break;
    case WAS_DIRTY:
    case WAS_CLEAN:
        /* set the conflict resolution mode from the extended meta data *
         * Given that the mode is already set, we don't need to set the *
         * conflict resolution mode in queueDirty */
        if (emd) {
            v->setConflictResMode(
                      static_cast<enum conflict_resolution_mode>(
                                            emd->getConflictResMode()));
        }
        vb->setMaxCas(v->getCas());
        queueDirty(vb, v, &lh, seqno, false, true, genBySeqno, false);
        break;
    case NOT_FOUND:
        ret = ENGINE_KEY_ENOENT;
        break;
    case NEED_BG_FETCH:
        {            // CAS operation with non-resident item + full eviction.
            if (v) { // temp item is already created. Simply schedule a
                lh.unlock(); // bg fetch job.
                bgFetch(itm.getKey(), vb->getId(), cookie, true);
                return ENGINE_EWOULDBLOCK;
            }

            ret = addTempItemForBgFetch(lh, bucket_num, itm.getKey(), vb,
                                        cookie, true, isReplication);
        }
    }

    // Update drift counter for vbucket upon a success only
    if (ret == ENGINE_SUCCESS && emd) {
        vb->setDriftCounter(emd->getAdjustedTime());
    }

    return ret;
}
2222
2223 GetValue EventuallyPersistentStore::getAndUpdateTtl(const std::string &key,
2224                                                     uint16_t vbucket,
2225                                                     const void *cookie,
2226                                                     time_t exptime)
2227 {
2228     RCPtr<VBucket> vb = getVBucket(vbucket);
2229     if (!vb) {
2230         ++stats.numNotMyVBuckets;
2231         return GetValue(NULL, ENGINE_NOT_MY_VBUCKET);
2232     }
2233
2234     ReaderLockHolder rlh(vb->getStateLock());
2235     if (vb->getState() == vbucket_state_dead) {
2236         ++stats.numNotMyVBuckets;
2237         return GetValue(NULL, ENGINE_NOT_MY_VBUCKET);
2238     } else if (vb->getState() == vbucket_state_replica) {
2239         ++stats.numNotMyVBuckets;
2240         return GetValue(NULL, ENGINE_NOT_MY_VBUCKET);
2241     } else if (vb->getState() == vbucket_state_pending) {
2242         if (vb->addPendingOp(cookie)) {
2243             return GetValue(NULL, ENGINE_EWOULDBLOCK);
2244         }
2245     }
2246
2247     int bucket_num(0);
2248     LockHolder lh = vb->ht.getLockedBucket(key, &bucket_num);
2249     StoredValue *v = fetchValidValue(vb, key, bucket_num, true);
2250
2251     if (v) {
2252         if (v->isDeleted() || v->isTempDeletedItem() ||
2253             v->isTempNonExistentItem()) {
2254             GetValue rv;
2255             return rv;
2256         }
2257
2258         if (!v->isResident()) {
2259             bgFetch(key, vbucket, cookie);
2260             return GetValue(NULL, ENGINE_EWOULDBLOCK, v->getBySeqno());
2261         }
2262         if (v->isLocked(ep_current_time())) {
2263             GetValue rv(NULL, ENGINE_KEY_EEXISTS, 0);
2264             return rv;
2265         }
2266
2267         bool exptime_mutated = exptime != v->getExptime() ? true : false;
2268         if (exptime_mutated) {
2269            v->markDirty();
2270            v->setExptime(exptime);
2271         }
2272
2273         GetValue rv(v->toItem(v->isLocked(ep_current_time()), vbucket),
2274                     ENGINE_SUCCESS, v->getBySeqno());
2275
2276         if (exptime_mutated) {
2277             if (vb->getState() == vbucket_state_active) {
2278                 // persist the item in the underlying storage for
2279                 // mutated exptime but only if VB is active.
2280                 queueDirty(vb, v, &lh, NULL);
2281             }
2282         }
2283         return rv;
2284     } else {
2285         if (eviction_policy == VALUE_ONLY) {
2286             GetValue rv;
2287             return rv;
2288         } else {
2289             if (vb->maybeKeyExistsInFilter(key)) {
2290                 ENGINE_ERROR_CODE ec = addTempItemForBgFetch(lh, bucket_num,
2291                                                              key, vb, cookie,
2292                                                              false);
2293                 return GetValue(NULL, ec, -1, true);
2294             } else {
2295                 // As bloomfilter predicted that item surely doesn't exist
2296                 // on disk, return ENOENT for getAndUpdateTtl().
2297                 GetValue rv;
2298                 return rv;
2299             }
2300         }
2301     }
2302 }
2303
2304 ENGINE_ERROR_CODE
2305 EventuallyPersistentStore::statsVKey(const std::string &key,
2306                                      uint16_t vbucket,
2307                                      const void *cookie) {
2308     RCPtr<VBucket> vb = getVBucket(vbucket);
2309     if (!vb) {
2310         return ENGINE_NOT_MY_VBUCKET;
2311     }
2312
2313     int bucket_num(0);
2314     LockHolder lh = vb->ht.getLockedBucket(key, &bucket_num);
2315     StoredValue *v = fetchValidValue(vb, key, bucket_num, true);
2316
2317     if (v) {
2318         if (v->isDeleted() || v->isTempDeletedItem() ||
2319             v->isTempNonExistentItem()) {
2320             return ENGINE_KEY_ENOENT;
2321         }
2322         bgFetchQueue++;
2323         cb_assert(bgFetchQueue > 0);
2324         ExecutorPool* iom = ExecutorPool::get();
2325         ExTask task = new VKeyStatBGFetchTask(&engine, key, vbucket,
2326                                            v->getBySeqno(), cookie,
2327                                            bgFetchDelay, false);
2328         iom->schedule(task, READER_TASK_IDX);
2329         return ENGINE_EWOULDBLOCK;
2330     } else {
2331         if (eviction_policy == VALUE_ONLY) {
2332             return ENGINE_KEY_ENOENT;
2333         } else {
2334             add_type_t rv = vb->ht.unlocked_addTempItem(bucket_num, key,
2335                                                         eviction_policy);
2336             switch(rv) {
2337             case ADD_NOMEM:
2338                 return ENGINE_ENOMEM;
2339             case ADD_EXISTS:
2340             case ADD_UNDEL:
2341             case ADD_SUCCESS:
2342             case ADD_TMP_AND_BG_FETCH:
2343                 // Since the hashtable bucket is locked, we shouldn't get here
2344                 abort();
2345             case ADD_BG_FETCH:
2346                 {
2347                     ++bgFetchQueue;
2348                     cb_assert(bgFetchQueue > 0);
2349                     ExecutorPool* iom = ExecutorPool::get();
2350                     ExTask task = new VKeyStatBGFetchTask(&engine, key,
2351                                                           vbucket, -1, cookie,
2352                                                           bgFetchDelay, false);
2353                     iom->schedule(task, READER_TASK_IDX);
2354                 }
2355             }
2356             return ENGINE_EWOULDBLOCK;
2357         }
2358     }
2359 }
2360
2361 void EventuallyPersistentStore::completeStatsVKey(const void* cookie,
2362                                                   std::string &key,
2363                                                   uint16_t vbid,
2364                                                   uint64_t bySeqNum) {
2365     RememberingCallback<GetValue> gcb;
2366
2367     getROUnderlying(vbid)->get(key, vbid, gcb);
2368     gcb.waitForValue();
2369     cb_assert(gcb.fired);
2370
2371     if (eviction_policy == FULL_EVICTION) {
2372         RCPtr<VBucket> vb = getVBucket(vbid);
2373         if (vb) {
2374             int bucket_num(0);
2375             LockHolder hlh = vb->ht.getLockedBucket(key, &bucket_num);
2376             StoredValue *v = fetchValidValue(vb, key, bucket_num, true);
2377             if (v && v->isTempInitialItem()) {
2378                 if (gcb.val.getStatus() == ENGINE_SUCCESS) {
2379                     v->unlocked_restoreValue(gcb.val.getValue(), vb->ht);
2380                     cb_assert(v->isResident());
2381                 } else if (gcb.val.getStatus() == ENGINE_KEY_ENOENT) {
2382                     v->setStoredValueState(
2383                                           StoredValue::state_non_existent_key);
2384                 } else {
2385                     // underlying kvstore couldn't fetch requested data
2386                     // log returned error and notify TMPFAIL to client
2387                     LOG(EXTENSION_LOG_WARNING,
2388                         "Warning: failed background fetch for vb=%d "
2389                         "seq=%" PRId64 " key=%s", vbid, v->getBySeqno(),
2390                         key.c_str());
2391                 }
2392             }
2393         }
2394     }
2395
2396     if (gcb.val.getStatus() == ENGINE_SUCCESS) {
2397         engine.addLookupResult(cookie, gcb.val.getValue());
2398     } else {
2399         engine.addLookupResult(cookie, NULL);
2400     }
2401
2402     bgFetchQueue--;
2403     engine.notifyIOComplete(cookie, ENGINE_SUCCESS);
2404 }
2405
2406 bool EventuallyPersistentStore::getLocked(const std::string &key,
2407                                           uint16_t vbucket,
2408                                           Callback<GetValue> &cb,
2409                                           rel_time_t currentTime,
2410                                           uint32_t lockTimeout,
2411                                           const void *cookie) {
2412     RCPtr<VBucket> vb = getVBucket(vbucket);
2413     if (!vb || vb->getState() != vbucket_state_active) {
2414         ++stats.numNotMyVBuckets;
2415         GetValue rv(NULL, ENGINE_NOT_MY_VBUCKET);
2416         cb.callback(rv);
2417         return false;
2418     }
2419
2420     int bucket_num(0);
2421     LockHolder lh = vb->ht.getLockedBucket(key, &bucket_num);
2422     StoredValue *v = fetchValidValue(vb, key, bucket_num, true);
2423
2424     if (v) {
2425         if (v->isDeleted() || v->isTempNonExistentItem() ||
2426             v->isTempDeletedItem()) {
2427             GetValue rv;
2428             cb.callback(rv);
2429             return true;
2430         }
2431
2432         // if v is locked return error
2433         if (v->isLocked(currentTime)) {
2434             GetValue rv;
2435             cb.callback(rv);
2436             return false;
2437         }
2438
2439         // If the value is not resident, wait for it...
2440         if (!v->isResident()) {
2441             if (cookie) {
2442                 bgFetch(key, vbucket, cookie);
2443             }
2444             GetValue rv(NULL, ENGINE_EWOULDBLOCK, -1, true);
2445             cb.callback(rv);
2446             return false;
2447         }
2448
2449         // acquire lock and increment cas value
2450         v->lock(currentTime + lockTimeout);
2451
2452         Item *it = v->toItem(false, vbucket);
2453         it->setCas(vb->nextHLCCas());
2454         v->setCas(it->getCas());
2455
2456         GetValue rv(it);
2457         cb.callback(rv);
2458         return true;
2459     } else {
2460         if (eviction_policy == VALUE_ONLY) {
2461             GetValue rv;
2462             cb.callback(rv);
2463             return true;
2464         } else {
2465             if (vb->maybeKeyExistsInFilter(key)) {
2466                 ENGINE_ERROR_CODE ec = addTempItemForBgFetch(lh, bucket_num,
2467                                                              key, vb, cookie,
2468                                                              false);
2469                 GetValue rv(NULL, ec, -1, true);
2470                 cb.callback(rv);
2471                 return false;
2472             } else {
2473                 // As bloomfilter predicted that item surely doesn't exist
2474                 // on disk, return ENOENT for getLocked().
2475                 GetValue rv;
2476                 cb.callback(rv);
2477                 return true;
2478             }
2479         }
2480     }
2481 }
2482
2483 ENGINE_ERROR_CODE
2484 EventuallyPersistentStore::unlockKey(const std::string &key,
2485                                      uint16_t vbucket,
2486                                      uint64_t cas,
2487                                      rel_time_t currentTime)
2488 {
2489
2490     RCPtr<VBucket> vb = getVBucket(vbucket);
2491     if (!vb || vb->getState() != vbucket_state_active) {
2492         ++stats.numNotMyVBuckets;
2493         return ENGINE_NOT_MY_VBUCKET;
2494     }
2495
2496     int bucket_num(0);
2497     LockHolder lh = vb->ht.getLockedBucket(key, &bucket_num);
2498     StoredValue *v = fetchValidValue(vb, key, bucket_num, true);
2499
2500     if (v) {
2501         if (v->isDeleted() || v->isTempNonExistentItem() ||
2502             v->isTempDeletedItem()) {
2503             return ENGINE_KEY_ENOENT;
2504         }
2505         if (v->isLocked(currentTime)) {
2506             if (v->getCas() == cas) {
2507                 v->unlock();
2508                 return ENGINE_SUCCESS;
2509             }
2510         }
2511         return ENGINE_TMPFAIL;
2512     } else {
2513         if (eviction_policy == VALUE_ONLY) {
2514             return ENGINE_KEY_ENOENT;
2515         } else {
2516             // With the full eviction, an item's lock is automatically
2517             // released when the item is evicted from memory. Therefore,
2518             // we simply return ENGINE_TMPFAIL when we receive unlockKey
2519             // for an item that is not in memocy cache. Note that we don't
2520             // spawn any bg fetch job to figure out if an item actually
2521             // exists in disk or not.
2522             return ENGINE_TMPFAIL;
2523         }
2524     }
2525 }
2526
2527
2528 ENGINE_ERROR_CODE EventuallyPersistentStore::getKeyStats(
2529                                             const std::string &key,
2530                                             uint16_t vbucket,
2531                                             const void *cookie,
2532                                             struct key_stats &kstats,
2533                                             bool bgfetch,
2534                                             bool wantsDeleted)
2535 {
2536     RCPtr<VBucket> vb = getVBucket(vbucket);
2537     if (!vb) {
2538         return ENGINE_NOT_MY_VBUCKET;
2539     }
2540
2541     int bucket_num(0);
2542     LockHolder lh = vb->ht.getLockedBucket(key, &bucket_num);
2543     StoredValue *v = fetchValidValue(vb, key, bucket_num, true);
2544
2545     if (v) {
2546         if ((v->isDeleted() && !wantsDeleted) ||
2547             v->isTempNonExistentItem() || v->isTempDeletedItem()) {
2548             return ENGINE_KEY_ENOENT;
2549         }
2550         if (eviction_policy == FULL_EVICTION &&
2551             v->isTempInitialItem() && bgfetch) {
2552             lh.unlock();
2553             bgFetch(key, vbucket, cookie, true);
2554             return ENGINE_EWOULDBLOCK;
2555         }
2556         kstats.logically_deleted = v->isDeleted();
2557         kstats.dirty = v->isDirty();
2558         kstats.exptime = v->getExptime();
2559         kstats.flags = v->getFlags();
2560         kstats.cas = v->getCas();
2561         kstats.vb_state = vb->getState();
2562         return ENGINE_SUCCESS;
2563     } else {
2564         if (eviction_policy == VALUE_ONLY) {
2565             return ENGINE_KEY_ENOENT;
2566         } else {
2567             if (bgfetch && vb->maybeKeyExistsInFilter(key)) {
2568                 return addTempItemForBgFetch(lh, bucket_num, key, vb,
2569                                              cookie, true);
2570             } else {
2571                 // If bgFetch were false, or bloomfilter predicted that
2572                 // item surely doesn't exist on disk, return ENOENT for
2573                 // getKeyStats().
2574                 return ENGINE_KEY_ENOENT;
2575             }
2576         }
2577     }
2578 }
2579
2580 std::string EventuallyPersistentStore::validateKey(const std::string &key,
2581                                                    uint16_t vbucket,
2582                                                    Item &diskItem) {
2583     int bucket_num(0);
2584     RCPtr<VBucket> vb = getVBucket(vbucket);
2585     LockHolder lh = vb->ht.getLockedBucket(key, &bucket_num);
2586     StoredValue *v = fetchValidValue(vb, key, bucket_num, true,
2587                                      false, true);
2588
2589     if (v) {
2590         if (v->isDeleted() || v->isTempNonExistentItem() ||
2591             v->isTempDeletedItem()) {
2592             return "item_deleted";
2593         }
2594
2595         if (diskItem.getFlags() != v->getFlags()) {
2596             return "flags_mismatch";
2597         } else if (v->isResident() && memcmp(diskItem.getData(),
2598                                              v->getValue()->getData(),
2599                                              diskItem.getNBytes())) {
2600             return "data_mismatch";
2601         } else {
2602             return "valid";
2603         }
2604     } else {
2605         return "item_deleted";
2606     }
2607
2608 }
2609
2610 ENGINE_ERROR_CODE EventuallyPersistentStore::deleteItem(const std::string &key,
2611                                                         uint64_t *cas,
2612                                                         uint16_t vbucket,
2613                                                         const void *cookie,
2614                                                         bool force,
2615                                                         ItemMetaData *itemMeta,
2616                                                         mutation_descr_t *mutInfo,
2617                                                         bool tapBackfill)
2618 {
2619     RCPtr<VBucket> vb = getVBucket(vbucket);
2620     if (!vb || (vb->getState() == vbucket_state_dead && !force)) {
2621         ++stats.numNotMyVBuckets;
2622         return ENGINE_NOT_MY_VBUCKET;
2623     } else if(vb->getState() == vbucket_state_replica && !force) {
2624         ++stats.numNotMyVBuckets;
2625         return ENGINE_NOT_MY_VBUCKET;
2626     } else if(vb->getState() == vbucket_state_pending && !force) {
2627         if (vb->addPendingOp(cookie)) {
2628             return ENGINE_EWOULDBLOCK;
2629         }
2630     }
2631
2632     int bucket_num(0);
2633     LockHolder lh = vb->ht.getLockedBucket(key, &bucket_num);
2634     StoredValue *v = vb->ht.unlocked_find(key, bucket_num, true, false);
2635     if (!v || v->isDeleted() || v->isTempItem()) {
2636         if (eviction_policy == VALUE_ONLY) {
2637             return ENGINE_KEY_ENOENT;
2638         } else { // Full eviction.
2639             if (!force) {
2640                 if (!v) { // Item might be evicted from cache.
2641                     if (vb->maybeKeyExistsInFilter(key)) {
2642                         return addTempItemForBgFetch(lh, bucket_num, key, vb,
2643                                                      cookie, true);
2644                     } else {
2645                         // As bloomfilter predicted that item surely doesn't
2646                         // exist on disk, return ENOENT for deleteItem().
2647                         return ENGINE_KEY_ENOENT;
2648                     }
2649                 } else if (v->isTempInitialItem()) {
2650                     lh.unlock();
2651                     bgFetch(key, vbucket, cookie, true);
2652                     return ENGINE_EWOULDBLOCK;
2653                 } else { // Non-existent or deleted key.
2654                     if (v->isTempNonExistentItem() || v->isTempDeletedItem()) {
2655                         // Delete a temp non-existent item to ensure that
2656                         // if a delete were issued over an item that doesn't
2657                         // exist, then we don't preserve a temp item.
2658                         vb->ht.unlocked_del(key, bucket_num);
2659                     }
2660                     return ENGINE_KEY_ENOENT;
2661                 }
2662             } else {
2663                 if (!v) { // Item might be evicted from cache.
2664                     // Create a temp item and delete it below as it is a
2665                     // force deletion, only if bloomfilter predicts that
2666                     // item may exist on disk.
2667                     if (vb->maybeKeyExistsInFilter(key)) {
2668                         add_type_t rv = vb->ht.unlocked_addTempItem(
2669                                                                bucket_num,
2670                                                                key,
2671                                                                eviction_policy);
2672                         if (rv == ADD_NOMEM) {
2673                             return ENGINE_ENOMEM;
2674                         }
2675                         v = vb->ht.unlocked_find(key, bucket_num, true, false);
2676                         v->setStoredValueState(StoredValue::state_deleted_key);
2677                     } else {
2678                         return ENGINE_KEY_ENOENT;
2679                     }
2680                 } else if (v->isTempInitialItem()) {
2681                     v->setStoredValueState(StoredValue::state_deleted_key);
2682                 } else { // Non-existent or deleted key.
2683                     if (v->isTempNonExistentItem() || v->isTempDeletedItem()) {
2684                         // Delete a temp non-existent item to ensure that
2685                         // if a delete were issued over an item that doesn't
2686                         // exist, then we don't preserve a temp item.
2687                         vb->ht.unlocked_del(key, bucket_num);
2688                     }
2689                     return ENGINE_KEY_ENOENT;
2690                 }
2691             }
2692         }
2693     }
2694
2695     if (v && v->isLocked(ep_current_time()) &&
2696         (vb->getState() == vbucket_state_replica ||
2697          vb->getState() == vbucket_state_pending)) {
2698         v->unlock();
2699     }
2700     mutation_type_t delrv;
2701     delrv = vb->ht.unlocked_softDelete(v, *cas, eviction_policy);
2702
2703     if (itemMeta && v) {
2704         itemMeta->revSeqno = v->getRevSeqno();
2705         itemMeta->cas = v->getCas();
2706         itemMeta->flags = v->getFlags();
2707         itemMeta->exptime = v->getExptime();
2708     }
2709     *cas = v ? v->getCas() : 0;
2710
2711     uint64_t seqno = 0;
2712     ENGINE_ERROR_CODE ret = ENGINE_SUCCESS;
2713     switch (delrv) {
2714     case NOMEM:
2715         ret = ENGINE_ENOMEM;
2716         break;
2717     case INVALID_VBUCKET:
2718         ret = ENGINE_NOT_MY_VBUCKET;
2719         break;
2720     case INVALID_CAS:
2721         ret = ENGINE_KEY_EEXISTS;
2722         break;
2723     case IS_LOCKED:
2724         ret = ENGINE_TMPFAIL;
2725         break;
2726     case NOT_FOUND:
2727         ret = ENGINE_KEY_ENOENT;
2728         if (v) {
2729             queueDirty(vb, v, &lh, NULL, tapBackfill);
2730         }
2731         break;
2732     case WAS_DIRTY:
2733     case WAS_CLEAN:
2734         queueDirty(vb, v, &lh, &seqno, tapBackfill);
2735         mutInfo->seqno = seqno;
2736         mutInfo->vbucket_uuid = vb->failovers->getLatestUUID();
2737         break;
2738     case NEED_BG_FETCH:
2739         // We already figured out if a bg fetch is requred for a full-evicted
2740         // item above.
2741         abort();
2742     }
2743     return ret;
2744 }
2745
2746 ENGINE_ERROR_CODE EventuallyPersistentStore::deleteWithMeta(
2747                                                      const std::string &key,
2748                                                      uint64_t *cas,
2749                                                      uint64_t *seqno,
2750                                                      uint16_t vbucket,
2751                                                      const void *cookie,
2752                                                      bool force,
2753                                                      ItemMetaData *itemMeta,
2754                                                      bool tapBackfill,
2755                                                      bool genBySeqno,
2756                                                      uint64_t bySeqno,
2757                                                      ExtendedMetaData *emd,
2758                                                      bool isReplication)
2759 {
2760     RCPtr<VBucket> vb = getVBucket(vbucket);
2761     if (!vb || (vb->getState() == vbucket_state_dead && !force)) {
2762         ++stats.numNotMyVBuckets;
2763         return ENGINE_NOT_MY_VBUCKET;
2764     } else if(vb->getState() == vbucket_state_replica && !force) {
2765         ++stats.numNotMyVBuckets;
2766         return ENGINE_NOT_MY_VBUCKET;
2767     } else if(vb->getState() == vbucket_state_pending && !force) {
2768         if (vb->addPendingOp(cookie)) {
2769             return ENGINE_EWOULDBLOCK;
2770         }
2771     }
2772
2773     //check for the incoming item's CAS validity
2774     if (!Item::isValidCas(itemMeta->cas)) {
2775         return ENGINE_KEY_EEXISTS;
2776     }
2777
2778     int bucket_num(0);
2779     LockHolder lh = vb->ht.getLockedBucket(key, &bucket_num);
2780     StoredValue *v = vb->ht.unlocked_find(key, bucket_num, true, false);
2781     if (!force) { // Need conflict resolution.
2782         if (v)  {
2783             if (v->isTempInitialItem()) {
2784                 bgFetch(key, vbucket, cookie, true);
2785                 return ENGINE_EWOULDBLOCK;
2786             }
2787
2788             enum conflict_resolution_mode confResMode = revision_seqno;
2789             if (emd) {
2790                 confResMode = static_cast<enum conflict_resolution_mode>(
2791                                                        emd->getConflictResMode());
2792             }
2793
2794             if (!conflictResolver->resolve(vb, v, *itemMeta, true, confResMode)) {
2795                 ++stats.numOpsDelMetaResolutionFailed;
2796                 return ENGINE_KEY_EEXISTS;
2797             }
2798         } else {
2799             // Item is 1) deleted or not existent in the value eviction case OR
2800             // 2) deleted or evicted in the full eviction.
2801             if (vb->maybeKeyExistsInFilter(key)) {
2802                 return addTempItemForBgFetch(lh, bucket_num, key, vb,
2803                                              cookie, true, isReplication);
2804             } else {
2805                 // Even though bloomfilter predicted that item doesn't exist
2806                 // on disk, we must put this delete on disk if the cas is valid.
2807                 add_type_t rv = vb->ht.unlocked_addTempItem(bucket_num, key,
2808                                                             eviction_policy,
2809                                                             isReplication);
2810                 if (rv == ADD_NOMEM) {
2811                     return ENGINE_ENOMEM;
2812                 }
2813                 v = vb->ht.unlocked_find(key, bucket_num, true, false);
2814                 v->setStoredValueState(StoredValue::state_deleted_key);
2815             }
2816         }
2817     } else {
2818         if (!v) {
2819             // We should always try to persist a delete here.
2820             add_type_t rv = vb->ht.unlocked_addTempItem(bucket_num, key,
2821                                                         eviction_policy,
2822                                                         isReplication);
2823             if (rv == ADD_NOMEM) {
2824                 return ENGINE_ENOMEM;
2825             }
2826             v = vb->ht.unlocked_find(key, bucket_num, true, false);
2827             v->setStoredValueState(StoredValue::state_deleted_key);
2828             v->setCas(*cas);
2829         } else if (v->isTempInitialItem()) {
2830             v->setStoredValueState(StoredValue::state_deleted_key);
2831             v->setCas(*cas);
2832         }
2833     }
2834
2835     if (v && v->isLocked(ep_current_time()) &&
2836         (vb->getState() == vbucket_state_replica ||
2837          vb->getState() == vbucket_state_pending)) {
2838         v->unlock();
2839     }
2840     mutation_type_t delrv;
2841     delrv = vb->ht.unlocked_softDelete(v, *cas, *itemMeta,
2842                                        eviction_policy, true);
2843     *cas = v ? v->getCas() : 0;
2844
2845     ENGINE_ERROR_CODE ret = ENGINE_SUCCESS;
2846     switch (delrv) {
2847     case NOMEM:
2848         ret = ENGINE_ENOMEM;
2849         break;
2850     case INVALID_VBUCKET:
2851         ret = ENGINE_NOT_MY_VBUCKET;
2852         break;
2853     case INVALID_CAS:
2854         ret = ENGINE_KEY_EEXISTS;
2855         break;
2856     case IS_LOCKED:
2857         ret = ENGINE_TMPFAIL;
2858         break;
2859     case NOT_FOUND:
2860         ret = ENGINE_KEY_ENOENT;
2861         break;
2862     case WAS_DIRTY:
2863     case WAS_CLEAN:
2864         if (!genBySeqno) {
2865             v->setBySeqno(bySeqno);
2866         }
2867
2868         /* set the conflict resolution mode from the extended meta data *
2869          * Given that the mode is already set, we don't need to set the *
2870          * conflict resolution mode in queueDirty */
2871         if (emd) {
2872             v->setConflictResMode(
2873                static_cast<enum conflict_resolution_mode>(
2874                                          emd->getConflictResMode()));
2875         }
2876         vb->setMaxCas(v->getCas());
2877         queueDirty(vb, v, &lh, seqno, tapBackfill, true, genBySeqno, false);
2878         break;
2879     case NEED_BG_FETCH:
2880         lh.unlock();
2881         bgFetch(key, vbucket, cookie, true);
2882         ret = ENGINE_EWOULDBLOCK;
2883     }
2884
2885     // Update drift counter for vbucket upon a success only
2886     if (ret == ENGINE_SUCCESS && emd) {
2887         vb->setDriftCounter(emd->getAdjustedTime());
2888     }
2889
2890     return ret;
2891 }
2892
2893 void EventuallyPersistentStore::reset() {
2894     std::vector<int> buckets = vbMap.getBuckets();
2895     std::vector<int>::iterator it;
2896     for (it = buckets.begin(); it != buckets.end(); ++it) {
2897         RCPtr<VBucket> vb = getVBucket(*it);
2898         if (vb) {
2899             LockHolder lh(vb_mutexes[vb->getId()]);
2900             vb->ht.clear();
2901             vb->checkpointManager.clear(vb->getState());
2902             vb->resetStats();
2903             vb->setPersistedSnapshot(0, 0);
2904         }
2905     }
2906
2907     ++stats.diskQueueSize;
2908     bool inverse = true;
2909     flushAllTaskCtx.delayFlushAll.compare_exchange_strong(inverse, false);
2910     // Waking up (notifying) one flusher is good enough for diskFlushAll
2911     vbMap.shards[EP_PRIMARY_SHARD]->getFlusher()->notifyFlushEvent();
2912 }
2913
2914 /**
2915  * Callback invoked after persisting an item from memory to disk.
2916  *
2917  * This class exists to create a closure around a few variables within
2918  * EventuallyPersistentStore::flushOne so that an object can be
2919  * requeued in case of failure to store in the underlying layer.
2920  */
2921 class PersistenceCallback : public Callback<mutation_result>,
2922                             public Callback<int> {
2923 public:
2924
2925     PersistenceCallback(const queued_item &qi, RCPtr<VBucket> &vb,
2926                         EventuallyPersistentStore *st, EPStats *s, uint64_t c)
2927         : queuedItem(qi), vbucket(vb), store(st), stats(s), cas(c) {
2928         cb_assert(vb);
2929         cb_assert(s);
2930     }
2931
2932     // This callback is invoked for set only.
2933     void callback(mutation_result &value) {
2934         if (value.first == 1) {
2935             int bucket_num(0);
2936             LockHolder lh = vbucket->ht.getLockedBucket(queuedItem->getKey(),
2937                                                         &bucket_num);
2938             StoredValue *v = store->fetchValidValue(vbucket,
2939                                                     queuedItem->getKey(),
2940                                                     bucket_num, true, false);
2941             if (v) {
2942                 if (v->getCas() == cas) {
2943                     // mark this item clean only if current and stored cas
2944                     // value match
2945                     v->markClean();
2946                 }
2947                 if (v->isNewCacheItem()) {
2948                     if (value.second) {
2949                         // Insert in value-only or full eviction mode.
2950                         ++vbucket->opsCreate;
2951                         vbucket->incrMetaDataDisk(*queuedItem);
2952                     } else { // Update in full eviction mode.
2953                         vbucket->ht.decrNumTotalItems();
2954                         ++vbucket->opsUpdate;
2955                     }
2956                     v->setNewCacheItem(false);
2957                 } else { // Update in value-only or full eviction mode.
2958                     ++vbucket->opsUpdate;
2959                 }
2960             }
2961
2962             vbucket->doStatsForFlushing(*queuedItem, queuedItem->size());
2963             stats->decrDiskQueueSize(1);
2964             stats->totalPersisted++;
2965         } else {
2966             // If the return was 0 here, we're in a bad state because
2967             // we do not know the rowid of this object.
2968             if (value.first == 0) {
2969                 int bucket_num(0);
2970                 LockHolder lh = vbucket->ht.getLockedBucket(
2971                                            queuedItem->getKey(), &bucket_num);
2972                 StoredValue *v = store->fetchValidValue(vbucket,
2973                                                         queuedItem->getKey(),
2974                                                         bucket_num, true,
2975                                                         false);
2976                 if (v) {
2977                     std::stringstream ss;
2978                     ss << "Persisting ``" << queuedItem->getKey() << "'' on vb"
2979                        << queuedItem->getVBucketId() << " (rowid="
2980                        << v->getBySeqno() << ") returned 0 updates\n";
2981                     LOG(EXTENSION_LOG_WARNING, "%s", ss.str().c_str());
2982                 } else {
2983                     LOG(EXTENSION_LOG_WARNING,
2984                         "Error persisting now missing ``%s'' from vb%d",
2985                         queuedItem->getKey().c_str(),
2986                         queuedItem->getVBucketId());
2987                 }
2988
2989                 vbucket->doStatsForFlushing(*queuedItem, queuedItem->size());
2990                 stats->decrDiskQueueSize(1);
2991             } else {
2992                 std::stringstream ss;
2993                 ss <<
2994                 "Fatal error in persisting SET ``" <<
2995                 queuedItem->getKey() << "'' on vb "
2996                    << queuedItem->getVBucketId() << "!!! Requeue it...\n";
2997                 LOG(EXTENSION_LOG_WARNING, "%s", ss.str().c_str());
2998                 redirty();
2999             }
3000         }
3001     }
3002
3003     // This callback is invoked for deletions only.
3004     //
    // The int value reports the outcome of the delete in the underlying
    // storage: the number of rows removed, or a negative value on failure.
3007     void callback(int &value) {
3008         // > 1 would be bad.  We were only trying to delete one row.
3009         cb_assert(value < 2);
3010         // -1 means fail
3011         // 1 means we deleted one row
3012         // 0 means we did not delete a row, but did not fail (did not exist)
3013         if (value >= 0) {
3014             // We have succesfully removed an item from the disk, we
3015             // may now remove it from the hash table.
3016             int bucket_num(0);
3017             LockHolder lh = vbucket->ht.getLockedBucket(queuedItem->getKey(),
3018                                                         &bucket_num);
3019             StoredValue *v = store->fetchValidValue(vbucket,
3020                                                     queuedItem->getKey(),
3021                                                     bucket_num, true, false);
3022             if (v && v->isDeleted()) {
3023                 bool newCacheItem = v->isNewCacheItem();
3024                 bool deleted = vbucket->ht.unlocked_del(queuedItem->getKey(),
3025                                                         bucket_num);
3026                 cb_assert(deleted);
3027                 if (newCacheItem && value > 0) {
3028                     // Need to decrement the item counter again for an item that
3029                     // exists on DB file, but not in memory (i.e., full eviction),
3030                     // because we created the temp item in memory and incremented
3031                     // the item counter when a deletion is pushed in the queue.
3032                     vbucket->ht.decrNumTotalItems();
3033                 }
3034             }
3035
3036             /**
3037              * Deleted items are to be added to the bloomfilter,
3038              * in either eviction policy.
3039              */
3040             vbucket->addToFilter(queuedItem->getKey());
3041
3042             if (value > 0) {
3043                 ++stats->totalPersisted;
3044                 ++vbucket->opsDelete;
3045             }
3046             vbucket->doStatsForFlushing(*queuedItem, queuedItem->size());
3047             stats->decrDiskQueueSize(1);
3048             vbucket->decrMetaDataDisk(*queuedItem);
3049         } else {
3050             std::stringstream ss;
3051             ss << "Fatal error in persisting DELETE ``" <<
3052             queuedItem->getKey() << "'' on vb "
3053                << queuedItem->getVBucketId() << "!!! Requeue it...\n";
3054             LOG(EXTENSION_LOG_WARNING, "%s", ss.str().c_str());
3055             redirty();
3056         }
3057     }
3058
3059 private:
3060
3061     void redirty() {
3062         if (store->vbMap.isBucketDeletion(vbucket->getId())) {
3063             vbucket->doStatsForFlushing(*queuedItem, queuedItem->size());
3064             stats->decrDiskQueueSize(1);
3065             return;
3066         }
3067         ++stats->flushFailed;
3068         store->invokeOnLockedStoredValue(queuedItem->getKey(),
3069                                          queuedItem->getVBucketId(),
3070                                          &StoredValue::reDirty);
3071         vbucket->rejectQueue.push(queuedItem);
3072     }
3073
3074     const queued_item queuedItem;
3075     RCPtr<VBucket> &vbucket;
3076     EventuallyPersistentStore *store;
3077     EPStats *stats;
3078     uint64_t cas;
3079     DISALLOW_COPY_AND_ASSIGN(PersistenceCallback);
3080 };
3081
3082 bool EventuallyPersistentStore::scheduleFlushAllTask(const void* cookie,
3083                                                      time_t when) {
3084     bool inverse = false;
3085     if (diskFlushAll.compare_exchange_strong(inverse, true)) {
3086         flushAllTaskCtx.cookie = cookie;
3087         flushAllTaskCtx.delayFlushAll.compare_exchange_strong(inverse, true);
3088         ExTask task = new FlushAllTask(&engine, static_cast<double>(when));
3089         ExecutorPool::get()->schedule(task, NONIO_TASK_IDX);
3090         return true;
3091     } else {
3092         return false;
3093     }
3094 }
3095
3096 void EventuallyPersistentStore::setFlushAllComplete() {
3097     // Notify memcached about flushAll task completion, and
3098     // set diskFlushall flag to false
3099     if (flushAllTaskCtx.cookie) {
3100         engine.notifyIOComplete(flushAllTaskCtx.cookie, ENGINE_SUCCESS);
3101     }
3102     bool inverse = false;
3103     flushAllTaskCtx.delayFlushAll.compare_exchange_strong(inverse, true);
3104     inverse = true;
3105     diskFlushAll.compare_exchange_strong(inverse, false);
3106 }
3107
3108 void EventuallyPersistentStore::flushOneDeleteAll() {
3109     for (size_t i = 0; i < vbMap.getSize(); ++i) {
3110         RCPtr<VBucket> vb = getVBucket(i);
3111         if (vb) {
3112             LockHolder lh(vb_mutexes[vb->getId()]);
3113             getRWUnderlying(vb->getId())->reset(i);
3114         }
3115     }
3116
3117     stats.decrDiskQueueSize(1);
3118     setFlushAllComplete();
3119 }
3120
3121 int EventuallyPersistentStore::flushVBucket(uint16_t vbid) {
3122     KVShard *shard = vbMap.getShard(vbid);
3123     if (diskFlushAll && !flushAllTaskCtx.delayFlushAll) {
3124         if (shard->getId() == EP_PRIMARY_SHARD) {
3125             flushOneDeleteAll();
3126         } else {
3127             // disk flush is pending just return
3128             return 0;
3129         }
3130     }
3131
3132     if (vbMap.isBucketCreation(vbid)) {
3133         return RETRY_FLUSH_VBUCKET;
3134     }
3135
3136     int items_flushed = 0;
3137     rel_time_t flush_start = ep_current_time();
3138
3139     RCPtr<VBucket> vb = vbMap.getBucket(vbid);
3140     if (vb) {
3141         LockHolder lh(vb_mutexes[vbid], true /*tryLock*/);
3142         if (!lh.islocked()) { // Try another bucket if this one is locked
3143             return RETRY_FLUSH_VBUCKET; // to avoid blocking flusher
3144         }
3145
3146         KVStatsCallback cb(this);
3147         std::vector<queued_item> items;
3148         KVStore *rwUnderlying = getRWUnderlying(vbid);
3149
3150         while (!vb->rejectQueue.empty()) {
3151             items.push_back(vb->rejectQueue.front());
3152             vb->rejectQueue.pop();
3153         }
3154
3155         const std::string cursor(CheckpointManager::pCursorName);
3156         vb->getBackfillItems(items);
3157
3158         snapshot_range_t range;
3159         range = vb->checkpointManager.getAllItemsForCursor(cursor, items);
3160
3161         if (!items.empty()) {
3162             while (!rwUnderlying->begin()) {
3163                 ++stats.beginFailed;
3164                 LOG(EXTENSION_LOG_WARNING, "Failed to start a transaction!!! "
3165                     "Retry in 1 sec ...");
3166                 sleep(1);
3167             }
3168             rwUnderlying->optimizeWrites(items);
3169
3170             Item *prev = NULL;
3171             uint64_t maxSeqno = 0;
3172             uint64_t maxCas = 0;
3173             std::list<PersistenceCallback*> pcbs;
3174             std::vector<queued_item>::iterator it = items.begin();
3175             for(; it != items.end(); ++it) {
3176                 if ((*it)->getOperation() != queue_op_set &&
3177                     (*it)->getOperation() != queue_op_del) {
3178                     continue;
3179                 } else if (!prev || prev->getKey() != (*it)->getKey()) {
3180                     prev = (*it).get();
3181                     ++items_flushed;
3182                     PersistenceCallback *cb = flushOneDelOrSet(*it, vb);
3183                     if (cb) {
3184                         pcbs.push_back(cb);
3185                     }
3186
3187                     maxSeqno = std::max(maxSeqno, (uint64_t)(*it)->getBySeqno());
3188                     maxCas = std::max(maxCas, (uint64_t)(*it)->getCas());
3189                     ++stats.flusher_todo;
3190                 } else {
3191                     stats.decrDiskQueueSize(1);
3192                     vb->doStatsForFlushing(*(*it), (*it)->size());
3193                 }
3194             }
3195
3196             BlockTimer timer(&stats.diskCommitHisto, "disk_commit",
3197                              stats.timingLog);
3198             hrtime_t start = gethrtime();
3199
3200             if (vb->getState() == vbucket_state_active) {
3201                 range.start = maxSeqno;
3202                 range.end = maxSeqno;
3203             }
3204
3205             while (!rwUnderlying->commit(&cb, range.start, range.end, maxCas,
3206                                          vb->getDriftCounter())) {
3207                 ++stats.commitFailed;
3208                 LOG(EXTENSION_LOG_WARNING, "Flusher commit failed!!! Retry in "
3209                     "1 sec...\n");
3210                 sleep(1);
3211
3212             }
3213
3214             if (vb->rejectQueue.empty()) {
3215                 vb->setPersistedSnapshot(range.start, range.end);
3216                 uint64_t highSeqno = rwUnderlying->getLastPersistedSeqno(vbid);
3217                 if (highSeqno > 0 &&
3218                     highSeqno != vbMap.getPersistenceSeqno(vbid)) {
3219                     vbMap.setPersistenceSeqno(vbid, highSeqno);
3220                     vb->notifySeqnoPersisted(highSeqno);
3221                 }
3222             }
3223
3224             while (!pcbs.empty()) {
3225                 delete pcbs.front();
3226                 pcbs.pop_front();
3227             }
3228
3229             ++stats.flusherCommits;
3230             hrtime_t end = gethrtime();
3231             uint64_t commit_time = (end - start) / 1000000;
3232             uint64_t trans_time = (end - flush_start) / 1000000;
3233
3234             lastTransTimePerItem.store((items_flushed == 0) ? 0 :
3235                                        static_cast<double>(trans_time) /
3236                                        static_cast<double>(items_flushed));
3237             stats.commit_time.store(commit_time);
3238             stats.cumulativeCommitTime.fetch_add(commit_time);
3239             stats.cumulativeFlushTime.fetch_add(ep_current_time()
3240                                                 - flush_start);
3241             stats.flusher_todo.store(0);
3242         }
3243
3244         rwUnderlying->pendingTasks();
3245
3246         if (vb->checkpointManager.getNumCheckpoints() > 1) {
3247             wakeUpCheckpointRemover();
3248         }
3249
3250         if (vb->rejectQueue.empty()) {
3251             vb->checkpointManager.itemsPersisted();
3252             uint64_t seqno = vbMap.getPersistenceSeqno(vbid);
3253             uint64_t chkid = vb->checkpointManager.getPersistenceCursorPreChkId();
3254             vb->notifyOnPersistence(engine, seqno, true);
3255             vb->notifyOnPersistence(engine, chkid, false);
3256             if (chkid > 0 && chkid != vbMap.getPersistenceCheckpointId(vbid)) {
3257                 vbMap.setPersistenceCheckpointId(vbid, chkid);
3258             }
3259         } else {
3260             return RETRY_FLUSH_VBUCKET;
3261         }
3262     }
3263
3264     return items_flushed;
3265 }
3266
3267 PersistenceCallback*
3268 EventuallyPersistentStore::flushOneDelOrSet(const queued_item &qi,
3269                                             RCPtr<VBucket> &vb) {
3270
3271     if (!vb) {
3272         stats.decrDiskQueueSize(1);
3273         return NULL;
3274     }
3275
3276     int64_t bySeqno = qi->getBySeqno();
3277     bool deleted = qi->isDeleted();
3278     rel_time_t queued(qi->getQueuedTime());
3279
3280     int dirtyAge = ep_current_time() - queued;
3281     stats.dirtyAgeHisto.add(dirtyAge * 1000000);
3282     stats.dirtyAge.store(dirtyAge);
3283     stats.dirtyAgeHighWat.store(std::max(stats.dirtyAge.load(),
3284                                          stats.dirtyAgeHighWat.load()));
3285
3286     // Wait until the vbucket database is created by the vbucket state
3287     // snapshot task.
3288     if (vbMap.isBucketCreation(qi->getVBucketId()) ||
3289         vbMap.isBucketDeletion(qi->getVBucketId())) {
3290         vb->rejectQueue.push(qi);
3291         ++vb->opsReject;
3292         return NULL;
3293     }
3294
3295     KVStore *rwUnderlying = getRWUnderlying(qi->getVBucketId());
3296     if (!deleted) {
3297         // TODO: Need to separate disk_insert from disk_update because
3298         // bySeqno doesn't give us that information.
3299         BlockTimer timer(bySeqno == -1 ?
3300                          &stats.diskInsertHisto : &stats.diskUpdateHisto,
3301                          bySeqno == -1 ? "disk_insert" : "disk_update",
3302                          stats.timingLog);
3303         PersistenceCallback *cb =
3304             new PersistenceCallback(qi, vb, this, &stats, qi->getCas());
3305         rwUnderlying->set(*qi, *cb);
3306         return cb;
3307     } else {
3308         BlockTimer timer(&stats.diskDelHisto, "disk_delete",
3309                          stats.timingLog);
3310         PersistenceCallback *cb =
3311             new PersistenceCallback(qi, vb, this, &stats, 0);
3312         rwUnderlying->del(*qi, *cb);
3313         return cb;
3314     }
3315 }
3316
3317 void EventuallyPersistentStore::queueDirty(RCPtr<VBucket> &vb,
3318                                            StoredValue* v,
3319                                            LockHolder *plh,
3320                                            uint64_t *seqno,
3321                                            bool tapBackfill,
3322                                            bool notifyReplicator,
3323                                            bool genBySeqno,
3324                                            bool setConflictMode) {
3325     if (vb) {
3326         if (setConflictMode && (v->getConflictResMode() != last_write_wins) &&
3327             vb->isTimeSyncEnabled()) {
3328             v->setConflictResMode(last_write_wins);
3329         }
3330
3331         queued_item qi(v->toItem(false, vb->getId()));
3332
3333         bool rv = tapBackfill ? vb->queueBackfillItem(qi, genBySeqno) :
3334                                 vb->checkpointManager.queueDirty(vb, qi,
3335                                                                  genBySeqno);
3336         v->setBySeqno(qi->getBySeqno());
3337
3338         if (seqno) {
3339             *seqno = v->getBySeqno();
3340         }
3341
3342         if (plh) {
3343             plh->unlock();
3344         }
3345
3346         if (rv) {
3347             KVShard* shard = vbMap.getShard(vb->getId());
3348             shard->getFlusher()->notifyFlushEvent();
3349
3350         }
3351         if (!tapBackfill && notifyReplicator) {
3352             engine.getTapConnMap().notifyVBConnections(vb->getId());
3353             engine.getDcpConnMap().notifyVBConnections(vb->getId(),
3354                                                        qi->getBySeqno());
3355         }
3356     }
3357 }
3358
3359 std::vector<vbucket_state *> EventuallyPersistentStore::loadVBucketState()
3360 {
3361     return getOneROUnderlying()->listPersistedVbuckets();
3362 }
3363
3364 void EventuallyPersistentStore::warmupCompleted() {
3365     // Run the vbucket state snapshot job once after the warmup
3366     scheduleVBSnapshot(VBSnapshotTask::Priority::HIGH);
3367
3368     if (engine.getConfiguration().getAlogPath().length() > 0) {
3369
3370         if (engine.getConfiguration().isAccessScannerEnabled()) {
3371             LockHolder lh(accessScanner.mutex);
3372             accessScanner.enabled = true;
3373             lh.unlock();
3374             LOG(EXTENSION_LOG_WARNING, "Access Scanner task enabled");
3375             size_t smin = engine.getConfiguration().getAlogSleepTime();
3376             setAccessScannerSleeptime(smin);
3377         } else {
3378             LockHolder lh(accessScanner.mutex);
3379             accessScanner.enabled = false;
3380             LOG(EXTENSION_LOG_WARNING, "Access Scanner task disabled");
3381         }