MB-20105: Ensure purge_seq is not reset when no items are purged in a compaction
[ep-engine.git] / src / ep.cc
1 /* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2 /*
3  *     Copyright 2010 Couchbase, Inc
4  *
5  *   Licensed under the Apache License, Version 2.0 (the "License");
6  *   you may not use this file except in compliance with the License.
7  *   You may obtain a copy of the License at
8  *
9  *       http://www.apache.org/licenses/LICENSE-2.0
10  *
11  *   Unless required by applicable law or agreed to in writing, software
12  *   distributed under the License is distributed on an "AS IS" BASIS,
13  *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  *   See the License for the specific language governing permissions and
15  *   limitations under the License.
16  */
17
18 #include "config.h"
19
20 #include <string.h>
21 #include <time.h>
22
23 #include <fstream>
24 #include <functional>
25 #include <iostream>
26 #include <map>
27 #include <sstream>
28 #include <string>
29 #include <utility>
30 #include <vector>
31
32 #include "access_scanner.h"
33 #include "checkpoint_remover.h"
34 #include "conflict_resolution.h"
35 #include "ep.h"
36 #include "ep_engine.h"
37 #include "failover-table.h"
38 #include "flusher.h"
39 #include "htresizer.h"
40 #include "kvshard.h"
41 #include "kvstore.h"
42 #include "locks.h"
43 #include "mutation_log.h"
44 #include "warmup.h"
45 #include "connmap.h"
46 #include "tapthrottle.h"
47
48 class StatsValueChangeListener : public ValueChangedListener {
49 public:
50     StatsValueChangeListener(EPStats &st) : stats(st) {
51         // EMPTY
52     }
53
54     virtual void sizeValueChanged(const std::string &key, size_t value) {
55         if (key.compare("max_size") == 0) {
56             stats.setMaxDataSize(value);
57             size_t low_wat = static_cast<size_t>
58                              (static_cast<double>(value) * 0.6);
59             size_t high_wat = static_cast<size_t>(
60                               static_cast<double>(value) * 0.75);
61             stats.mem_low_wat.store(low_wat);
62             stats.mem_high_wat.store(high_wat);
63         } else if (key.compare("mem_low_wat") == 0) {
64             stats.mem_low_wat.store(value);
65         } else if (key.compare("mem_high_wat") == 0) {
66             stats.mem_high_wat.store(value);
67         } else if (key.compare("tap_throttle_threshold") == 0) {
68             stats.tapThrottleThreshold.store(
69                                           static_cast<double>(value) / 100.0);
70         } else if (key.compare("warmup_min_memory_threshold") == 0) {
71             stats.warmupMemUsedCap.store(static_cast<double>(value) / 100.0);
72         } else if (key.compare("warmup_min_items_threshold") == 0) {
73             stats.warmupNumReadCap.store(static_cast<double>(value) / 100.0);
74         } else {
75             LOG(EXTENSION_LOG_WARNING,
76                 "Failed to change value for unknown variable, %s\n",
77                 key.c_str());
78         }
79     }
80
81 private:
82     EPStats &stats;
83 };
84
85 /**
86  * A configuration value changed listener that responds to ep-engine
87  * parameter changes by invoking engine-specific methods on
88  * configuration change events.
89  */
90 class EPStoreValueChangeListener : public ValueChangedListener {
91 public:
92     EPStoreValueChangeListener(EventuallyPersistentStore &st) : store(st) {
93     }
94
95     virtual void sizeValueChanged(const std::string &key, size_t value) {
96         if (key.compare("bg_fetch_delay") == 0) {
97             store.setBGFetchDelay(static_cast<uint32_t>(value));
98         } else if (key.compare("compaction_write_queue_cap") == 0) {
99             store.setCompactionWriteQueueCap(value);
100         } else if (key.compare("exp_pager_stime") == 0) {
101             store.setExpiryPagerSleeptime(value);
102         } else if (key.compare("alog_sleep_time") == 0) {
103             store.setAccessScannerSleeptime(value);
104         } else if (key.compare("alog_task_time") == 0) {
105             store.resetAccessScannerStartTime();
106         } else if (key.compare("mutation_mem_threshold") == 0) {
107             double mem_threshold = static_cast<double>(value) / 100;
108             StoredValue::setMutationMemoryThreshold(mem_threshold);
109         } else if (key.compare("backfill_mem_threshold") == 0) {
110             double backfill_threshold = static_cast<double>(value) / 100;
111             store.setBackfillMemoryThreshold(backfill_threshold);
112         } else if (key.compare("compaction_exp_mem_threshold") == 0) {
113             store.setCompactionExpMemThreshold(value);
114         } else if (key.compare("tap_throttle_queue_cap") == 0) {
115             store.getEPEngine().getTapThrottle().setQueueCap(value);
116         } else if (key.compare("tap_throttle_cap_pcnt") == 0) {
117             store.getEPEngine().getTapThrottle().setCapPercent(value);
118         } else {
119             LOG(EXTENSION_LOG_WARNING,
120                 "Failed to change value for unknown variable, %s\n",
121                 key.c_str());
122         }
123     }
124
125     virtual void booleanValueChanged(const std::string &key, bool value) {
126         if (key.compare("access_scanner_enabled") == 0) {
127             if (value) {
128                 store.enableAccessScannerTask();
129             } else {
130                 store.disableAccessScannerTask();
131             }
132         }
133     }
134
135 private:
136     EventuallyPersistentStore &store;
137 };
138
139 class VBucketMemoryDeletionTask : public GlobalTask {
140 public:
141     VBucketMemoryDeletionTask(EventuallyPersistentEngine &eng,
142                               RCPtr<VBucket> &vb, double delay) :
143                               GlobalTask(&eng,
144                               Priority::VBMemoryDeletionPriority, delay, true),
145                               e(eng), vbucket(vb), vbid(vb->getId()) { }
146
147     std::string getDescription() {
148         std::stringstream ss;
149         ss << "Removing (dead) vbucket " << vbid << " from memory";
150         return ss.str();
151     }
152
153     bool run(void) {
154         vbucket->notifyAllPendingConnsFailed(e);
155         vbucket->ht.clear();
156         vbucket.reset();
157         return false;
158     }
159
160 private:
161     EventuallyPersistentEngine &e;
162     RCPtr<VBucket> vbucket;
163     uint16_t vbid;
164 };
165
166 class PendingOpsNotification : public GlobalTask {
167 public:
168     PendingOpsNotification(EventuallyPersistentEngine &e, RCPtr<VBucket> &vb) :
169         GlobalTask(&e, Priority::VBMemoryDeletionPriority, 0, false),
170         engine(e), vbucket(vb) { }
171
172     std::string getDescription() {
173         std::stringstream ss;
174         ss << "Notify pending operations for vbucket " << vbucket->getId();
175         return ss.str();
176     }
177
178     bool run(void) {
179         vbucket->fireAllOps(engine);
180         return false;
181     }
182
183 private:
184     EventuallyPersistentEngine &engine;
185     RCPtr<VBucket> vbucket;
186 };
187
188 EventuallyPersistentStore::EventuallyPersistentStore(
189     EventuallyPersistentEngine &theEngine) :
190     engine(theEngine), stats(engine.getEpStats()),
191     vbMap(theEngine.getConfiguration(), *this),
192     bgFetchQueue(0),
193     diskFlushAll(false), bgFetchDelay(0), backfillMemoryThreshold(0.95),
194     statsSnapshotTaskId(0), lastTransTimePerItem(0)
195 {
196     cachedResidentRatio.activeRatio.store(0);
197     cachedResidentRatio.replicaRatio.store(0);
198
199     Configuration &config = engine.getConfiguration();
200     MutationLog *shardlog;
201     for (uint16_t i = 0; i < config.getMaxNumShards(); i++) {
202         std::stringstream s;
203         s << i;
204         shardlog = new MutationLog(engine.getConfiguration().getAlogPath() +
205                                  "." + s.str(),
206                                  engine.getConfiguration().getAlogBlockSize());
207         accessLog.push_back(shardlog);
208     }
209
210     storageProperties = new StorageProperties(true, true, true, true);
211
212     stats.schedulingHisto = new Histogram<hrtime_t>[MAX_TYPE_ID];
213     stats.taskRuntimeHisto = new Histogram<hrtime_t>[MAX_TYPE_ID];
214
215     for (size_t i = 0; i < MAX_TYPE_ID; i++) {
216         stats.schedulingHisto[i].reset();
217         stats.taskRuntimeHisto[i].reset();
218     }
219
220     ExecutorPool::get()->registerBucket(ObjectRegistry::getCurrentEngine());
221
222     size_t num_vbs = config.getMaxVbuckets();
223     vb_mutexes = new Mutex[num_vbs];
224     schedule_vbstate_persist = new AtomicValue<bool>[num_vbs];
225     for (size_t i = 0; i < num_vbs; ++i) {
226         schedule_vbstate_persist[i] = false;
227     }
228
229     stats.memOverhead = sizeof(EventuallyPersistentStore);
230
231     if (config.getConflictResolutionType().compare("seqno") == 0) {
232         conflictResolver = new SeqBasedResolution();
233     }
234
235     stats.setMaxDataSize(config.getMaxSize());
236     config.addValueChangedListener("max_size",
237                                    new StatsValueChangeListener(stats));
238
239     stats.mem_low_wat.store(config.getMemLowWat());
240     config.addValueChangedListener("mem_low_wat",
241                                    new StatsValueChangeListener(stats));
242
243     stats.mem_high_wat.store(config.getMemHighWat());
244     config.addValueChangedListener("mem_high_wat",
245                                    new StatsValueChangeListener(stats));
246
247     stats.tapThrottleThreshold.store(static_cast<double>
248                                     (config.getTapThrottleThreshold())
249                                      / 100.0);
250     config.addValueChangedListener("tap_throttle_threshold",
251                                    new StatsValueChangeListener(stats));
252
253     stats.tapThrottleWriteQueueCap.store(config.getTapThrottleQueueCap());
254     config.addValueChangedListener("tap_throttle_queue_cap",
255                                    new EPStoreValueChangeListener(*this));
256     config.addValueChangedListener("tap_throttle_cap_pcnt",
257                                    new EPStoreValueChangeListener(*this));
258
259     setBGFetchDelay(config.getBgFetchDelay());
260     config.addValueChangedListener("bg_fetch_delay",
261                                    new EPStoreValueChangeListener(*this));
262
263     stats.warmupMemUsedCap.store(static_cast<double>
264                                (config.getWarmupMinMemoryThreshold()) / 100.0);
265     config.addValueChangedListener("warmup_min_memory_threshold",
266                                    new StatsValueChangeListener(stats));
267     stats.warmupNumReadCap.store(static_cast<double>
268                                 (config.getWarmupMinItemsThreshold()) / 100.0);
269     config.addValueChangedListener("warmup_min_items_threshold",
270                                    new StatsValueChangeListener(stats));
271
272     double mem_threshold = static_cast<double>
273                                       (config.getMutationMemThreshold()) / 100;
274     StoredValue::setMutationMemoryThreshold(mem_threshold);
275     config.addValueChangedListener("mutation_mem_threshold",
276                                    new EPStoreValueChangeListener(*this));
277
278     double backfill_threshold = static_cast<double>
279                                       (config.getBackfillMemThreshold()) / 100;
280     setBackfillMemoryThreshold(backfill_threshold);
281     config.addValueChangedListener("backfill_mem_threshold",
282                                    new EPStoreValueChangeListener(*this));
283
284     compactionExpMemThreshold = config.getCompactionExpMemThreshold();
285     config.addValueChangedListener("compaction_exp_mem_threshold",
286                                    new EPStoreValueChangeListener(*this));
287
288     compactionWriteQueueCap = config.getCompactionWriteQueueCap();
289     config.addValueChangedListener("compaction_write_queue_cap",
290                                    new EPStoreValueChangeListener(*this));
291
292     const std::string &policy = config.getItemEvictionPolicy();
293     if (policy.compare("value_only") == 0) {
294         eviction_policy = VALUE_ONLY;
295     } else {
296         eviction_policy = FULL_EVICTION;
297     }
298
299     warmupTask = new Warmup(this);
300 }
301
302 bool EventuallyPersistentStore::initialize() {
303     // We should nuke everything unless we want warmup
304     Configuration &config = engine.getConfiguration();
305     if (!config.isWarmup()) {
306         reset();
307     }
308
309     if (!startFlusher()) {
310         LOG(EXTENSION_LOG_WARNING,
311             "FATAL: Failed to create and start flushers");
312         return false;
313     }
314     if (!startBgFetcher()) {
315         LOG(EXTENSION_LOG_WARNING,
316            "FATAL: Failed to create and start bgfetchers");
317         return false;
318     }
319
320     warmupTask->start();
321
322     if (config.isFailpartialwarmup() && stats.warmOOM > 0) {
323         LOG(EXTENSION_LOG_WARNING,
324             "Warmup failed to load %d records due to OOM, exiting.\n",
325             static_cast<unsigned int>(stats.warmOOM));
326         return false;
327     }
328
329     itmpTask = new ItemPager(&engine, stats);
330     ExecutorPool::get()->schedule(itmpTask, NONIO_TASK_IDX);
331
332     size_t expiryPagerSleeptime = config.getExpPagerStime();
333     setExpiryPagerSleeptime(expiryPagerSleeptime);
334     config.addValueChangedListener("exp_pager_stime",
335                                    new EPStoreValueChangeListener(*this));
336
337     ExTask htrTask = new HashtableResizerTask(this, 10);
338     ExecutorPool::get()->schedule(htrTask, NONIO_TASK_IDX);
339
340     size_t checkpointRemoverInterval = config.getChkRemoverStime();
341     chkTask = new ClosedUnrefCheckpointRemoverTask(&engine, stats,
342                                                    checkpointRemoverInterval);
343     ExecutorPool::get()->schedule(chkTask, NONIO_TASK_IDX);
344
345     ExTask vbSnapshotTask = new DaemonVBSnapshotTask(&engine);
346     ExecutorPool::get()->schedule(vbSnapshotTask, WRITER_TASK_IDX);
347
348     ExTask workloadMonitorTask = new WorkLoadMonitor(&engine, false);
349     ExecutorPool::get()->schedule(workloadMonitorTask, NONIO_TASK_IDX);
350
351     return true;
352 }
353
354 EventuallyPersistentStore::~EventuallyPersistentStore() {
355     stopWarmup();
356     stopBgFetcher();
357     ExecutorPool::get()->stopTaskGroup(&engine, NONIO_TASK_IDX);
358
359     ExecutorPool::get()->cancel(statsSnapshotTaskId);
360     LockHolder lh(accessScanner.mutex);
361     ExecutorPool::get()->cancel(accessScanner.task);
362     lh.unlock();
363
364     stopFlusher();
365     ExecutorPool::get()->unregisterBucket(ObjectRegistry::getCurrentEngine());
366
367     delete [] vb_mutexes;
368     delete [] schedule_vbstate_persist;
369     delete [] stats.schedulingHisto;
370     delete [] stats.taskRuntimeHisto;
371     delete conflictResolver;
372     delete warmupTask;
373     delete storageProperties;
374
375     std::vector<MutationLog*>::iterator it;
376     for (it = accessLog.begin(); it != accessLog.end(); it++) {
377         delete *it;
378     }
379 }
380
381 const Flusher* EventuallyPersistentStore::getFlusher(uint16_t shardId) {
382     return vbMap.getShard(shardId)->getFlusher();
383 }
384
385 Warmup* EventuallyPersistentStore::getWarmup(void) const {
386     return warmupTask;
387 }
388
389 bool EventuallyPersistentStore::startFlusher() {
390     for (uint16_t i = 0; i < vbMap.numShards; ++i) {
391         Flusher *flusher = vbMap.shards[i]->getFlusher();
392         flusher->start();
393     }
394     return true;
395 }
396
397 void EventuallyPersistentStore::stopFlusher() {
398     for (uint16_t i = 0; i < vbMap.numShards; i++) {
399         Flusher *flusher = vbMap.shards[i]->getFlusher();
400         bool rv = flusher->stop(stats.forceShutdown);
401         if (rv && !stats.forceShutdown) {
402             flusher->wait();
403         }
404     }
405 }
406
407 bool EventuallyPersistentStore::pauseFlusher() {
408     bool rv = true;
409     for (uint16_t i = 0; i < vbMap.numShards; i++) {
410         Flusher *flusher = vbMap.shards[i]->getFlusher();
411         if (!flusher->pause()) {
412             LOG(EXTENSION_LOG_WARNING, "Attempted to pause flusher in state "
413                 "[%s], shard = %d", flusher->stateName(), i);
414             rv = false;
415         }
416     }
417     return rv;
418 }
419
420 bool EventuallyPersistentStore::resumeFlusher() {
421     bool rv = true;
422     for (uint16_t i = 0; i < vbMap.numShards; i++) {
423         Flusher *flusher = vbMap.shards[i]->getFlusher();
424         if (!flusher->resume()) {
425             LOG(EXTENSION_LOG_WARNING,
426                     "Warning: attempted to resume flusher in state [%s], "
427                     "shard = %d", flusher->stateName(), i);
428             rv = false;
429         }
430     }
431     return rv;
432 }
433
434 void EventuallyPersistentStore::wakeUpFlusher() {
435     if (stats.diskQueueSize.load() == 0) {
436         for (uint16_t i = 0; i < vbMap.numShards; i++) {
437             Flusher *flusher = vbMap.shards[i]->getFlusher();
438             flusher->wake();
439         }
440     }
441 }
442
443 bool EventuallyPersistentStore::startBgFetcher() {
444     for (uint16_t i = 0; i < vbMap.numShards; i++) {
445         BgFetcher *bgfetcher = vbMap.shards[i]->getBgFetcher();
446         if (bgfetcher == NULL) {
447             LOG(EXTENSION_LOG_WARNING,
448                 "Falied to start bg fetcher for shard %d", i);
449             return false;
450         }
451         bgfetcher->start();
452     }
453     return true;
454 }
455
456 void EventuallyPersistentStore::stopBgFetcher() {
457     for (uint16_t i = 0; i < vbMap.numShards; i++) {
458         BgFetcher *bgfetcher = vbMap.shards[i]->getBgFetcher();
459         if (multiBGFetchEnabled() && bgfetcher->pendingJob()) {
460             LOG(EXTENSION_LOG_WARNING,
461                 "Shutting down engine while there are still pending data "
462                 "read for shard %d from database storage", i);
463         }
464         LOG(EXTENSION_LOG_INFO, "Stopping bg fetcher for underlying storage");
465         bgfetcher->stop();
466     }
467 }
468
/**
 * Delete `key` from vbucket `vbid` if it has expired as of `startTime`.
 *
 * Does nothing unless the vbucket exists and is active. Under the bucket
 * lock:
 *  - a temp non-existent / temp deleted placeholder is removed from the
 *    hash table;
 *  - a live value that has expired is soft-deleted and queued;
 *  - when the key is absent and the full-eviction policy is in effect, a
 *    temp item is created, marked deleted with `revSeqno`, soft-deleted
 *    and queued, so the deletion reaches disk.
 *
 * NOTE(review): incExpirationStat() runs before the lookup, so the stat is
 * bumped even when nothing ends up being deleted. Confirm this is intended.
 */
void
EventuallyPersistentStore::deleteExpiredItem(uint16_t vbid, std::string &key,
                                             time_t startTime,
                                             uint64_t revSeqno) {
    RCPtr<VBucket> vb = getVBucket(vbid);
    if (vb) {
        // Obtain reader access to the VB state change lock so that
        // the VB can't switch state whilst we're processing
        ReaderLockHolder rlh(vb->getStateLock());
        if (vb->getState() == vbucket_state_active) {
            int bucket_num(0);
            incExpirationStat(vb);
            LockHolder lh = vb->ht.getLockedBucket(key, &bucket_num);
            StoredValue *v = vb->ht.unlocked_find(key, bucket_num, true, false);
            if (v) {
                if (v->isTempNonExistentItem() || v->isTempDeletedItem()) {
                    // This is a temporary item whose background fetch for metadata
                    // has completed.
                    bool deleted = vb->ht.unlocked_del(key, bucket_num);
                    cb_assert(deleted);
                } else if (v->isExpired(startTime) && !v->isDeleted()) {
                    vb->ht.unlocked_softDelete(v, 0, getItemEvictionPolicy());
                    queueDirty(vb, v, &lh, false);
                }
            } else {
                if (eviction_policy == FULL_EVICTION) {
                    // Create a temp item and delete and push it
                    // into the checkpoint queue.
                    add_type_t rv = vb->ht.unlocked_addTempItem(bucket_num, key,
                                                                eviction_policy);
                    if (rv == ADD_NOMEM) {
                        return;
                    }
                    // Assumes the lookup now succeeds: the bucket lock is
                    // still held and the temp item was just added. There is
                    // no NULL check here.
                    v = vb->ht.unlocked_find(key, bucket_num, true, false);
                    v->setStoredValueState(StoredValue::state_deleted_key);
                    v->setRevSeqno(revSeqno);
                    vb->ht.unlocked_softDelete(v, 0, eviction_policy);
                    queueDirty(vb, v, &lh, false);
                }
            }
        }
    }
}
512
513 void
514 EventuallyPersistentStore::deleteExpiredItems(std::list<std::pair<uint16_t,
515                                                         std::string> > &keys) {
516     std::list<std::pair<uint16_t, std::string> >::iterator it;
517     time_t startTime = ep_real_time();
518     for (it = keys.begin(); it != keys.end(); it++) {
519         deleteExpiredItem(it->first, it->second, startTime, 0);
520     }
521 }
522
523 StoredValue *EventuallyPersistentStore::fetchValidValue(RCPtr<VBucket> &vb,
524                                                         const std::string &key,
525                                                         int bucket_num,
526                                                         bool wantDeleted,
527                                                         bool trackReference,
528                                                         bool queueExpired) {
529     StoredValue *v = vb->ht.unlocked_find(key, bucket_num, wantDeleted,
530                                           trackReference);
531     if (v && !v->isDeleted() && !v->isTempItem()) {
532         // In the deleted case, we ignore expiration time.
533         if (v->isExpired(ep_real_time())) {
534             if (vb->getState() != vbucket_state_active) {
535                 return wantDeleted ? v : NULL;
536             }
537
538             // queueDirty only allowed on active VB
539             if (queueExpired && vb->getState() == vbucket_state_active) {
540                 incExpirationStat(vb, false);
541                 vb->ht.unlocked_softDelete(v, 0, eviction_policy);
542                 queueDirty(vb, v, NULL, false, true);
543             }
544             return wantDeleted ? v : NULL;
545         }
546     }
547     return v;
548 }
549
550 protocol_binary_response_status EventuallyPersistentStore::evictKey(
551                                                         const std::string &key,
552                                                         uint16_t vbucket,
553                                                         const char **msg,
554                                                         size_t *msg_size,
555                                                         bool force) {
556     RCPtr<VBucket> vb = getVBucket(vbucket);
557     if (!vb || (vb->getState() != vbucket_state_active && !force)) {
558         return PROTOCOL_BINARY_RESPONSE_NOT_MY_VBUCKET;
559     }
560
561     int bucket_num(0);
562     LockHolder lh = vb->ht.getLockedBucket(key, &bucket_num);
563     StoredValue *v = fetchValidValue(vb, key, bucket_num, force, false);
564
565     protocol_binary_response_status rv(PROTOCOL_BINARY_RESPONSE_SUCCESS);
566
567     *msg_size = 0;
568     if (v) {
569         if (force)  {
570             v->markClean();
571         }
572         if (v->isResident()) {
573             if (vb->ht.unlocked_ejectItem(v, eviction_policy)) {
574                 *msg = "Ejected.";
575             } else {
576                 *msg = "Can't eject: Dirty object.";
577                 rv = PROTOCOL_BINARY_RESPONSE_KEY_EEXISTS;
578             }
579         } else {
580             *msg = "Already ejected.";
581         }
582     } else {
583         if (eviction_policy == VALUE_ONLY) {
584             *msg = "Not found.";
585             rv = PROTOCOL_BINARY_RESPONSE_KEY_ENOENT;
586         } else {
587             *msg = "Already ejected.";
588         }
589     }
590
591     return rv;
592 }
593
/**
 * Insert a temporary placeholder for `key` and schedule a background fetch
 * to resolve it from storage.
 *
 * @param lock          hash-bucket lock held by the caller. It is released
 *                      before bgFetch() is called on the ADD_BG_FETCH path.
 * @param bucket_num    hash bucket that `lock` guards.
 * @param key           key to fetch.
 * @param vb            owning vbucket.
 * @param cookie        connection cookie passed on to bgFetch().
 * @param metadataOnly  forwarded to bgFetch().
 * @param isReplication forwarded to HashTable::unlocked_addTempItem().
 * @return ENGINE_ENOMEM if the placeholder could not be added, otherwise
 *         ENGINE_EWOULDBLOCK. Other add results abort() the process.
 */
ENGINE_ERROR_CODE EventuallyPersistentStore::addTempItemForBgFetch(
                                                        LockHolder &lock,
                                                        int bucket_num,
                                                        const std::string &key,
                                                        RCPtr<VBucket> &vb,
                                                        const void *cookie,
                                                        bool metadataOnly,
                                                        bool isReplication) {

    add_type_t rv = vb->ht.unlocked_addTempItem(bucket_num, key,
                                                eviction_policy,
                                                isReplication);
    switch(rv) {
        case ADD_NOMEM:
            return ENGINE_ENOMEM;
        case ADD_EXISTS:
        case ADD_UNDEL:
        case ADD_SUCCESS:
        case ADD_TMP_AND_BG_FETCH:
            // Since the hashtable bucket is locked, we shouldn't get here
            abort();
        case ADD_BG_FETCH:
            // Drop the bucket lock before handing work to the bg fetcher.
            lock.unlock();
            bgFetch(key, vb->getId(), -1, cookie, metadataOnly);
    }
    return ENGINE_EWOULDBLOCK;
}
621
/**
 * Store `itm` in its vbucket, CAS-checked when itm.getCas() is non-zero.
 *
 * @param itm    item to store; its CAS is passed to HashTable::unlocked_set.
 * @param cookie connection cookie, used for pending-op parking and bg fetch.
 * @param force  lets replica and pending vbuckets accept the write.
 * @param nru    forwarded unchanged to HashTable::unlocked_set.
 * @return ENGINE_SUCCESS once queued for persistence; ENGINE_EWOULDBLOCK
 *         when parked on a pending vbucket or waiting on a bg fetch;
 *         ENGINE_NOT_MY_VBUCKET, ENGINE_ENOMEM, ENGINE_KEY_EEXISTS (CAS
 *         mismatch or locked), or ENGINE_KEY_ENOENT (CAS on a missing key).
 */
ENGINE_ERROR_CODE EventuallyPersistentStore::set(const Item &itm,
                                                 const void *cookie,
                                                 bool force,
                                                 uint8_t nru) {

    RCPtr<VBucket> vb = getVBucket(itm.getVBucketId());
    if (!vb) {
        ++stats.numNotMyVBuckets;
        return ENGINE_NOT_MY_VBUCKET;
    }

    // Obtain read-lock on VB state to ensure VB state changes are interlocked
    // with this set
    ReaderLockHolder rlh(vb->getStateLock());
    if (vb->getState() == vbucket_state_dead) {
        ++stats.numNotMyVBuckets;
        return ENGINE_NOT_MY_VBUCKET;
    } else if (vb->getState() == vbucket_state_replica && !force) {
        ++stats.numNotMyVBuckets;
        return ENGINE_NOT_MY_VBUCKET;
    } else if (vb->getState() == vbucket_state_pending && !force) {
        if (vb->addPendingOp(cookie)) {
            return ENGINE_EWOULDBLOCK;
        }
    }

    bool cas_op = (itm.getCas() != 0);
    int bucket_num(0);
    LockHolder lh = vb->ht.getLockedBucket(itm.getKey(), &bucket_num);
    StoredValue *v = vb->ht.unlocked_find(itm.getKey(), bucket_num, true,
                                          false);
    // On replica/pending vbuckets a locked item is unlocked before the
    // write is applied.
    if (v && v->isLocked(ep_current_time()) &&
        (vb->getState() == vbucket_state_replica ||
         vb->getState() == vbucket_state_pending)) {
        v->unlock();
    }
    mutation_type_t mtype = vb->ht.unlocked_set(v, itm, itm.getCas(),
                                                true, false,
                                                eviction_policy, nru);

    ENGINE_ERROR_CODE ret = ENGINE_SUCCESS;
    switch (mtype) {
    case NOMEM:
        ret = ENGINE_ENOMEM;
        break;
    case INVALID_CAS:
    case IS_LOCKED:
        ret = ENGINE_KEY_EEXISTS;
        break;
    case NOT_FOUND:
        if (cas_op) {
            ret = ENGINE_KEY_ENOENT;
            break;
        }
        // FALLTHROUGH
    case WAS_DIRTY:
        // Even if the item was dirty, push it into the vbucket's open
        // checkpoint.
        // FALLTHROUGH
    case WAS_CLEAN:
        queueDirty(vb, v, &lh);
        break;
    case NEED_BG_FETCH:
        { // CAS operation with non-resident item + full eviction.
            if (v) {
                // temp item is already created. Simply schedule a bg fetch job
                lh.unlock();
                bgFetch(itm.getKey(), vb->getId(), -1, cookie, true);
                return ENGINE_EWOULDBLOCK;
            }
            ret = addTempItemForBgFetch(lh, bucket_num, itm.getKey(), vb,
                                        cookie, true);
            break;
        }
    case INVALID_VBUCKET:
        ret = ENGINE_NOT_MY_VBUCKET;
        break;
    }

    return ret;
}
702
703 ENGINE_ERROR_CODE EventuallyPersistentStore::add(const Item &itm,
704                                                  const void *cookie)
705 {
706     RCPtr<VBucket> vb = getVBucket(itm.getVBucketId());
707     if (!vb) {
708         ++stats.numNotMyVBuckets;
709         return ENGINE_NOT_MY_VBUCKET;
710     }
711
712     // Obtain read-lock on VB state to ensure VB state changes are interlocked
713     // with this add
714     ReaderLockHolder rlh(vb->getStateLock());
715     if (vb->getState() == vbucket_state_dead ||
716         vb->getState() == vbucket_state_replica) {
717         ++stats.numNotMyVBuckets;
718         return ENGINE_NOT_MY_VBUCKET;
719     } else if(vb->getState() == vbucket_state_pending) {
720         if (vb->addPendingOp(cookie)) {
721             return ENGINE_EWOULDBLOCK;
722         }
723     }
724
725     if (itm.getCas() != 0) {
726         // Adding with a cas value doesn't make sense..
727         return ENGINE_NOT_STORED;
728     }
729
730     int bucket_num(0);
731     LockHolder lh = vb->ht.getLockedBucket(itm.getKey(), &bucket_num);
732     StoredValue *v = vb->ht.unlocked_find(itm.getKey(), bucket_num, true,
733                                           false);
734     add_type_t atype = vb->ht.unlocked_add(bucket_num, v, itm,
735                                            eviction_policy);
736
737     switch (atype) {
738     case ADD_NOMEM:
739         return ENGINE_ENOMEM;
740     case ADD_EXISTS:
741         return ENGINE_NOT_STORED;
742     case ADD_TMP_AND_BG_FETCH:
743         return addTempItemForBgFetch(lh, bucket_num, itm.getKey(), vb,
744                                      cookie, true);
745     case ADD_BG_FETCH:
746         lh.unlock();
747         bgFetch(itm.getKey(), vb->getId(), -1, cookie, true);
748         return ENGINE_EWOULDBLOCK;
749     case ADD_SUCCESS:
750     case ADD_UNDEL:
751         queueDirty(vb, v, &lh);
752         break;
753     }
754     return ENGINE_SUCCESS;
755 }
756
/**
 * Overwrite the value of an existing key (memcached "replace").
 *
 * Dead and replica vbuckets are rejected. On a pending vbucket the request
 * may be parked via addPendingOp(). Deleted items and temp
 * deleted/non-existent placeholders count as missing.
 *
 * NOTE(review): unlocked_set is called with CAS 0 and nru 0xff, so itm's
 * CAS is not checked here. Presumably CAS replaces are routed through
 * set(); confirm with the caller.
 *
 * @return ENGINE_SUCCESS once queued, ENGINE_KEY_ENOENT for a missing key,
 *         ENGINE_NOT_STORED, ENGINE_KEY_EEXISTS (locked), ENGINE_ENOMEM,
 *         ENGINE_EWOULDBLOCK (parked or waiting on a bg fetch under full
 *         eviction), or ENGINE_NOT_MY_VBUCKET.
 */
ENGINE_ERROR_CODE EventuallyPersistentStore::replace(const Item &itm,
                                                     const void *cookie) {
    RCPtr<VBucket> vb = getVBucket(itm.getVBucketId());
    if (!vb) {
        ++stats.numNotMyVBuckets;
        return ENGINE_NOT_MY_VBUCKET;
    }

    // Obtain read-lock on VB state to ensure VB state changes are interlocked
    // with this replace
    ReaderLockHolder rlh(vb->getStateLock());
    if (vb->getState() == vbucket_state_dead ||
        vb->getState() == vbucket_state_replica) {
        ++stats.numNotMyVBuckets;
        return ENGINE_NOT_MY_VBUCKET;
    } else if (vb->getState() == vbucket_state_pending) {
        if (vb->addPendingOp(cookie)) {
            return ENGINE_EWOULDBLOCK;
        }
    }

    int bucket_num(0);
    LockHolder lh = vb->ht.getLockedBucket(itm.getKey(), &bucket_num);
    StoredValue *v = vb->ht.unlocked_find(itm.getKey(), bucket_num, true,
                                          false);
    if (v) {
        if (v->isDeleted() || v->isTempDeletedItem() ||
            v->isTempNonExistentItem()) {
            return ENGINE_KEY_ENOENT;
        }

        // Under full eviction a temp "initial" item means the metadata has
        // not been fetched yet, so a bg fetch is needed before replacing.
        mutation_type_t mtype;
        if (eviction_policy == FULL_EVICTION && v->isTempInitialItem()) {
            mtype = NEED_BG_FETCH;
        } else {
            mtype = vb->ht.unlocked_set(v, itm, 0, true, false, eviction_policy,
                                        0xff);
        }

        ENGINE_ERROR_CODE ret = ENGINE_SUCCESS;
        switch (mtype) {
            case NOMEM:
                ret = ENGINE_ENOMEM;
                break;
            case IS_LOCKED:
                ret = ENGINE_KEY_EEXISTS;
                break;
            case INVALID_CAS:
            case NOT_FOUND:
                ret = ENGINE_NOT_STORED;
                break;
            case WAS_DIRTY:
                // Even if the item was dirty, push it into the vbucket's open
                // checkpoint.
                // FALLTHROUGH
            case WAS_CLEAN:
                queueDirty(vb, v, &lh);
                break;
            case NEED_BG_FETCH:
            {
                // temp item is already created. Simply schedule a bg fetch job
                lh.unlock();
                bgFetch(itm.getKey(), vb->getId(), -1, cookie, true);
                ret = ENGINE_EWOULDBLOCK;
                break;
            }
            case INVALID_VBUCKET:
                ret = ENGINE_NOT_MY_VBUCKET;
                break;
        }

        return ret;
    } else {
        if (eviction_policy == VALUE_ONLY) {
            return ENGINE_KEY_ENOENT;
        }

        // Full eviction: the key may exist only on disk. Add a placeholder
        // and fetch it (metadataOnly = false).
        return addTempItemForBgFetch(lh, bucket_num, itm.getKey(), vb,
                                     cookie, false);
    }
}
838
/**
 * Store an item received through a TAP backfill into a replica or pending
 * vbucket.
 *
 * Active and dead vbuckets are rejected with ENGINE_NOT_MY_VBUCKET (checked
 * under the vbucket state read-lock). A lock held on an existing in-memory
 * item is released before the set.
 *
 * @param itm        the item to store.
 * @param nru        NRU value passed through to HashTable::unlocked_set().
 * @param genBySeqno forwarded to queueDirty(); presumably selects whether a
 *                   new by-seqno is generated for the item — TODO confirm.
 * @return ENGINE_SUCCESS, ENGINE_ENOMEM, ENGINE_KEY_EEXISTS (INVALID_CAS or
 *         IS_LOCKED) or ENGINE_NOT_MY_VBUCKET. Calls abort() on
 *         NEED_BG_FETCH, which is not expected for a non-active vbucket.
 */
ENGINE_ERROR_CODE EventuallyPersistentStore::addTAPBackfillItem(const Item &itm,
                                                                uint8_t nru,
                                                                bool genBySeqno) {

    RCPtr<VBucket> vb = getVBucket(itm.getVBucketId());
    if (!vb) {
        ++stats.numNotMyVBuckets;
        return ENGINE_NOT_MY_VBUCKET;
    }

    // Obtain read-lock on VB state to ensure VB state changes are interlocked
    // with this add-tapbackfill
    ReaderLockHolder rlh(vb->getStateLock());
    if (vb->getState() == vbucket_state_dead ||
        vb->getState() == vbucket_state_active) {
        ++stats.numNotMyVBuckets;
        return ENGINE_NOT_MY_VBUCKET;
    }

    int bucket_num(0);
    LockHolder lh = vb->ht.getLockedBucket(itm.getKey(), &bucket_num);
    StoredValue *v = vb->ht.unlocked_find(itm.getKey(), bucket_num, true,
                                          false);

    // Note that this function is only called on replica or pending vbuckets.
    // A locked item must not block the backfill, so drop the lock here.
    if (v && v->isLocked(ep_current_time())) {
        v->unlock();
    }
    mutation_type_t mtype = vb->ht.unlocked_set(v, itm, 0, true, true,
                                                eviction_policy, nru);

    ENGINE_ERROR_CODE ret = ENGINE_SUCCESS;
    switch (mtype) {
    case NOMEM:
        ret = ENGINE_ENOMEM;
        break;
    case INVALID_CAS:
    case IS_LOCKED:
        ret = ENGINE_KEY_EEXISTS;
        break;
    case WAS_DIRTY:
        // FALLTHROUGH, to ensure the bySeqno for the hashTable item is
        // set correctly, and also the sequence numbers are ordered correctly.
        // (MB-14003)
    case NOT_FOUND:
        // FALLTHROUGH
    case WAS_CLEAN:
        queueDirty(vb, v, &lh, true, true, genBySeqno);
        break;
    case INVALID_VBUCKET:
        ret = ENGINE_NOT_MY_VBUCKET;
        break;
    case NEED_BG_FETCH:
        // SET on a non-active vbucket should not require a bg_metadata_fetch.
        abort();
    }

    return ret;
}
898
899 class KVStatsCallback : public Callback<kvstats_ctx> {
900     public:
901         KVStatsCallback(EventuallyPersistentStore *store)
902             : epstore(store) { }
903
904         void callback(kvstats_ctx &ctx) {
905             RCPtr<VBucket> vb = epstore->getVBucket(ctx.vbucket);
906             if (vb) {
907                 vb->fileSpaceUsed = ctx.fileSpaceUsed;
908                 vb->fileSize = ctx.fileSize;
909             }
910         }
911
912     private:
913         EventuallyPersistentStore *epstore;
914 };
915
/**
 * Persist the vbucket_state of every vbucket that belongs to shard `shardId`.
 *
 * The shard's snapshot flag for `priority` is cleared first. Per
 * scheduleVBSnapshot(), this lets a new snapshot task be scheduled while
 * this one runs. States are collected by a visitor and written in
 * descending vbucket-id order.
 *
 * Vbuckets whose vb_mutexes entry cannot be acquired immediately are skipped
 * in this pass. If any write fails, the remaining vbuckets are abandoned and
 * the whole shard snapshot is rescheduled. On success, the elapsed time is
 * added to stats.snapshotVbucketHisto.
 *
 * @param priority VBucketPersistHighPriority or VBucketPersistLowPriority;
 *                 the high-priority pass also clears each vbucket's
 *                 "bucket creation" marker after a successful write.
 * @param shardId  shard whose vbuckets are snapshotted.
 */
void EventuallyPersistentStore::snapshotVBuckets(const Priority &priority,
                                                 uint16_t shardId) {

    // Visitor that captures a vbucket_state for each vbucket in one shard.
    class VBucketStateVisitor : public VBucketVisitor {
    public:
        VBucketStateVisitor(VBucketMap &vb_map, uint16_t sid)
            : vbuckets(vb_map), shardId(sid) { }
        bool visitBucket(RCPtr<VBucket> &vb) {
            if (vbuckets.getShard(vb->getId())->getId() == shardId) {
                uint64_t snapStart = 0;
                uint64_t snapEnd = 0;
                std::string failovers = vb->failovers->toJSON();
                uint64_t chkId = vbuckets.getPersistenceCheckpointId(vb->getId());

                vb->getCurrentSnapshot(snapStart, snapEnd);
                vbucket_state vb_state(vb->getState(), chkId, 0,
                                       vb->getHighSeqno(), vb->getPurgeSeqno(),
                                       snapStart, snapEnd, failovers);
                states.insert(std::pair<uint16_t, vbucket_state>(vb->getId(), vb_state));
            }
            // Returning false: only the vbucket-level state is needed, so
            // the individual items are not visited (see visit() below).
            return false;
        }

        void visit(StoredValue*) {
            cb_assert(false); // this does not happen
        }

        std::map<uint16_t, vbucket_state> states;

    private:
        VBucketMap &vbuckets;
        uint16_t shardId;
    };

    KVShard *shard = vbMap.shards[shardId];
    if (priority == Priority::VBucketPersistLowPriority) {
        shard->setLowPriorityVbSnapshotFlag(false);
    } else {
        shard->setHighPriorityVbSnapshotFlag(false);
    }

    KVStatsCallback kvcb(this);
    VBucketStateVisitor v(vbMap, shard->getId());
    visit(v);
    hrtime_t start = gethrtime();

    bool success = true;
    vbucket_map_t::reverse_iterator iter = v.states.rbegin();
    for (; iter != v.states.rend(); ++iter) {
        // Don't block on a vbucket that is busy elsewhere; skip it this pass.
        LockHolder lh(vb_mutexes[iter->first], true /*tryLock*/);
        if (!lh.islocked()) {
            continue;
        }
        KVStore *rwUnderlying = getRWUnderlying(iter->first);
        if (!rwUnderlying->snapshotVBucket(iter->first, iter->second,
                    &kvcb)) {
            LOG(EXTENSION_LOG_WARNING,
                    "VBucket snapshot task failed!!! Rescheduling");
            success = false;
            break;
        }

        if (priority == Priority::VBucketPersistHighPriority) {
            if (vbMap.setBucketCreation(iter->first, false)) {
                LOG(EXTENSION_LOG_INFO, "VBucket %d created", iter->first);
            }
        }
    }

    if (!success) {
        scheduleVBSnapshot(priority, shard->getId());
    } else {
        stats.snapshotVbucketHisto.add((gethrtime() - start) / 1000);
    }
}
991
/**
 * Persist the vbucket_state of a single vbucket.
 *
 * Clears schedule_vbstate_persist[vbid] on entry, so that a concurrent
 * scheduleVBStatePersist() can queue a new task. `priority` is not used by
 * this function body.
 *
 * @return true if this call re-armed schedule_vbstate_persist[vbid] because
 *         the work could not be completed now (vb_mutexes[vbid] busy, or the
 *         KVStore write failed). This presumably tells the calling task to
 *         run again; TODO confirm against VBStatePersistTask.
 *         Returns false on success, when the vbucket does not exist, or when
 *         another caller had already re-armed the flag.
 */
bool EventuallyPersistentStore::persistVBState(const Priority &priority,
                                               uint16_t vbid) {
    schedule_vbstate_persist[vbid] = false;

    RCPtr<VBucket> vb = getVBucket(vbid);
    if (!vb) {
        LOG(EXTENSION_LOG_WARNING,
            "VBucket %d not exist!!! vb_state persistence task failed!!!", vbid);
        return false;
    }

    // Expected value for the false -> true CAS on the schedule flag.
    bool inverse = false;
    LockHolder lh(vb_mutexes[vbid], true /*tryLock*/);
    if (!lh.islocked()) {
        // Busy: re-arm the flag unless someone else already did.
        if (schedule_vbstate_persist[vbid].compare_exchange_strong(inverse,
                                                                   true)) {
            return true;
        } else {
            return false;
        }
    }

    KVStatsCallback kvcb(this);
    uint64_t chkId = vbMap.getPersistenceCheckpointId(vbid);
    std::string failovers = vb->failovers->toJSON();
    uint64_t snapStart = 0;
    uint64_t snapEnd = 0;

    vb->getCurrentSnapshot(snapStart, snapEnd);
    vbucket_state vb_state(vb->getState(), chkId, 0, vb->getHighSeqno(),
                           vb->getPurgeSeqno(), snapStart, snapEnd, failovers);

    KVStore *rwUnderlying = getRWUnderlying(vbid);
    if (rwUnderlying->snapshotVBucket(vbid, vb_state, &kvcb)) {
        if (vbMap.setBucketCreation(vbid, false)) {
            LOG(EXTENSION_LOG_INFO, "VBucket %d created", vbid);
        }
    } else {
        LOG(EXTENSION_LOG_WARNING,
            "VBucket %d: vb_state persistence task failed!!! Rescheduling", vbid);

        // Write failed: re-arm the flag unless someone else already did.
        if (schedule_vbstate_persist[vbid].compare_exchange_strong(inverse,
                                                                   true)) {
            return true;
        } else {
            return false;
        }
    }
    return false;
}
1042
/**
 * Set the state of vbucket `vbid`, creating the vbucket if it does not exist.
 *
 * Serialised by vbsetMutex. Setting the current state again is a no-op.
 *
 * Existing vbucket:
 *  - DCP is notified (when notify_dcp) and the state is switched.
 *  - On becoming active with `transfer` false, a failover entry is created
 *    at the snapshot end if it equals the persisted seqno, otherwise at the
 *    snapshot start.
 *  - A pending -> active transition schedules a PendingOpsNotification.
 *  - The state is persisted by a low-priority task.
 *
 * Missing vbucket:
 *  - It is created with a fresh failover table; an active vbucket's open
 *    checkpoint id starts at 2, others at 0.
 *  - Its persistence checkpoint id and seqno are reset to 0, and it is
 *    marked as being created.
 *  - The state is persisted by a high-priority task.
 *
 * @return ENGINE_SUCCESS, or ENGINE_ERANGE if vbMap.addBucket() rejects the
 *         new vbucket.
 */
ENGINE_ERROR_CODE EventuallyPersistentStore::setVBucketState(uint16_t vbid,
                                                           vbucket_state_t to,
                                                           bool transfer,
                                                           bool notify_dcp) {
    // Lock to prevent a race condition between a failed update and add.
    LockHolder lh(vbsetMutex);
    RCPtr<VBucket> vb = vbMap.getBucket(vbid);
    if (vb && to == vb->getState()) {
        return ENGINE_SUCCESS;
    }

    if (vb) {
        vbucket_state_t oldstate = vb->getState();
        if (oldstate != to && notify_dcp) {
            engine.getDcpConnMap().vbucketStateChanged(vbid, to);
        }

        vb->setState(to, engine.getServerApi());
        if (to == vbucket_state_active && !transfer) {
            uint64_t snapStart = 0;
            uint64_t snapEnd = 0;
            vb->getCurrentSnapshot(snapStart, snapEnd);
            // Branch from the snapshot end only if it has been persisted;
            // otherwise fall back to the snapshot start.
            if (snapEnd == vbMap.getPersistenceSeqno(vbid)) {
                vb->failovers->createEntry(snapEnd);
            } else {
                vb->failovers->createEntry(snapStart);
            }
        }

        lh.unlock();
        if (oldstate == vbucket_state_pending &&
            to == vbucket_state_active) {
            ExTask notifyTask = new PendingOpsNotification(engine, vb);
            ExecutorPool::get()->schedule(notifyTask, NONIO_TASK_IDX);
        }
        scheduleVBStatePersist(Priority::VBucketPersistLowPriority, vbid);
    } else {
        FailoverTable* ft = new FailoverTable(engine.getMaxFailoverEntries());
        RCPtr<VBucket> newvb(new VBucket(vbid, to, stats,
                                         engine.getCheckpointConfig(),
                                         vbMap.getShard(vbid), 0, 0, 0, ft));
        // The first checkpoint for active vbucket should start with id 2.
        uint64_t start_chk_id = (to == vbucket_state_active) ? 2 : 0;
        newvb->checkpointManager.setOpenCheckpointId(start_chk_id);
        if (vbMap.addBucket(newvb) == ENGINE_ERANGE) {
            lh.unlock();
            return ENGINE_ERANGE;
        }
        vbMap.setPersistenceCheckpointId(vbid, 0);
        vbMap.setPersistenceSeqno(vbid, 0);
        vbMap.setBucketCreation(vbid, true);
        lh.unlock();
        scheduleVBStatePersist(Priority::VBucketPersistHighPriority, vbid);
    }
    return ENGINE_SUCCESS;
}
1099
1100 bool EventuallyPersistentStore::scheduleVBSnapshot(const Priority &p) {
1101     KVShard *shard = NULL;
1102     if (p == Priority::VBucketPersistHighPriority) {
1103         for (size_t i = 0; i < vbMap.numShards; ++i) {
1104             shard = vbMap.shards[i];
1105             if (shard->setHighPriorityVbSnapshotFlag(true)) {
1106                 ExTask task = new VBSnapshotTask(&engine, p, i, false);
1107                 ExecutorPool::get()->schedule(task, WRITER_TASK_IDX);
1108             }
1109         }
1110     } else {
1111         for (size_t i = 0; i < vbMap.numShards; ++i) {
1112             shard = vbMap.shards[i];
1113             if (shard->setLowPriorityVbSnapshotFlag(true)) {
1114                 ExTask task = new VBSnapshotTask(&engine, p, i, false);
1115                 ExecutorPool::get()->schedule(task, WRITER_TASK_IDX);
1116             }
1117         }
1118     }
1119     if (stats.isShutdown) {
1120         return false;
1121     }
1122     return true;
1123 }
1124
1125 void EventuallyPersistentStore::scheduleVBSnapshot(const Priority &p,
1126                                                    uint16_t shardId,
1127                                                    bool force) {
1128     KVShard *shard = vbMap.shards[shardId];
1129     if (p == Priority::VBucketPersistHighPriority) {
1130         if (force || shard->setHighPriorityVbSnapshotFlag(true)) {
1131             ExTask task = new VBSnapshotTask(&engine, p, shardId, false);
1132             ExecutorPool::get()->schedule(task, WRITER_TASK_IDX);
1133         }
1134     } else {
1135         if (force || shard->setLowPriorityVbSnapshotFlag(true)) {
1136             ExTask task = new VBSnapshotTask(&engine, p, shardId, false);
1137             ExecutorPool::get()->schedule(task, WRITER_TASK_IDX);
1138         }
1139     }
1140 }
1141
1142 void EventuallyPersistentStore::scheduleVBStatePersist(const Priority &priority,
1143                                                        uint16_t vbid,
1144                                                        bool force) {
1145     bool inverse = false;
1146     if (force ||
1147         schedule_vbstate_persist[vbid].compare_exchange_strong(inverse, true)) {
1148         ExTask task = new VBStatePersistTask(&engine, priority, vbid, false);
1149         ExecutorPool::get()->schedule(task, WRITER_TASK_IDX);
1150     }
1151 }
1152
/**
 * Delete the on-disk data of vbucket `vbid` if the vbucket is gone, dead, or
 * marked for deletion; then record disk-deletion timing stats and notify
 * `cookie` (if non-NULL) with ENGINE_SUCCESS.
 *
 * Always returns true. When the vbucket is live and not marked for deletion,
 * nothing is deleted and vbsetMutex stays held until return, including
 * across notifyIOComplete().
 */
bool EventuallyPersistentStore::completeVBucketDeletion(uint16_t vbid,
                                                        const void* cookie) {
    LockHolder lh(vbsetMutex);

    hrtime_t start_time(gethrtime());
    RCPtr<VBucket> vb = vbMap.getBucket(vbid);
    if (!vb || vb->getState() == vbucket_state_dead ||
         vbMap.isBucketDeletion(vbid)) {
        // Release vbsetMutex and serialise with other on-disk work for
        // this vbucket via vb_mutexes[vbid] instead.
        lh.unlock();
        LockHolder vlh(vb_mutexes[vbid]);
        getRWUnderlying(vbid)->delVBucket(vbid);
        vbMap.setBucketDeletion(vbid, false);
        vbMap.setPersistenceSeqno(vbid, 0);
        ++stats.vbucketDeletions;
    }

    hrtime_t spent(gethrtime() - start_time);
    hrtime_t wall_time = spent / 1000;
    BlockTimer::log(spent, "disk_vb_del", stats.timingLog);
    stats.diskVBDelHisto.add(wall_time);
    atomic_setIfBigger(stats.vbucketDelMaxWalltime, wall_time);
    stats.vbucketDelTotWalltime.fetch_add(wall_time);
    if (cookie) {
        engine.notifyIOComplete(cookie, ENGINE_SUCCESS);
    }

    return true;
}
1181
1182 void EventuallyPersistentStore::scheduleVBDeletion(RCPtr<VBucket> &vb,
1183                                                    const void* cookie,
1184                                                    double delay) {
1185     ExTask delTask = new VBucketMemoryDeletionTask(engine, vb, delay);
1186     ExecutorPool::get()->schedule(delTask, NONIO_TASK_IDX);
1187
1188     if (vbMap.setBucketDeletion(vb->getId(), true)) {
1189         ExTask task = new VBDeleteTask(&engine, vb->getId(), cookie,
1190                                        Priority::VBucketDeletionPriority);
1191         ExecutorPool::get()->schedule(task, WRITER_TASK_IDX);
1192     }
1193 }
1194
1195 ENGINE_ERROR_CODE EventuallyPersistentStore::deleteVBucket(uint16_t vbid,
1196                                                            const void* c) {
1197     // Lock to prevent a race condition between a failed update and add
1198     // (and delete).
1199     LockHolder lh(vbsetMutex);
1200
1201     RCPtr<VBucket> vb = vbMap.getBucket(vbid);
1202     if (!vb) {
1203         return ENGINE_NOT_MY_VBUCKET;
1204     }
1205
1206     engine.getDcpConnMap().vbucketStateChanged(vbid, vbucket_state_dead);
1207     vbMap.removeBucket(vbid);
1208     lh.unlock();
1209     scheduleVBDeletion(vb, c);
1210     if (c) {
1211         return ENGINE_EWOULDBLOCK;
1212     }
1213     return ENGINE_SUCCESS;
1214 }
1215
1216 ENGINE_ERROR_CODE EventuallyPersistentStore::compactDB(uint16_t vbid,
1217                                                        compaction_ctx c,
1218                                                        const void *cookie) {
1219     RCPtr<VBucket> vb = vbMap.getBucket(vbid);
1220     if (!vb) {
1221         return ENGINE_NOT_MY_VBUCKET;
1222     }
1223
1224     /* Update the compaction ctx with the previous purge seqno */
1225     c.max_purged_seq = vb->getPurgeSeqno();
1226
1227     LockHolder lh(compactionLock);
1228     ExTask task = new CompactVBucketTask(&engine, Priority::CompactorPriority,
1229                                          vbid, c, cookie);
1230     compactionTasks.push_back(std::make_pair(vbid, task));
1231     if (compactionTasks.size() > 1) {
1232         if ((stats.diskQueueSize > compactionWriteQueueCap &&
1233             compactionTasks.size() > (vbMap.getNumShards() / 2)) ||
1234             engine.getWorkLoadPolicy().getWorkLoadPattern() == READ_HEAVY) {
1235             // Snooze a new compaction task.
1236             // We will wake it up when one of the existing compaction tasks is done.
1237             task->snooze(60);
1238         }
1239     }
1240
1241     ExecutorPool::get()->schedule(task, WRITER_TASK_IDX);
1242
1243     LOG(EXTENSION_LOG_DEBUG, "Scheduled compaction task %d on vbucket %d,"
1244         "purge_before_ts = %lld, purge_before_seq = %lld, dropdeletes = %d",
1245         task->getId(), vbid, c.purge_before_ts,
1246         c.purge_before_seq, c.drop_deletes);
1247
1248    return ENGINE_EWOULDBLOCK;
1249 }
1250
1251 class ExpiredItemsCallback : public Callback<compaction_ctx> {
1252     public:
1253         ExpiredItemsCallback(EventuallyPersistentStore *store, uint16_t vbid)
1254             : epstore(store), vbucket(vbid) { }
1255
1256         void callback(compaction_ctx &ctx) {
1257             std::list<expiredItemCtx>::iterator it;
1258             for (it  = ctx.expiredItems.begin();
1259                  it != ctx.expiredItems.end(); it++) {
1260                 if (epstore->compactionCanExpireItems()) {
1261                     epstore->deleteExpiredItem(vbucket, it->keyStr,
1262                                                ctx.curr_time,
1263                                                it->revSeqno);
1264                 }
1265             }
1266         }
1267
1268     private:
1269         EventuallyPersistentStore *epstore;
1270         uint16_t vbucket;
1271 };
1272
1273 bool EventuallyPersistentStore::compactVBucket(const uint16_t vbid,
1274                                                compaction_ctx *ctx,
1275                                                const void *cookie) {
1276     ENGINE_ERROR_CODE err = ENGINE_SUCCESS;
1277     RCPtr<VBucket> vb = vbMap.getBucket(vbid);
1278     if (vb) {
1279         LockHolder lh(vb_mutexes[vbid], true /*tryLock*/);
1280         if (!lh.islocked()) {
1281             return true; // Schedule a compaction task again.
1282         }
1283
1284         if (vb->getState() == vbucket_state_active) {
1285             // Set the current time ONLY for active vbuckets.
1286             ctx->curr_time = ep_real_time();
1287         } else {
1288             ctx->curr_time = 0;
1289         }
1290         ExpiredItemsCallback cb(this, vbid);
1291         KVStatsCallback kvcb(this);
1292         getRWUnderlying(vbid)->compactVBucket(vbid, ctx, cb, kvcb);
1293         vb->setPurgeSeqno(ctx->max_purged_seq);
1294     } else {
1295         err = ENGINE_NOT_MY_VBUCKET;
1296         engine.storeEngineSpecific(cookie, NULL);
1297         //Decrement session counter here, as memcached thread wouldn't
1298         //visit the engine interface in case of a NOT_MY_VB notification
1299         engine.decrementSessionCtr();
1300
1301     }
1302
1303     LockHolder lh(compactionLock);
1304     bool erased = false, woke = false;
1305     std::list<CompTaskEntry>::iterator it = compactionTasks.begin();
1306     while (it != compactionTasks.end()) {
1307         if ((*it).first == vbid) {
1308             it = compactionTasks.erase(it);
1309             erased = true;
1310         } else {
1311             ExTask &task = (*it).second;
1312             if (task->getState() == TASK_SNOOZED) {
1313                 ExecutorPool::get()->wake(task->getId());
1314                 woke = true;
1315             }
1316             ++it;
1317         }
1318         if (erased && woke) {
1319             break;
1320         }
1321     }
1322     lh.unlock();
1323
1324     if (cookie) {
1325         engine.notifyIOComplete(cookie, err);
1326     }
1327     --stats.pendingCompactions;
1328     return false;
1329 }
1330
1331 bool EventuallyPersistentStore::resetVBucket(uint16_t vbid) {
1332     LockHolder lh(vbsetMutex);
1333     bool rv(false);
1334
1335     RCPtr<VBucket> vb = vbMap.getBucket(vbid);
1336     if (vb) {
1337         vbucket_state_t vbstate = vb->getState();
1338
1339         vbMap.removeBucket(vbid);
1340         lh.unlock();
1341
1342         std::list<std::string> tap_cursors = vb->checkpointManager.
1343                                              getTAPCursorNames();
1344         // Delete and recreate the vbucket database file
1345         scheduleVBDeletion(vb, NULL, 0);
1346         setVBucketState(vbid, vbstate, false);
1347
1348         // Copy the all cursors from the old vbucket into the new vbucket
1349         RCPtr<VBucket> newvb = vbMap.getBucket(vbid);
1350         newvb->checkpointManager.resetTAPCursors(tap_cursors);
1351
1352         rv = true;
1353     }
1354     return rv;
1355 }
1356
1357 extern "C" {
1358
1359     typedef struct {
1360         EventuallyPersistentEngine* engine;
1361         std::map<std::string, std::string> smap;
1362     } snapshot_stats_t;
1363
1364     static void add_stat(const char *key, const uint16_t klen,
1365                          const char *val, const uint32_t vlen,
1366                          const void *cookie) {
1367         cb_assert(cookie);
1368         void *ptr = const_cast<void *>(cookie);
1369         snapshot_stats_t* snap = static_cast<snapshot_stats_t*>(ptr);
1370         ObjectRegistry::onSwitchThread(snap->engine);
1371
1372         std::string k(key, klen);
1373         std::string v(val, vlen);
1374         snap->smap.insert(std::pair<std::string, std::string>(k, v));
1375     }
1376 }
1377
1378 void EventuallyPersistentStore::snapshotStats() {
1379     snapshot_stats_t snap;
1380     snap.engine = &engine;
1381     std::map<std::string, std::string>  smap;
1382     bool rv = engine.getStats(&snap, NULL, 0, add_stat) == ENGINE_SUCCESS &&
1383               engine.getStats(&snap, "tap", 3, add_stat) == ENGINE_SUCCESS &&
1384               engine.getStats(&snap, "dcp", 3, add_stat) == ENGINE_SUCCESS;
1385
1386     if (rv && stats.isShutdown) {
1387         snap.smap["ep_force_shutdown"] = stats.forceShutdown ?
1388                                                               "true" : "false";
1389         std::stringstream ss;
1390         ss << ep_real_time();
1391         snap.smap["ep_shutdown_time"] = ss.str();
1392     }
1393     getOneRWUnderlying()->snapshotStats(snap.smap);
1394 }
1395
1396 void EventuallyPersistentStore::updateBGStats(const hrtime_t init,
1397                                               const hrtime_t start,
1398                                               const hrtime_t stop) {
1399     if (stop >= start && start >= init) {
1400         // skip the measurement if the counter wrapped...
1401         ++stats.bgNumOperations;
1402         hrtime_t w = (start - init) / 1000;
1403         BlockTimer::log(start - init, "bgwait", stats.timingLog);
1404         stats.bgWaitHisto.add(w);
1405         stats.bgWait.fetch_add(w);
1406         atomic_setIfLess(stats.bgMinWait, w);
1407         atomic_setIfBigger(stats.bgMaxWait, w);
1408
1409         hrtime_t l = (stop - start) / 1000;
1410         BlockTimer::log(stop - start, "bgload", stats.timingLog);
1411         stats.bgLoadHisto.add(l);
1412         stats.bgLoad.fetch_add(l);
1413         atomic_setIfLess(stats.bgMinLoad, l);
1414         atomic_setIfBigger(stats.bgMaxLoad, l);
1415     }
1416 }
1417
1418 void EventuallyPersistentStore::completeBGFetch(const std::string &key,
1419                                                 uint16_t vbucket,
1420                                                 uint64_t rowid,
1421                                                 const void *cookie,
1422                                                 hrtime_t init,
1423                                                 bool isMeta) {
1424     hrtime_t start(gethrtime());
1425     // Go find the data
1426     RememberingCallback<GetValue> gcb;
1427     if (isMeta) {
1428         gcb.val.setPartial();
1429         ++stats.bg_meta_fetched;
1430     } else {
1431         ++stats.bg_fetched;
1432     }
1433     getROUnderlying(vbucket)->get(key, rowid, vbucket, gcb);
1434     gcb.waitForValue();
1435     cb_assert(gcb.fired);
1436     ENGINE_ERROR_CODE status = gcb.val.getStatus();
1437
1438     // Lock to prevent a race condition between a fetch for restore and delete
1439     LockHolder lh(vbsetMutex);
1440
1441     RCPtr<VBucket> vb = getVBucket(vbucket);
1442     if (vb) {
1443         ReaderLockHolder rlh(vb->getStateLock());
1444         int bucket_num(0);
1445         LockHolder hlh = vb->ht.getLockedBucket(key, &bucket_num);
1446         StoredValue *v = fetchValidValue(vb, key, bucket_num, true);
1447         if (isMeta) {
1448             if ((v && v->unlocked_restoreMeta(gcb.val.getValue(),
1449                                               gcb.val.getStatus(), vb->ht))
1450                 || ENGINE_KEY_ENOENT == status) {
1451                 /* If ENGINE_KEY_ENOENT is the status from storage and the temp
1452                  key is removed from hash table by the time bgfetch returns
1453                  (in case multiple bgfetch is scheduled for a key), we still
1454                  need to return ENGINE_SUCCESS to the memcached worker thread,
1455                  so that the worker thread can visit the ep-engine and figure
1456                  out the correct flow */
1457                 status = ENGINE_SUCCESS;
1458             }
1459         } else {
1460             bool restore = false;
1461             if (v && v->isResident()) {
1462                 status = ENGINE_SUCCESS;
1463             } else {
1464                 switch (eviction_policy) {
1465                     case VALUE_ONLY:
1466                         if (v && !v->isResident() && !v->isDeleted()) {
1467                             restore = true;
1468                         }
1469                         break;
1470                     case FULL_EVICTION:
1471                         if (v) {
1472                             if (v->isTempInitialItem() ||
1473                                 (!v->isResident() && !v->isDeleted())) {
1474                                 restore = true;
1475                             }
1476                         }
1477                         break;
1478                     default:
1479                         throw std::logic_error("Unknown eviction policy");
1480                 }
1481             }
1482
1483             if (restore) {
1484                 if (gcb.val.getStatus() == ENGINE_SUCCESS) {
1485                     v->unlocked_restoreValue(gcb.val.getValue(), vb->ht);
1486                     cb_assert(v->isResident());
1487                     if (vb->getState() == vbucket_state_active &&
1488                         v->getExptime() != gcb.val.getValue()->getExptime() &&
1489                         v->getCas() == gcb.val.getValue()->getCas()) {
1490                         // MB-9306: It is possible that by the time bgfetcher
1491                         // returns, the item may have been updated and queued
1492                         // Hence test the CAS value to be the same first.
1493                         // exptime mutated, schedule it into new checkpoint
1494                         queueDirty(vb, v, &hlh);
1495                     }
1496                 } else if (gcb.val.getStatus() == ENGINE_KEY_ENOENT) {
1497                     v->setStoredValueState(
1498                                           StoredValue::state_non_existent_key);
1499                     if (eviction_policy == FULL_EVICTION) {
1500                         // For the full eviction, we should notify
1501                         // ENGINE_SUCCESS to the memcached worker thread, so
1502                         // that the worker thread can visit the ep-engine and
1503                         // figure out the correct error code.
1504                         status = ENGINE_SUCCESS;
1505                     }
1506                 } else {
1507                     // underlying kvstore couldn't fetch requested data
1508                     // log returned error and notify TMPFAIL to client
1509                     LOG(EXTENSION_LOG_WARNING,
1510                         "Warning: failed background fetch for vb=%d seq=%d "
1511                         "key=%s", vbucket, v->getBySeqno(), key.c_str());
1512                     status = ENGINE_TMPFAIL;
1513                 }
1514             }
1515         }
1516     } else {
1517         LOG(EXTENSION_LOG_INFO, "VBucket %d's file was deleted in the middle of"
1518             " a bg fetch for key %s\n", vbucket, key.c_str());
1519         status = ENGINE_NOT_MY_VBUCKET;
1520     }
1521
1522     lh.unlock();
1523
1524     hrtime_t stop = gethrtime();
1525     updateBGStats(init, start, stop);
1526     bgFetchQueue--;
1527
1528     delete gcb.val.getValue();
1529     engine.notifyIOComplete(cookie, status);
1530 }
1531
1532 void EventuallyPersistentStore::completeBGFetchMulti(uint16_t vbId,
1533                                  std::vector<bgfetched_item_t> &fetchedItems,
1534                                  hrtime_t startTime)
1535 {
1536     RCPtr<VBucket> vb = getVBucket(vbId);
1537     if (!vb) {
1538         std::vector<bgfetched_item_t>::iterator itemItr = fetchedItems.begin();
1539         for (; itemItr != fetchedItems.end(); ++itemItr) {
1540             engine.notifyIOComplete((*itemItr).second->cookie,
1541                                     ENGINE_NOT_MY_VBUCKET);
1542         }
1543         LOG(EXTENSION_LOG_WARNING,
1544             "EP Store completes %d of batched background fetch for "
1545             "for vBucket = %d that is already deleted\n",
1546             (int)fetchedItems.size(), vbId);
1547         return;
1548     }
1549
1550     std::vector<bgfetched_item_t>::iterator itemItr = fetchedItems.begin();
1551     for (; itemItr != fetchedItems.end(); ++itemItr) {
1552         VBucketBGFetchItem *bgitem = (*itemItr).second;
1553         ENGINE_ERROR_CODE status = bgitem->value.getStatus();
1554         Item *fetchedValue = bgitem->value.getValue();
1555         const std::string &key = (*itemItr).first;
1556         {   // locking scope
1557             ReaderLockHolder rlh(vb->getStateLock());
1558
1559             int bucket = 0;
1560             LockHolder blh = vb->ht.getLockedBucket(key, &bucket);
1561             StoredValue *v = fetchValidValue(vb, key, bucket, true);
1562             if (bgitem->metaDataOnly) {
1563                 if ((v && v->unlocked_restoreMeta(fetchedValue, status, vb->ht))
1564                     || ENGINE_KEY_ENOENT == status) {
1565                     /* If ENGINE_KEY_ENOENT is the status from storage and the temp
1566                      key is removed from hash table by the time bgfetch returns
1567                      (in case multiple bgfetch is scheduled for a key), we still
1568                      need to return ENGINE_SUCCESS to the memcached worker thread,
1569                      so that the worker thread can visit the ep-engine and figure
1570                      out the correct flow */
1571                     status = ENGINE_SUCCESS;
1572                 }
1573             } else {
1574                 bool restore = false;
1575                 if (v && v->isResident()) {
1576                     status = ENGINE_SUCCESS;
1577                 } else {
1578                     switch (eviction_policy) {
1579                         case VALUE_ONLY:
1580                             if (v && !v->isResident() && !v->isDeleted()) {
1581                                 restore = true;
1582                             }
1583                             break;
1584                         case FULL_EVICTION:
1585                             if (v) {
1586                                 if (v->isTempInitialItem() ||
1587                                     (!v->isResident() && !v->isDeleted())) {
1588                                     restore = true;
1589                                 }
1590                             }
1591                             break;
1592                         default:
1593                             throw std::logic_error("Unknown eviction policy");
1594                     }
1595                 }
1596
1597                 if (restore) {
1598                     if (status == ENGINE_SUCCESS) {
1599                         v->unlocked_restoreValue(fetchedValue, vb->ht);
1600                         cb_assert(v->isResident());
1601                         if (vb->getState() == vbucket_state_active &&
1602                             v->getExptime() != fetchedValue->getExptime() &&
1603                             v->getCas() == fetchedValue->getCas()) {
1604                             // MB-9306: It is possible that by the time
1605                             // bgfetcher returns, the item may have been
1606                             // updated and queued
1607                             // Hence test the CAS value to be the same first.
1608                             // exptime mutated, schedule it into new checkpoint
1609                             queueDirty(vb, v, &blh);
1610                         }
1611                     } else if (status == ENGINE_KEY_ENOENT) {
1612                         v->setStoredValueState(StoredValue::state_non_existent_key);
1613                         if (eviction_policy == FULL_EVICTION) {
1614                             // For the full eviction, we should notify
1615                             // ENGINE_SUCCESS to the memcached worker thread,
1616                             // so that the worker thread can visit the
1617                             // ep-engine and figure out the correct error
1618                             // code.
1619                             status = ENGINE_SUCCESS;
1620                         }
1621                     } else {
1622                         // underlying kvstore couldn't fetch requested data
1623                         // log returned error and notify TMPFAIL to client
1624                         LOG(EXTENSION_LOG_WARNING,
1625                             "Warning: failed background fetch for vb=%d "
1626                             "key=%s", vbId, key.c_str());
1627                         status = ENGINE_TMPFAIL;
1628                     }
1629                 }
1630             }
1631         } // locking scope ends
1632
1633         if (bgitem->metaDataOnly) {
1634             ++stats.bg_meta_fetched;
1635         } else {
1636             ++stats.bg_fetched;
1637         }
1638
1639         hrtime_t endTime = gethrtime();
1640         updateBGStats(bgitem->initTime, startTime, endTime);
1641         engine.notifyIOComplete(bgitem->cookie, status);
1642     }
1643
1644     LOG(EXTENSION_LOG_DEBUG,
1645         "EP Store completes %d of batched background fetch "
1646         "for vBucket = %d endTime = %lld\n",
1647         fetchedItems.size(), vbId, gethrtime()/1000000);
1648 }
1649
1650 void EventuallyPersistentStore::bgFetch(const std::string &key,
1651                                         uint16_t vbucket,
1652                                         uint64_t rowid,
1653                                         const void *cookie,
1654                                         bool isMeta) {
1655     std::stringstream ss;
1656
1657     if (multiBGFetchEnabled()) {
1658         RCPtr<VBucket> vb = getVBucket(vbucket);
1659         cb_assert(vb);
1660         KVShard *myShard = vbMap.getShard(vbucket);
1661
1662         // schedule to the current batch of background fetch of the given
1663         // vbucket
1664         VBucketBGFetchItem * fetchThis = new VBucketBGFetchItem(cookie,
1665                                                                 isMeta);
1666         vb->queueBGFetchItem(key, fetchThis, myShard->getBgFetcher());
1667         myShard->getBgFetcher()->notifyBGEvent();
1668         ss << "Queued a background fetch, now at "
1669            << vb->numPendingBGFetchItems() << std::endl;
1670         LOG(EXTENSION_LOG_DEBUG, "%s", ss.str().c_str());
1671     } else {
1672         bgFetchQueue++;
1673         stats.maxRemainingBgJobs = std::max(stats.maxRemainingBgJobs,
1674                                             bgFetchQueue.load());
1675         ExecutorPool* iom = ExecutorPool::get();
1676         ExTask task = new BGFetchTask(&engine, key, vbucket, rowid, cookie,
1677                                       isMeta,
1678                                       Priority::BgFetcherGetMetaPriority,
1679                                       bgFetchDelay, false);
1680         iom->schedule(task, READER_TASK_IDX);
1681         ss << "Queued a background fetch, now at " << bgFetchQueue.load()
1682            << std::endl;
1683         LOG(EXTENSION_LOG_DEBUG, "%s", ss.str().c_str());
1684     }
1685 }
1686
1687 GetValue EventuallyPersistentStore::getInternal(const std::string &key,
1688                                                 uint16_t vbucket,
1689                                                 const void *cookie,
1690                                                 bool queueBG,
1691                                                 bool honorStates,
1692                                                 vbucket_state_t allowedState,
1693                                                 bool trackReference) {
1694
1695     vbucket_state_t disallowedState = (allowedState == vbucket_state_active) ?
1696         vbucket_state_replica : vbucket_state_active;
1697     RCPtr<VBucket> vb = getVBucket(vbucket);
1698     if (!vb) {
1699         ++stats.numNotMyVBuckets;
1700         return GetValue(NULL, ENGINE_NOT_MY_VBUCKET);
1701     }
1702
1703     ReaderLockHolder rlh(vb->getStateLock());
1704     if (honorStates && vb->getState() == vbucket_state_dead) {
1705         ++stats.numNotMyVBuckets;
1706         return GetValue(NULL, ENGINE_NOT_MY_VBUCKET);
1707     } else if (honorStates && vb->getState() == disallowedState) {
1708         ++stats.numNotMyVBuckets;
1709         return GetValue(NULL, ENGINE_NOT_MY_VBUCKET);
1710     } else if (honorStates && vb->getState() == vbucket_state_pending) {
1711         if (vb->addPendingOp(cookie)) {
1712             return GetValue(NULL, ENGINE_EWOULDBLOCK);
1713         }
1714     }
1715
1716     int bucket_num(0);
1717     LockHolder lh = vb->ht.getLockedBucket(key, &bucket_num);
1718     StoredValue *v = fetchValidValue(vb, key, bucket_num, true,
1719                                      trackReference);
1720     if (v) {
1721         if (v->isDeleted() || v->isTempDeletedItem() ||
1722             v->isTempNonExistentItem()) {
1723             GetValue rv;
1724             return rv;
1725         }
1726         // If the value is not resident, wait for it...
1727         if (!v->isResident()) {
1728             if (queueBG) {
1729                 bgFetch(key, vbucket, v->getBySeqno(), cookie);
1730             }
1731             return GetValue(NULL, ENGINE_EWOULDBLOCK, v->getBySeqno(),
1732                             true, v->getNRUValue());
1733         }
1734
1735         GetValue rv(v->toItem(v->isLocked(ep_current_time()), vbucket),
1736                     ENGINE_SUCCESS, v->getBySeqno(), false, v->getNRUValue());
1737         return rv;
1738     } else {
1739         if (eviction_policy == VALUE_ONLY || diskFlushAll) {
1740             GetValue rv;
1741             return rv;
1742         }
1743         ENGINE_ERROR_CODE ec = ENGINE_EWOULDBLOCK;
1744         if (queueBG) { // Full eviction and need a bg fetch.
1745             ec = addTempItemForBgFetch(lh, bucket_num, key, vb,
1746                                        cookie, false);
1747         }
1748         return GetValue(NULL, ec, -1, true);
1749     }
1750 }
1751
1752 GetValue EventuallyPersistentStore::getRandomKey() {
1753     long max = vbMap.getSize();
1754
1755     long start = random() % max;
1756     long curr = start;
1757     Item *itm = NULL;
1758
1759     while (itm == NULL) {
1760         RCPtr<VBucket> vb = getVBucket(curr++);
1761         while (!vb || vb->getState() != vbucket_state_active) {
1762             if (curr == start) {
1763                 return GetValue(NULL, ENGINE_KEY_ENOENT);
1764             }
1765             if (curr == max) {
1766                 curr = 0;
1767             }
1768
1769             vb = getVBucket(curr++);
1770         }
1771
1772         if ((itm = vb->ht.getRandomKey(random())) != NULL) {
1773             GetValue rv(itm, ENGINE_SUCCESS);
1774             return rv;
1775         }
1776
1777         if (curr == max) {
1778             curr = 0;
1779         }
1780
1781         if (curr == start) {
1782             return GetValue(NULL, ENGINE_KEY_ENOENT);
1783         }
1784         // Search next vbucket
1785     }
1786
1787     return GetValue(NULL, ENGINE_KEY_ENOENT);
1788 }
1789
1790
1791 ENGINE_ERROR_CODE EventuallyPersistentStore::getMetaData(
1792                                                         const std::string &key,
1793                                                         uint16_t vbucket,
1794                                                         const void *cookie,
1795                                                         ItemMetaData &metadata,
1796                                                         uint32_t &deleted,
1797                                                         bool trackReferenced)
1798 {
1799     (void) cookie;
1800     RCPtr<VBucket> vb = getVBucket(vbucket);
1801     if (!vb) {
1802         ++stats.numNotMyVBuckets;
1803         return ENGINE_NOT_MY_VBUCKET;
1804     }
1805
1806     ReaderLockHolder rlh(vb->getStateLock());
1807     if (vb->getState() == vbucket_state_dead ||
1808         vb->getState() == vbucket_state_replica) {
1809         ++stats.numNotMyVBuckets;
1810         return ENGINE_NOT_MY_VBUCKET;
1811     }
1812
1813     int bucket_num(0);
1814     deleted = 0;
1815     LockHolder lh = vb->ht.getLockedBucket(key, &bucket_num);
1816     StoredValue *v = vb->ht.unlocked_find(key, bucket_num, true,
1817                                           trackReferenced);
1818
1819     if (v) {
1820         stats.numOpsGetMeta++;
1821
1822         if (v->isTempInitialItem()) { // Need bg meta fetch.
1823             bgFetch(key, vbucket, -1, cookie, true);
1824             return ENGINE_EWOULDBLOCK;
1825         } else if (v->isTempNonExistentItem()) {
1826             metadata.cas = v->getCas();
1827             return ENGINE_KEY_ENOENT;
1828         } else {
1829             if (v->isTempDeletedItem() || v->isDeleted() ||
1830                 v->isExpired(ep_real_time())) {
1831                 deleted |= GET_META_ITEM_DELETED_FLAG;
1832             }
1833
1834             if (v->isLocked(ep_current_time())) {
1835                 metadata.cas = static_cast<uint64_t>(-1);
1836             } else {
1837                 metadata.cas = v->getCas();
1838             }
1839             metadata.flags = v->getFlags();
1840             metadata.exptime = v->getExptime();
1841             metadata.revSeqno = v->getRevSeqno();
1842             return ENGINE_SUCCESS;
1843         }
1844     } else {
1845         // The key wasn't found. However, this may be because it was previously
1846         // deleted or evicted with the full eviction strategy.
1847         // So, add a temporary item corresponding to the key to the hash table
1848         // and schedule a background fetch for its metadata from the persistent
1849         // store. The item's state will be updated after the fetch completes.
1850         return addTempItemForBgFetch(lh, bucket_num, key, vb, cookie, true);
1851     }
1852 }
1853
/**
 * Store `itm` together with the metadata it carries.
 *
 * Unless `force` is set:
 *  - replica and pending vbuckets are subject to the usual state checks;
 *  - a missing key gets a temp item plus a metadata bg fetch
 *    (ENGINE_EWOULDBLOCK);
 *  - a temp initial item triggers a metadata bg fetch
 *    (ENGINE_EWOULDBLOCK);
 *  - conflictResolver->resolve() must accept the incoming metadata,
 *    otherwise ENGINE_KEY_EEXISTS is returned.
 *
 * On replica/pending vbuckets a locked item is unlocked before the set.
 * The result of ht.unlocked_set() is then mapped to an engine error code,
 * and successful mutations are queued via queueDirty().
 *
 * @param cas CAS passed to ht.unlocked_set()
 * @param force bypass replica/pending state checks and conflict resolution
 * @param nru NRU value passed to ht.unlocked_set()
 * @param genBySeqno forwarded to queueDirty()
 * @param isReplication forwarded to ht.unlocked_set() and
 *                      addTempItemForBgFetch()
 */
ENGINE_ERROR_CODE EventuallyPersistentStore::setWithMeta(const Item &itm,
                                                         uint64_t cas,
                                                         const void *cookie,
                                                         bool force,
                                                         bool allowExisting,
                                                         uint8_t nru,
                                                         bool genBySeqno,
                                                         bool isReplication)
{
    RCPtr<VBucket> vb = getVBucket(itm.getVBucketId());
    if (!vb) {
        ++stats.numNotMyVBuckets;
        return ENGINE_NOT_MY_VBUCKET;
    }

    // Hold the vbucket state lock (reader) for the remainder of the call.
    ReaderLockHolder rlh(vb->getStateLock());
    if (vb->getState() == vbucket_state_dead) {
        ++stats.numNotMyVBuckets;
        return ENGINE_NOT_MY_VBUCKET;
    } else if (vb->getState() == vbucket_state_replica && !force) {
        ++stats.numNotMyVBuckets;
        return ENGINE_NOT_MY_VBUCKET;
    } else if (vb->getState() == vbucket_state_pending && !force) {
        // If the pending vbucket accepts the op for later, park it.
        if (vb->addPendingOp(cookie)) {
            return ENGINE_EWOULDBLOCK;
        }
    }

    int bucket_num(0);
    LockHolder lh = vb->ht.getLockedBucket(itm.getKey(), &bucket_num);
    StoredValue *v = vb->ht.unlocked_find(itm.getKey(), bucket_num, true,
                                          false);

    if (!force) {
        if (v)  {
            // Metadata not yet known: fetch it before resolving conflicts.
            if (v->isTempInitialItem()) {
                bgFetch(itm.getKey(), itm.getVBucketId(), -1, cookie, true);
                return ENGINE_EWOULDBLOCK;
            }
            // Reject the incoming item if conflict resolution says the
            // existing one should win.
            if (!conflictResolver->resolve(v, itm.getMetaData(), false)) {
                ++stats.numOpsSetMetaResolutionFailed;
                return ENGINE_KEY_EEXISTS;
            }
        } else {
            // Key not in memory: create a temp item and fetch its metadata
            // so conflict resolution can run on the retry.
            return addTempItemForBgFetch(lh, bucket_num, itm.getKey(), vb,
                                         cookie, true, isReplication);
        }
    }

    // On replica/pending vbuckets an incoming set overrides any lock.
    if (v && v->isLocked(ep_current_time()) &&
        (vb->getState() == vbucket_state_replica ||
         vb->getState() == vbucket_state_pending)) {
        v->unlock();
    }
    // NOTE(review): presumably unlocked_set may update `v` (e.g. to a newly
    // inserted StoredValue) before it is passed to queueDirty() below —
    // confirm against HashTable::unlocked_set's signature.
    mutation_type_t mtype = vb->ht.unlocked_set(v, itm, cas, allowExisting,
                                                true, eviction_policy, nru,
                                                isReplication);

    ENGINE_ERROR_CODE ret = ENGINE_SUCCESS;
    switch (mtype) {
    case NOMEM:
        ret = ENGINE_ENOMEM;
        break;
    case INVALID_CAS:
    case IS_LOCKED:
        ret = ENGINE_KEY_EEXISTS;
        break;
    case INVALID_VBUCKET:
        ret = ENGINE_NOT_MY_VBUCKET;
        break;
    case WAS_DIRTY:
    case WAS_CLEAN:
        // Successful mutation: queue it for persistence/replication.
        queueDirty(vb, v, &lh, false, true, genBySeqno);
        break;
    case NOT_FOUND:
        ret = ENGINE_KEY_ENOENT;
        break;
    case NEED_BG_FETCH:
        {            // CAS operation with non-resident item + full eviction.
            if (v) { // temp item is already created. Simply schedule a
                lh.unlock(); // bg fetch job.
                bgFetch(itm.getKey(), vb->getId(), -1, cookie, true);
                return ENGINE_EWOULDBLOCK;
            }
            ret = addTempItemForBgFetch(lh, bucket_num, itm.getKey(), vb,
                                        cookie, true, isReplication);
        }
    }

    return ret;
}
1945
1946 GetValue EventuallyPersistentStore::getAndUpdateTtl(const std::string &key,
1947                                                     uint16_t vbucket,
1948                                                     const void *cookie,
1949                                                     time_t exptime)
1950 {
1951     RCPtr<VBucket> vb = getVBucket(vbucket);
1952     if (!vb) {
1953         ++stats.numNotMyVBuckets;
1954         return GetValue(NULL, ENGINE_NOT_MY_VBUCKET);
1955     }
1956
1957     ReaderLockHolder rlh(vb->getStateLock());
1958     if (vb->getState() == vbucket_state_dead) {
1959         ++stats.numNotMyVBuckets;
1960         return GetValue(NULL, ENGINE_NOT_MY_VBUCKET);
1961     } else if (vb->getState() == vbucket_state_replica) {
1962         ++stats.numNotMyVBuckets;
1963         return GetValue(NULL, ENGINE_NOT_MY_VBUCKET);
1964     } else if (vb->getState() == vbucket_state_pending) {
1965         if (vb->addPendingOp(cookie)) {
1966             return GetValue(NULL, ENGINE_EWOULDBLOCK);
1967         }
1968     }
1969
1970     int bucket_num(0);
1971     LockHolder lh = vb->ht.getLockedBucket(key, &bucket_num);
1972     StoredValue *v = fetchValidValue(vb, key, bucket_num, true);
1973
1974     if (v) {
1975         if (v->isDeleted() || v->isTempDeletedItem() ||
1976             v->isTempNonExistentItem()) {
1977             GetValue rv;
1978             return rv;
1979         }
1980
1981         if (!v->isResident()) {
1982             bgFetch(key, vbucket, v->getBySeqno(), cookie);
1983             return GetValue(NULL, ENGINE_EWOULDBLOCK, v->getBySeqno());
1984         }
1985         if (v->isLocked(ep_current_time())) {
1986             GetValue rv(NULL, ENGINE_KEY_EEXISTS, 0);
1987             return rv;
1988         }
1989
1990         bool exptime_mutated = exptime != v->getExptime() ? true : false;
1991         if (exptime_mutated) {
1992            v->markDirty();
1993            v->setExptime(exptime);
1994         }
1995
1996         GetValue rv(v->toItem(v->isLocked(ep_current_time()), vbucket),
1997                     ENGINE_SUCCESS, v->getBySeqno());
1998
1999         if (exptime_mutated) {
2000             if (vb->getState() == vbucket_state_active) {
2001                 // persist the item in the underlying storage for
2002                 // mutated exptime but only if VB is active.
2003                 queueDirty(vb, v, &lh);
2004             }
2005         }
2006         return rv;
2007     } else {
2008         if (eviction_policy == VALUE_ONLY) {
2009             GetValue rv;
2010             return rv;
2011         } else {
2012             ENGINE_ERROR_CODE ec = addTempItemForBgFetch(lh, bucket_num, key,
2013                                                          vb, cookie, false);
2014             return GetValue(NULL, ec, -1, true);
2015         }
2016     }
2017 }
2018
2019 ENGINE_ERROR_CODE
2020 EventuallyPersistentStore::statsVKey(const std::string &key,
2021                                      uint16_t vbucket,
2022                                      const void *cookie) {
2023     RCPtr<VBucket> vb = getVBucket(vbucket);
2024     if (!vb) {
2025         return ENGINE_NOT_MY_VBUCKET;
2026     }
2027
2028     int bucket_num(0);
2029     LockHolder lh = vb->ht.getLockedBucket(key, &bucket_num);
2030     StoredValue *v = fetchValidValue(vb, key, bucket_num, true);
2031
2032     if (v) {
2033         if (v->isDeleted() || v->isTempDeletedItem() ||
2034             v->isTempNonExistentItem()) {
2035             return ENGINE_KEY_ENOENT;
2036         }
2037         bgFetchQueue++;
2038         cb_assert(bgFetchQueue > 0);
2039         ExecutorPool* iom = ExecutorPool::get();
2040         ExTask task = new VKeyStatBGFetchTask(&engine, key, vbucket,
2041                                            v->getBySeqno(), cookie,
2042                                            Priority::VKeyStatBgFetcherPriority,
2043                                            bgFetchDelay, false);
2044         iom->schedule(task, READER_TASK_IDX);
2045         return ENGINE_EWOULDBLOCK;
2046     } else {
2047         if (eviction_policy == VALUE_ONLY) {
2048             return ENGINE_KEY_ENOENT;
2049         } else {
2050             add_type_t rv = vb->ht.unlocked_addTempItem(bucket_num, key,
2051                                                         eviction_policy);
2052             switch(rv) {
2053             case ADD_NOMEM:
2054                 return ENGINE_ENOMEM;
2055             case ADD_EXISTS:
2056             case ADD_UNDEL:
2057             case ADD_SUCCESS:
2058             case ADD_TMP_AND_BG_FETCH:
2059                 // Since the hashtable bucket is locked, we shouldn't get here
2060                 abort();
2061             case ADD_BG_FETCH:
2062                 {
2063                     ++bgFetchQueue;
2064                     cb_assert(bgFetchQueue > 0);
2065                     ExecutorPool* iom = ExecutorPool::get();
2066                     ExTask task = new VKeyStatBGFetchTask(&engine, key,
2067                                                           vbucket, -1, cookie,
2068                                            Priority::VKeyStatBgFetcherPriority,
2069                                                           bgFetchDelay, false);
2070                     iom->schedule(task, READER_TASK_IDX);
2071                 }
2072             }
2073             return ENGINE_EWOULDBLOCK;
2074         }
2075     }
2076 }
2077
2078 void EventuallyPersistentStore::completeStatsVKey(const void* cookie,
2079                                                   std::string &key,
2080                                                   uint16_t vbid,
2081                                                   uint64_t bySeqNum) {
2082     RememberingCallback<GetValue> gcb;
2083
2084     getROUnderlying(vbid)->get(key, bySeqNum, vbid, gcb);
2085     gcb.waitForValue();
2086     cb_assert(gcb.fired);
2087
2088     if (eviction_policy == FULL_EVICTION) {
2089         RCPtr<VBucket> vb = getVBucket(vbid);
2090         if (vb) {
2091             int bucket_num(0);
2092             LockHolder hlh = vb->ht.getLockedBucket(key, &bucket_num);
2093             StoredValue *v = fetchValidValue(vb, key, bucket_num, true);
2094             if (v && v->isTempInitialItem()) {
2095                 if (gcb.val.getStatus() == ENGINE_SUCCESS) {
2096                     v->unlocked_restoreValue(gcb.val.getValue(), vb->ht);
2097                     cb_assert(v->isResident());
2098                 } else if (gcb.val.getStatus() == ENGINE_KEY_ENOENT) {
2099                     v->setStoredValueState(
2100                                           StoredValue::state_non_existent_key);
2101                 } else {
2102                     // underlying kvstore couldn't fetch requested data
2103                     // log returned error and notify TMPFAIL to client
2104                     LOG(EXTENSION_LOG_WARNING,
2105                         "Warning: failed background fetch for vb=%d seq=%d "
2106                         "key=%s", vbid, v->getBySeqno(), key.c_str());
2107                 }
2108             }
2109         }
2110     }
2111
2112     if (gcb.val.getStatus() == ENGINE_SUCCESS) {
2113         engine.addLookupResult(cookie, gcb.val.getValue());
2114     } else {
2115         engine.addLookupResult(cookie, NULL);
2116     }
2117
2118     bgFetchQueue--;
2119     engine.notifyIOComplete(cookie, ENGINE_SUCCESS);
2120 }
2121
2122 bool EventuallyPersistentStore::getLocked(const std::string &key,
2123                                           uint16_t vbucket,
2124                                           Callback<GetValue> &cb,
2125                                           rel_time_t currentTime,
2126                                           uint32_t lockTimeout,
2127                                           const void *cookie) {
2128     RCPtr<VBucket> vb = getVBucket(vbucket);
2129     if (!vb || vb->getState() != vbucket_state_active) {
2130         ++stats.numNotMyVBuckets;
2131         GetValue rv(NULL, ENGINE_NOT_MY_VBUCKET);
2132         cb.callback(rv);
2133         return false;
2134     }
2135
2136     int bucket_num(0);
2137     LockHolder lh = vb->ht.getLockedBucket(key, &bucket_num);
2138     StoredValue *v = fetchValidValue(vb, key, bucket_num, true);
2139
2140     if (v) {
2141         if (v->isDeleted() || v->isTempNonExistentItem() ||
2142             v->isTempDeletedItem()) {
2143             GetValue rv;
2144             cb.callback(rv);
2145             return true;
2146         }
2147
2148         // if v is locked return error
2149         if (v->isLocked(currentTime)) {
2150             GetValue rv;
2151             cb.callback(rv);
2152             return false;
2153         }
2154
2155         // If the value is not resident, wait for it...
2156         if (!v->isResident()) {
2157             if (cookie) {
2158                 bgFetch(key, vbucket, v->getBySeqno(), cookie);
2159             }
2160             GetValue rv(NULL, ENGINE_EWOULDBLOCK, -1, true);
2161             cb.callback(rv);
2162             return false;
2163         }
2164
2165         // acquire lock and increment cas value
2166         v->lock(currentTime + lockTimeout);
2167
2168         Item *it = v->toItem(false, vbucket);
2169         it->setCas();
2170         v->setCas(it->getCas());
2171
2172         GetValue rv(it);
2173         cb.callback(rv);
2174         return true;
2175     } else {
2176         if (eviction_policy == VALUE_ONLY) {
2177             GetValue rv;
2178             cb.callback(rv);
2179             return true;
2180         } else {
2181             ENGINE_ERROR_CODE ec = addTempItemForBgFetch(lh, bucket_num, key,
2182                                                          vb, cookie, false);
2183             GetValue rv(NULL, ec, -1, true);
2184             cb.callback(rv);
2185             return false;
2186         }
2187     }
2188 }
2189
2190 ENGINE_ERROR_CODE
2191 EventuallyPersistentStore::unlockKey(const std::string &key,
2192                                      uint16_t vbucket,
2193                                      uint64_t cas,
2194                                      rel_time_t currentTime)
2195 {
2196
2197     RCPtr<VBucket> vb = getVBucket(vbucket);
2198     if (!vb || vb->getState() != vbucket_state_active) {
2199         ++stats.numNotMyVBuckets;
2200         return ENGINE_NOT_MY_VBUCKET;
2201     }
2202
2203     int bucket_num(0);
2204     LockHolder lh = vb->ht.getLockedBucket(key, &bucket_num);
2205     StoredValue *v = fetchValidValue(vb, key, bucket_num, true);
2206
2207     if (v) {
2208         if (v->isDeleted() || v->isTempNonExistentItem() ||
2209             v->isTempDeletedItem()) {
2210             return ENGINE_KEY_ENOENT;
2211         }
2212         if (v->isLocked(currentTime)) {
2213             if (v->getCas() == cas) {
2214                 v->unlock();
2215                 return ENGINE_SUCCESS;
2216             }
2217         }
2218         return ENGINE_TMPFAIL;
2219     } else {
2220         if (eviction_policy == VALUE_ONLY) {
2221             return ENGINE_KEY_ENOENT;
2222         } else {
2223             // With the full eviction, an item's lock is automatically
2224             // released when the item is evicted from memory. Therefore,
2225             // we simply return ENGINE_TMPFAIL when we receive unlockKey
2226             // for an item that is not in memocy cache. Note that we don't
2227             // spawn any bg fetch job to figure out if an item actually
2228             // exists in disk or not.
2229             return ENGINE_TMPFAIL;
2230         }
2231     }
2232 }
2233
2234
2235 ENGINE_ERROR_CODE EventuallyPersistentStore::getKeyStats(
2236                                             const std::string &key,
2237                                             uint16_t vbucket,
2238                                             const void *cookie,
2239                                             struct key_stats &kstats,
2240                                             bool bgfetch,
2241                                             bool wantsDeleted)
2242 {
2243     RCPtr<VBucket> vb = getVBucket(vbucket);
2244     if (!vb) {
2245         return ENGINE_NOT_MY_VBUCKET;
2246     }
2247
2248     int bucket_num(0);
2249     LockHolder lh = vb->ht.getLockedBucket(key, &bucket_num);
2250     StoredValue *v = fetchValidValue(vb, key, bucket_num, true);
2251
2252     if (v) {
2253         if ((v->isDeleted() && !wantsDeleted) ||
2254             v->isTempNonExistentItem() || v->isTempDeletedItem()) {
2255             return ENGINE_KEY_ENOENT;
2256         }
2257         if (eviction_policy == FULL_EVICTION &&
2258             v->isTempInitialItem() && bgfetch) {
2259             lh.unlock();
2260             bgFetch(key, vbucket, -1, cookie, true);
2261             return ENGINE_EWOULDBLOCK;
2262         }
2263         kstats.logically_deleted = v->isDeleted();
2264         kstats.dirty = v->isDirty();
2265         kstats.exptime = v->getExptime();
2266         kstats.flags = v->getFlags();
2267         kstats.cas = v->getCas();
2268         kstats.vb_state = vb->getState();
2269         return ENGINE_SUCCESS;
2270     } else {
2271         if (eviction_policy == VALUE_ONLY) {
2272             return ENGINE_KEY_ENOENT;
2273         } else {
2274             if (bgfetch) {
2275                 return addTempItemForBgFetch(lh, bucket_num, key, vb,
2276                                              cookie, true);
2277             } else {
2278                 return ENGINE_KEY_ENOENT;
2279             }
2280         }
2281     }
2282 }
2283
2284 std::string EventuallyPersistentStore::validateKey(const std::string &key,
2285                                                    uint16_t vbucket,
2286                                                    Item &diskItem) {
2287     int bucket_num(0);
2288     RCPtr<VBucket> vb = getVBucket(vbucket);
2289     LockHolder lh = vb->ht.getLockedBucket(key, &bucket_num);
2290     StoredValue *v = fetchValidValue(vb, key, bucket_num, true,
2291                                      false, true);
2292
2293     if (v) {
2294         if (v->isDeleted() || v->isTempNonExistentItem() ||
2295             v->isTempDeletedItem()) {
2296             return "item_deleted";
2297         }
2298
2299         if (diskItem.getFlags() != v->getFlags()) {
2300             return "flags_mismatch";
2301         } else if (v->isResident() && memcmp(diskItem.getData(),
2302                                              v->getValue()->getData(),
2303                                              diskItem.getNBytes())) {
2304             return "data_mismatch";
2305         } else {
2306             return "valid";
2307         }
2308     } else {
2309         return "item_deleted";
2310     }
2311
2312 }
2313
/**
 * Soft-delete a key in the given vbucket and queue the deletion for
 * persistence.
 *
 * @param key         key to delete
 * @param cas         in: passed to unlocked_softDelete (presumably the CAS the
 *                    caller expects; INVALID_CAS maps to KEY_EEXISTS);
 *                    out: CAS of the stored value afterwards, or 0
 * @param vbucket     id of the vbucket holding the key
 * @param cookie      connection cookie used for pending ops / bg fetches
 * @param force       skip the dead/replica/pending vbucket checks and, in
 *                    full eviction, delete a key that is not in memory
 * @param itemMeta    if non-NULL, receives revSeqno/cas/flags/exptime of the
 *                    value after the soft delete
 * @param tapBackfill forwarded to queueDirty()
 * @return ENGINE_SUCCESS, or ENGINE_NOT_MY_VBUCKET, ENGINE_EWOULDBLOCK
 *         (pending vbucket or background fetch scheduled), ENGINE_KEY_ENOENT,
 *         ENGINE_KEY_EEXISTS, ENGINE_TMPFAIL (item locked) or ENGINE_ENOMEM.
 */
ENGINE_ERROR_CODE EventuallyPersistentStore::deleteItem(const std::string &key,
                                                        uint64_t* cas,
                                                        uint16_t vbucket,
                                                        const void *cookie,
                                                        bool force,
                                                        ItemMetaData *itemMeta,
                                                        bool tapBackfill)
{
    RCPtr<VBucket> vb = getVBucket(vbucket);
    // Unless forced, dead and replica vbuckets reject the delete, and a
    // pending vbucket may park the request via addPendingOp().
    if (!vb || (vb->getState() == vbucket_state_dead && !force)) {
        ++stats.numNotMyVBuckets;
        return ENGINE_NOT_MY_VBUCKET;
    } else if(vb->getState() == vbucket_state_replica && !force) {
        ++stats.numNotMyVBuckets;
        return ENGINE_NOT_MY_VBUCKET;
    } else if(vb->getState() == vbucket_state_pending && !force) {
        if (vb->addPendingOp(cookie)) {
            return ENGINE_EWOULDBLOCK;
        }
    }

    int bucket_num(0);
    LockHolder lh = vb->ht.getLockedBucket(key, &bucket_num);
    StoredValue *v = vb->ht.unlocked_find(key, bucket_num, true, false);
    // Missing, already-deleted or temporary entries need special handling.
    if (!v || v->isDeleted() || v->isTempItem()) {
        if (eviction_policy == VALUE_ONLY) {
            return ENGINE_KEY_ENOENT;
        } else { // Full eviction.
            if (!force) {
                if (!v) { // Item might be evicted from cache.
                    return addTempItemForBgFetch(lh, bucket_num, key, vb,
                                                 cookie, true);
                } else if (v->isTempInitialItem()) {
                    lh.unlock();
                    bgFetch(key, vbucket, -1, cookie, true);
                    return ENGINE_EWOULDBLOCK;
                } else { // Non-existent or deleted key.
                    return ENGINE_KEY_ENOENT;
                }
            } else {
                if (!v) { // Item might be evicted from cache.
                    // Create a temp item and delete it below as it is a
                    // force deletion.
                    add_type_t rv = vb->ht.unlocked_addTempItem(bucket_num,
                                                              key,
                                                              eviction_policy);
                    if (rv == ADD_NOMEM) {
                        return ENGINE_ENOMEM;
                    }
                    v = vb->ht.unlocked_find(key, bucket_num, true, false);
                    v->setStoredValueState(StoredValue::state_deleted_key);
                } else if (v->isTempInitialItem()) {
                    v->setStoredValueState(StoredValue::state_deleted_key);
                } else { // Non-existent or deleted key.
                    return ENGINE_KEY_ENOENT;
                }
            }
        }
    }

    // A lock on the item does not block a delete in a replica or pending
    // vbucket: release it before soft-deleting.
    if (v && v->isLocked(ep_current_time()) &&
        (vb->getState() == vbucket_state_replica ||
         vb->getState() == vbucket_state_pending)) {
        v->unlock();
    }
    mutation_type_t delrv;
    delrv = vb->ht.unlocked_softDelete(v, *cas, eviction_policy);

    // Report the post-delete metadata and CAS back to the caller.
    if (itemMeta && v) {
        itemMeta->revSeqno = v->getRevSeqno();
        itemMeta->cas = v->getCas();
        itemMeta->flags = v->getFlags();
        itemMeta->exptime = v->getExptime();
    }
    *cas = v ? v->getCas() : 0;

    ENGINE_ERROR_CODE ret = ENGINE_SUCCESS;
    switch (delrv) {
    case NOMEM:
        ret = ENGINE_ENOMEM;
        break;
    case INVALID_VBUCKET:
        ret = ENGINE_NOT_MY_VBUCKET;
        break;
    case INVALID_CAS:
        ret = ENGINE_KEY_EEXISTS;
        break;
    case IS_LOCKED:
        ret = ENGINE_TMPFAIL;
        break;
    case NOT_FOUND:
        ret = ENGINE_KEY_ENOENT;
        // v is still queued for persistence even though ENOENT is returned.
        if (v) {
            queueDirty(vb, v, &lh, tapBackfill);
        }
        break;
    case WAS_DIRTY:
    case WAS_CLEAN:
        queueDirty(vb, v, &lh, tapBackfill);
        break;
    case NEED_BG_FETCH:
        // We already figured out if a bg fetch is required for a full-evicted
        // item above.
        abort();
    }
    return ret;
}
2421
2422 ENGINE_ERROR_CODE EventuallyPersistentStore::deleteWithMeta(
2423                                                         const std::string &key,
2424                                                         uint64_t* cas,
2425                                                         uint16_t vbucket,
2426                                                         const void *cookie,
2427                                                         bool force,
2428                                                         ItemMetaData *itemMeta,
2429                                                         bool tapBackfill,
2430                                                         bool genBySeqno,
2431                                                         uint64_t bySeqno,
2432                                                         bool isReplication)
2433 {
2434     RCPtr<VBucket> vb = getVBucket(vbucket);
2435     if (!vb || (vb->getState() == vbucket_state_dead && !force)) {
2436         ++stats.numNotMyVBuckets;
2437         return ENGINE_NOT_MY_VBUCKET;
2438     } else if(vb->getState() == vbucket_state_replica && !force) {
2439         ++stats.numNotMyVBuckets;
2440         return ENGINE_NOT_MY_VBUCKET;
2441     } else if(vb->getState() == vbucket_state_pending && !force) {
2442         if (vb->addPendingOp(cookie)) {
2443             return ENGINE_EWOULDBLOCK;
2444         }
2445     }
2446
2447     int bucket_num(0);
2448     LockHolder lh = vb->ht.getLockedBucket(key, &bucket_num);
2449     StoredValue *v = vb->ht.unlocked_find(key, bucket_num, true, false);
2450     if (!force) { // Need conflict resolution.
2451         if (v)  {
2452             if (v->isTempInitialItem()) {
2453                 bgFetch(key, vbucket, -1, cookie, true);
2454                 return ENGINE_EWOULDBLOCK;
2455             }
2456             if (!conflictResolver->resolve(v, *itemMeta, true)) {
2457                 ++stats.numOpsDelMetaResolutionFailed;
2458                 return ENGINE_KEY_EEXISTS;
2459             }
2460         } else {
2461             // Item is 1) deleted or not existent in the value eviction case OR
2462             // 2) deleted or evicted in the full eviction.
2463             return addTempItemForBgFetch(lh, bucket_num, key, vb,
2464                                          cookie, true, isReplication);
2465         }
2466     } else {
2467         if (!v) {
2468             // Create a temp item and delete it below as it is a force deletion
2469             add_type_t rv = vb->ht.unlocked_addTempItem(bucket_num, key,
2470                                                         eviction_policy,
2471                                                         isReplication);
2472             if (rv == ADD_NOMEM) {
2473                 return ENGINE_ENOMEM;
2474             }
2475             v = vb->ht.unlocked_find(key, bucket_num, true, false);
2476             v->setStoredValueState(StoredValue::state_deleted_key);
2477         } else if (v->isTempInitialItem()) {
2478             v->setStoredValueState(StoredValue::state_deleted_key);
2479         }
2480     }
2481
2482     if (v && v->isLocked(ep_current_time()) &&
2483         (vb->getState() == vbucket_state_replica ||
2484          vb->getState() == vbucket_state_pending)) {
2485         v->unlock();
2486     }
2487     mutation_type_t delrv;
2488     delrv = vb->ht.unlocked_softDelete(v, *cas, *itemMeta,
2489                                        eviction_policy, true);
2490     *cas = v ? v->getCas() : 0;
2491
2492     ENGINE_ERROR_CODE ret = ENGINE_SUCCESS;
2493     switch (delrv) {
2494     case NOMEM:
2495         ret = ENGINE_ENOMEM;
2496         break;
2497     case INVALID_VBUCKET:
2498         ret = ENGINE_NOT_MY_VBUCKET;
2499         break;
2500     case INVALID_CAS:
2501         ret = ENGINE_KEY_EEXISTS;
2502         break;
2503     case IS_LOCKED:
2504         ret = ENGINE_TMPFAIL;
2505         break;
2506     case NOT_FOUND:
2507         ret = ENGINE_KEY_ENOENT;
2508         break;
2509     case WAS_DIRTY:
2510     case WAS_CLEAN:
2511         if (!genBySeqno) {
2512             v->setBySeqno(bySeqno);
2513         }
2514         queueDirty(vb, v, &lh, tapBackfill, true, genBySeqno);
2515         break;
2516     case NEED_BG_FETCH:
2517         lh.unlock();
2518         bgFetch(key, vbucket, -1, cookie, true);
2519         ret = ENGINE_EWOULDBLOCK;
2520     }
2521
2522     return ret;
2523 }
2524
2525 void EventuallyPersistentStore::reset() {
2526     std::vector<int> buckets = vbMap.getBuckets();
2527     std::vector<int>::iterator it;
2528     for (it = buckets.begin(); it != buckets.end(); ++it) {
2529         RCPtr<VBucket> vb = getVBucket(*it);
2530         if (vb) {
2531             LockHolder lh(vb_mutexes[vb->getId()]);
2532             vb->ht.clear();
2533             vb->checkpointManager.clear(vb->getState());
2534             vb->resetStats();
2535             vb->setCurrentSnapshot(0, 0);
2536         }
2537     }
2538
2539     bool inverse = false;
2540     if (diskFlushAll.compare_exchange_strong(inverse, true)) {
2541         ++stats.diskQueueSize;
2542         // wake up (notify) one flusher is good enough for diskFlushAll
2543         vbMap.shards[EP_PRIMARY_SHARD]->getFlusher()->notifyFlushEvent();
2544     }
2545 }
2546
2547 /**
2548  * Callback invoked after persisting an item from memory to disk.
2549  *
2550  * This class exists to create a closure around a few variables within
2551  * EventuallyPersistentStore::flushOne so that an object can be
2552  * requeued in case of failure to store in the underlying layer.
2553  */
2554 class PersistenceCallback : public Callback<mutation_result>,
2555                             public Callback<int> {
2556 public:
2557
2558     PersistenceCallback(const queued_item &qi, RCPtr<VBucket> &vb,
2559                         EventuallyPersistentStore *st, EPStats *s, uint64_t c)
2560         : queuedItem(qi), vbucket(vb), store(st), stats(s), cas(c) {
2561         cb_assert(vb);
2562         cb_assert(s);
2563     }
2564
2565     // This callback is invoked for set only.
2566     void callback(mutation_result &value) {
2567         if (value.first == 1) {
2568             int bucket_num(0);
2569             LockHolder lh = vbucket->ht.getLockedBucket(queuedItem->getKey(),
2570                                                         &bucket_num);
2571             StoredValue *v = store->fetchValidValue(vbucket,
2572                                                     queuedItem->getKey(),
2573                                                     bucket_num, true, false);
2574             if (v) {
2575                 if (v->getCas() == cas) {
2576                     // mark this item clean only if current and stored cas
2577                     // value match
2578                     v->markClean();
2579                 }
2580                 if (v->isNewCacheItem()) {
2581                     if (value.second) {
2582                         // Insert in value-only or full eviction mode.
2583                         ++vbucket->opsCreate;
2584                         vbucket->incrMetaDataDisk(*queuedItem);
2585                     } else { // Update in full eviction mode.
2586                         --vbucket->ht.numTotalItems;
2587                         ++vbucket->opsUpdate;
2588                     }
2589                     v->setNewCacheItem(false);
2590                 } else { // Update in value-only or full eviction mode.
2591                     ++vbucket->opsUpdate;
2592                 }
2593             }
2594
2595             vbucket->doStatsForFlushing(*queuedItem, queuedItem->size());
2596             stats->decrDiskQueueSize(1);
2597             stats->totalPersisted++;
2598         } else {
2599             // If the return was 0 here, we're in a bad state because
2600             // we do not know the rowid of this object.
2601             if (value.first == 0) {
2602                 int bucket_num(0);
2603                 LockHolder lh = vbucket->ht.getLockedBucket(
2604                                            queuedItem->getKey(), &bucket_num);
2605                 StoredValue *v = store->fetchValidValue(vbucket,
2606                                                         queuedItem->getKey(),
2607                                                         bucket_num, true,
2608                                                         false);
2609                 if (v) {
2610                     std::stringstream ss;
2611                     ss << "Persisting ``" << queuedItem->getKey() << "'' on vb"
2612                        << queuedItem->getVBucketId() << " (rowid="
2613                        << v->getBySeqno() << ") returned 0 updates\n";
2614                     LOG(EXTENSION_LOG_WARNING, "%s", ss.str().c_str());
2615                 } else {
2616                     LOG(EXTENSION_LOG_WARNING,
2617                         "Error persisting now missing ``%s'' from vb%d",
2618                         queuedItem->getKey().c_str(),
2619                         queuedItem->getVBucketId());
2620                 }
2621
2622                 vbucket->doStatsForFlushing(*queuedItem, queuedItem->size());
2623                 stats->decrDiskQueueSize(1);
2624             } else {
2625                 std::stringstream ss;
2626                 ss <<
2627                 "Fatal error in persisting SET ``" <<
2628                 queuedItem->getKey() << "'' on vb "
2629                    << queuedItem->getVBucketId() << "!!! Requeue it...\n";
2630                 LOG(EXTENSION_LOG_WARNING, "%s", ss.str().c_str());
2631                 redirty();
2632             }
2633         }
2634     }
2635
2636     // This callback is invoked for deletions only.
2637     //
2638     // The boolean indicates whether the underlying storage
2639     // successfully deleted the item.
2640     void callback(int &value) {
2641         // > 1 would be bad.  We were only trying to delete one row.
2642         cb_assert(value < 2);
2643         // -1 means fail
2644         // 1 means we deleted one row
2645         // 0 means we did not delete a row, but did not fail (did not exist)
2646         if (value >= 0) {
2647             // We have succesfully removed an item from the disk, we
2648             // may now remove it from the hash table.
2649             int bucket_num(0);
2650             LockHolder lh = vbucket->ht.getLockedBucket(queuedItem->getKey(),
2651                                                         &bucket_num);
2652             StoredValue *v = store->fetchValidValue(vbucket,
2653                                                     queuedItem->getKey(),
2654                                                     bucket_num, true, false);
2655             if (v && v->isDeleted()) {
2656                 bool newCacheItem = v->isNewCacheItem();
2657                 bool deleted = vbucket->ht.unlocked_del(queuedItem->getKey(),
2658                                                         bucket_num);
2659                 cb_assert(deleted);
2660                 if (newCacheItem && value > 0) {
2661                     // Need to decrement the item counter again for an item that
2662                     // exists on DB file, but not in memory (i.e., full eviction),
2663                     // because we created the temp item in memory and incremented
2664                     // the item counter when a deletion is pushed in the queue.
2665                     --vbucket->ht.numTotalItems;
2666                 }
2667             }
2668
2669             if (value > 0) {
2670                 ++stats->totalPersisted;
2671                 ++vbucket->opsDelete;
2672             }
2673             vbucket->doStatsForFlushing(*queuedItem, queuedItem->size());
2674             stats->decrDiskQueueSize(1);
2675             vbucket->decrMetaDataDisk(*queuedItem);
2676         } else {
2677             std::stringstream ss;
2678             ss << "Fatal error in persisting DELETE ``" <<
2679             queuedItem->getKey() << "'' on vb "
2680                << queuedItem->getVBucketId() << "!!! Requeue it...\n";
2681             LOG(EXTENSION_LOG_WARNING, "%s", ss.str().c_str());
2682             redirty();
2683         }
2684     }
2685
2686 private:
2687
2688     void redirty() {
2689         if (store->vbMap.isBucketDeletion(vbucket->getId())) {
2690             vbucket->doStatsForFlushing(*queuedItem, queuedItem->size());
2691             stats->decrDiskQueueSize(1);
2692             return;
2693         }
2694         ++stats->flushFailed;
2695         store->invokeOnLockedStoredValue(queuedItem->getKey(),
2696                                          queuedItem->getVBucketId(),
2697                                          &StoredValue::reDirty);
2698         vbucket->rejectQueue.push(queuedItem);
2699     }
2700
2701     const queued_item queuedItem;
2702     RCPtr<VBucket> &vbucket;
2703     EventuallyPersistentStore *store;
2704     EPStats *stats;
2705     uint64_t cas;
2706     DISALLOW_COPY_AND_ASSIGN(PersistenceCallback);
2707 };
2708
2709 void EventuallyPersistentStore::flushOneDeleteAll() {
2710     for (size_t i = 0; i < vbMap.getSize(); ++i) {
2711         RCPtr<VBucket> vb = getVBucket(i);
2712         if (vb) {
2713             LockHolder lh(vb_mutexes[vb->getId()]);
2714             getRWUnderlying(vb->getId())->reset(i);
2715         }
2716     }
2717
2718     bool inverse = true;
2719     diskFlushAll.compare_exchange_strong(inverse, false);
2720     stats.decrDiskQueueSize(1);
2721 }
2722
2723 int EventuallyPersistentStore::flushVBucket(uint16_t vbid) {
2724     KVShard *shard = vbMap.getShard(vbid);
2725     if (diskFlushAll) {
2726         if (shard->getId() == EP_PRIMARY_SHARD) {
2727             flushOneDeleteAll();
2728         } else {
2729             // disk flush is pending just return
2730             return 0;
2731         }
2732     }
2733
2734     if (vbMap.isBucketCreation(vbid)) {
2735         return RETRY_FLUSH_VBUCKET;
2736     }
2737
2738     int items_flushed = 0;
2739     rel_time_t flush_start = ep_current_time();
2740
2741     RCPtr<VBucket> vb = vbMap.getBucket(vbid);
2742     if (vb) {
2743         LockHolder lh(vb_mutexes[vbid], true /*tryLock*/);
2744         if (!lh.islocked()) { // Try another bucket if this one is locked
2745             return RETRY_FLUSH_VBUCKET; // to avoid blocking flusher
2746         }
2747
2748         KVStatsCallback cb(this);
2749         std::vector<queued_item> items;
2750         KVStore *rwUnderlying = getRWUnderlying(vbid);
2751
2752         while (!vb->rejectQueue.empty()) {
2753             items.push_back(vb->rejectQueue.front());
2754             vb->rejectQueue.pop();
2755         }
2756
2757         LockHolder slh = vb->getSnapshotLock();
2758         uint64_t snapStart;
2759         uint64_t snapEnd;
2760         vb->getCurrentSnapshot_UNLOCKED(snapStart, snapEnd);
2761
2762         vb->getBackfillItems(items);
2763         vb->checkpointManager.getAllItemsForPersistence(items);
2764         slh.unlock();
2765
2766         if (!items.empty()) {
2767             while (!rwUnderlying->begin()) {
2768                 ++stats.beginFailed;
2769                 LOG(EXTENSION_LOG_WARNING, "Failed to start a transaction!!! "
2770                     "Retry in 1 sec ...");
2771                 sleep(1);
2772             }
2773             rwUnderlying->optimizeWrites(items);
2774
2775             Item *prev = NULL;
2776             uint64_t maxSeqno = 0;
2777             std::list<PersistenceCallback*> pcbs;
2778             std::vector<queued_item>::iterator it = items.begin();
2779             for(; it != items.end(); ++it) {
2780                 if ((*it)->getOperation() != queue_op_set &&
2781                     (*it)->getOperation() != queue_op_del) {
2782                     continue;
2783                 } else if (!prev || prev->getKey() != (*it)->getKey()) {
2784                     prev = (*it).get();
2785                     ++items_flushed;
2786                     PersistenceCallback *cb = flushOneDelOrSet(*it, vb);
2787                     if (cb) {
2788                         pcbs.push_back(cb);
2789                     }
2790                     maxSeqno = std::max(maxSeqno, (uint64_t)(*it)->getBySeqno());
2791                     ++stats.flusher_todo;
2792                 } else {
2793                     stats.decrDiskQueueSize(1);
2794                     vb->doStatsForFlushing(*(*it), (*it)->size());
2795                 }
2796             }
2797
2798             BlockTimer timer(&stats.diskCommitHisto, "disk_commit",
2799                              stats.timingLog);
2800             hrtime_t start = gethrtime();
2801
2802             if (vb->getState() == vbucket_state_active) {
2803                 snapStart = maxSeqno;
2804                 snapEnd = maxSeqno;
2805             }
2806
2807             while (!rwUnderlying->commit(&cb, snapStart, snapEnd)) {
2808                 ++stats.commitFailed;
2809                 LOG(EXTENSION_LOG_WARNING, "Flusher commit failed!!! Retry in "
2810                     "1 sec...\n");
2811                 sleep(1);
2812
2813             }
2814
2815             if (vb->rejectQueue.empty()) {
2816                 uint64_t highSeqno = rwUnderlying->getLastPersistedSeqno(vbid);
2817                 if (highSeqno > 0 &&
2818                     highSeqno != vbMap.getPersistenceSeqno(vbid)) {
2819                     vbMap.setPersistenceSeqno(vbid, highSeqno);
2820                 }
2821             }
2822
2823             while (!pcbs.empty()) {
2824                 delete pcbs.front();
2825                 pcbs.pop_front();
2826             }
2827
2828             ++stats.flusherCommits;
2829             hrtime_t end = gethrtime();
2830             uint64_t commit_time = (end - start) / 1000000;
2831             uint64_t trans_time = (end - flush_start) / 1000000;
2832
2833             lastTransTimePerItem.store((items_flushed == 0) ? 0 :
2834                                        static_cast<double>(trans_time) /
2835                                        static_cast<double>(items_flushed));
2836             stats.commit_time.store(commit_time);
2837             stats.cumulativeCommitTime.fetch_add(commit_time);
2838             stats.cumulativeFlushTime.fetch_add(ep_current_time()
2839                                                 - flush_start);
2840             stats.flusher_todo.store(0);
2841         }
2842
2843         rwUnderlying->pendingTasks();
2844
2845         if (vb->checkpointManager.getNumCheckpoints() > 1) {
2846             wakeUpCheckpointRemover();
2847         }
2848
2849         if (vb->rejectQueue.empty()) {
2850             vb->checkpointManager.itemsPersisted();
2851             uint64_t seqno = vbMap.getPersistenceSeqno(vbid);
2852             uint64_t chkid = vb->checkpointManager.getPersistenceCursorPreChkId();
2853             vb->notifyOnPersistence(engine, seqno, true);
2854             vb->notifyOnPersistence(engine, chkid, false);
2855             if (chkid > 0 && chkid != vbMap.getPersistenceCheckpointId(vbid)) {
2856                 vbMap.setPersistenceCheckpointId(vbid, chkid);
2857             }
2858         }
2859     }
2860
2861     return items_flushed;
2862 }
2863
2864 PersistenceCallback*
2865 EventuallyPersistentStore::flushOneDelOrSet(const queued_item &qi,
2866                                             RCPtr<VBucket> &vb) {
2867
2868     if (!vb) {
2869         stats.decrDiskQueueSize(1);
2870         return NULL;
2871     }
2872
2873     int64_t bySeqno = qi->getBySeqno();
2874     bool deleted = qi->isDeleted();
2875     rel_time_t queued(qi->getQueuedTime());
2876
2877     int dirtyAge = ep_current_time() - queued;
2878     stats.dirtyAgeHisto.add(dirtyAge * 1000000);
2879     stats.dirtyAge.store(dirtyAge);
2880     stats.dirtyAgeHighWat.store(std::max(stats.dirtyAge.load(),
2881                                          stats.dirtyAgeHighWat.load()));
2882
2883     // Wait until the vbucket database is created by the vbucket state
2884     // snapshot task.
2885     if (vbMap.isBucketCreation(qi->getVBucketId()) ||
2886         vbMap.isBucketDeletion(qi->getVBucketId())) {
2887         vb->rejectQueue.push(qi);
2888         ++vb->opsReject;
2889         return NULL;
2890     }
2891
2892     KVStore *rwUnderlying = getRWUnderlying(qi->getVBucketId());
2893     if (!deleted) {
2894         // TODO: Need to separate disk_insert from disk_update because
2895         // bySeqno doesn't give us that information.
2896         BlockTimer timer(bySeqno == -1 ?
2897                          &stats.diskInsertHisto : &stats.diskUpdateHisto,
2898                          bySeqno == -1 ? "disk_insert" : "disk_update",
2899                          stats.timingLog);
2900         PersistenceCallback *cb =
2901             new PersistenceCallback(qi, vb, this, &stats, qi->getCas());
2902         rwUnderlying->set(*qi, *cb);
2903         return cb;
2904     } else {
2905         BlockTimer timer(&stats.diskDelHisto, "disk_delete",
2906                          stats.timingLog);
2907         PersistenceCallback *cb =
2908             new PersistenceCallback(qi, vb, this, &stats, 0);
2909         rwUnderlying->del(*qi, *cb);
2910         return cb;
2911     }
2912 }
2913
2914 void EventuallyPersistentStore::queueDirty(RCPtr<VBucket> &vb,
2915                                            StoredValue* v,
2916                                            LockHolder *plh,
2917                                            bool tapBackfill,
2918                                            bool notifyReplicator,
2919                                            bool genBySeqno) {
2920     if (vb) {
2921         queued_item qi(v->toItem(false, vb->getId()));
2922         bool rv;
2923         if (genBySeqno) {
2924             LockHolder slh = vb->getSnapshotLock();
2925             rv = tapBackfill ? vb->queueBackfillItem(qi, genBySeqno) :
2926                                vb->checkpointManager.queueDirty(vb, qi,
2927                                                                 genBySeqno);
2928             v->setBySeqno(qi->getBySeqno());
2929             vb->setCurrentSnapshot_UNLOCKED(qi->getBySeqno(), qi->getBySeqno());
2930         } else {
2931             rv = tapBackfill ? vb->queueBackfillItem(qi, genBySeqno) :
2932                                vb->checkpointManager.queueDirty(vb, qi,
2933                                                                 genBySeqno);
2934             v->setBySeqno(qi->getBySeqno());
2935         }
2936
2937         if (plh) {
2938             plh->unlock();
2939         }
2940
2941         if (rv) {
2942             KVShard* shard = vbMap.getShard(vb->getId());
2943             shard->getFlusher()->notifyFlushEvent();
2944
2945         }
2946         if (!tapBackfill && notifyReplicator) {
2947             engine.getTapConnMap().notifyVBConnections(vb->getId());
2948             engine.getDcpConnMap().notifyVBConnections(vb->getId(),
2949                                                        qi->getBySeqno());
2950         }
2951     }
2952 }
2953
2954 std::vector<vbucket_state *> EventuallyPersistentStore::loadVBucketState()
2955 {
2956     return getOneROUnderlying()->listPersistedVbuckets();
2957 }
2958
2959 void EventuallyPersistentStore::warmupCompleted() {
2960     // Run the vbucket state snapshot job once after the warmup
2961     scheduleVBSnapshot(Priority::VBucketPersistHighPriority);
2962
2963     if (engine.getConfiguration().getAlogPath().length() > 0) {
2964
2965         if (engine.getConfiguration().isAccessScannerEnabled()) {
2966             LockHolder lh(accessScanner.mutex);
2967             accessScanner.enabled = true;
2968             lh.unlock();
2969             LOG(EXTENSION_LOG_WARNING, "Access Scanner task enabled");
2970             size_t smin = engine.getConfiguration().getAlogSleepTime();
2971             setAccessScannerSleeptime(smin);
2972         } else {
2973             LockHolder lh(accessScanner.mutex);
2974             accessScanner.enabled = false;
2975             LOG(EXTENSION_LOG_WARNING, "Access Scanner task disabled");
2976         }
2977
2978         Configuration &config = engine.getConfiguration();
2979         config.addValueChangedListener("access_scanner_enabled",
2980                                        new EPStoreValueChangeListener(*this));
2981         config.addValueChangedListener("alog_sleep_time",
2982                                        new EPStoreValueChangeListener(*this));
2983         config.addValueChangedListener("alog_task_time",
2984                                        new EPStoreValueChangeListener(*this));
2985     }
2986
2987     // "0" sleep_time means that the first snapshot task will be executed
2988     // right after warmup. Subsequent snapshot tasks will be scheduled every
2989     // 60 sec by default.
2990     ExecutorPool *iom = ExecutorPool::get();
2991     ExTask task = new StatSnap(&engine, Priority::StatSnapPriority, 0, false);
2992     statsSnapshotTaskId = iom->schedule(task, WRITER_TASK_IDX);
2993 }
2994
2995 bool EventuallyPersistentStore::maybeEnableTraffic()
2996 {
2997     // @todo rename.. skal vaere isTrafficDisabled elns
2998     double memoryUsed = static_cast<double>(stats.getTotalMemoryUsed());
2999     double maxSize = static_cast<double>(stats.getMaxDataSize());
3000
3001     if (memoryUsed  >= stats.mem_low_wat) {
3002         LOG(EXTENSION_LOG_WARNING,
3003             "Total memory use reached to the low water mark, stop warmup");
3004         return true;
3005     } else if (memoryUsed > (maxSize * stats.warmupMemUsedCap)) {
3006         LOG(EXTENSION_LOG_WARNING,
3007                 "Enough MB of data loaded to enable traffic");
3008         return true;
3009     } else if (eviction_policy == VALUE_ONLY &&
3010                stats.warmedUpValues >=
3011                                (stats.warmedUpKeys * stats.warmupNumReadCap)) {
3012         // Let ep-engine think we're done with the warmup phase
3013         // (we should refactor this into "enableTraffic")
3014         LOG(EXTENSION_LOG_WARNING,
3015             "Enough number of items loaded to enable traffic");
3016         return true;
3017     } else if (eviction_policy == FULL_EVICTION &&
3018                stats.warmedUpValues >=
3019                             (warmupTask->getEstimatedItemCount() *
3020                              stats.warmupNumReadCap)) {
3021         // In case of FULL EVICTION, warmed up keys always matches the number
3022         // of warmed up values, therefore for honoring the min_item threshold
3023         // in this scenario, we can consider warmup's estimated item count.
3024         LOG(EXTENSION_LOG_WARNING,
3025             "Enough number of items loaded to enable traffic");
3026         return true;
3027     }
3028     return false;
3029 }
3030
3031 bool EventuallyPersistentStore::isWarmingUp() {
3032     return !warmupTask->isComplete();
3033 }
3034
3035 void EventuallyPersistentStore::stopWarmup(void)
3036 {
3037     // forcefully stop current warmup task
3038     if (isWarmingUp()) {
3039         LOG(EXTENSION_LOG_WARNING, "Stopping warmup while engine is loading "
3040             "data from underlying storage, shutdown = %s\n",
3041             stats.isShutdown ? "yes" : "no");
3042         warmupTask->stop();
3043     }
3044 }
3045
3046 bool EventuallyPersistentStore::isMemoryUsageTooHigh() {
3047     double memoryUsed = static_cast<double>(stats.getTotalMemoryUsed());
3048     double maxSize = static_cast<double>(stats.getMaxDataSize());
3049     return memoryUsed > (maxSize * backfillMemoryThreshold);
3050 }
3051
3052 void EventuallyPersistentStore::setBackfillMemoryThreshold(
3053                                                 double threshold) {
3054     backfillMemoryThreshold = threshold;
3055 }
3056
3057 void EventuallyPersistentStore::setExpiryPagerSleeptime(size_t val) {
3058     LockHolder lh(expiryPager.mutex);
3059
3060     if (expiryPager.sleeptime != 0) {
3061         ExecutorPool::get()->cancel(expiryPager.task);
3062     }
3063
3064     expiryPager.sleeptime = val;
3065     if (val != 0) {
3066         ExTask expTask = new ExpiredItemPager(&engine, stats,
3067                                                 expiryPager.sleeptime);
3068         expiryPager.task = ExecutorPool::get()->schedule(expTask,
3069                                                         NONIO_TASK_IDX);
3070     }
3071 }
3072
3073 void EventuallyPersistentStore::enableAccessScannerTask() {
3074     LockHolder lh(accessScanner.mutex);
3075     if (!accessScanner.enabled) {
3076         accessScanner.enabled = true;
3077
3078         if (accessScanner.sleeptime != 0) {
3079             ExecutorPool::get()->cancel(accessScanner.task);
3080         }
3081
3082         size_t alogSleepTime = engine.getConfiguration().getAlogSleepTime();
3083         accessScanner.sleeptime = alogSleepTime * 60;
3084         if (accessScanner.sleeptime != 0) {
3085             ExTask task = new AccessScanner(*this, stats,
3086                                             Priority::AccessScannerPriority,
3087                                             accessScanner.sleeptime);
3088             accessScanner.task = ExecutorPool::get()->schedule(task,
3089                                                                AUXIO_TASK_IDX);
3090             struct timeval tv;
3091             gettimeofday(&tv, NULL);
3092             advance_tv(tv, accessScanner.sleeptime);
3093             stats.alogTime.store(tv.tv_sec);
3094         } else {
3095             LOG(EXTENSION_LOG_WARNING, "Did not enable access scanner task, "
3096                                        "as alog_sleep_time is set to zero!");
3097         }
3098     } else {
3099         LOG(EXTENSION_LOG_DEBUG, "Access scanner already enabled!");
3100     }
3101 }
3102
3103 void EventuallyPersistentStore::disableAccessScannerTask() {
3104     LockHolder lh(accessScanner.mutex);
3105     if (accessScanner.enabled) {
3106         ExecutorPool::get()->cancel(accessScanner.task);
3107         accessScanner.sleeptime = 0;
3108         accessScanner.enabled = false;
3109     } else {
3110         LOG(EXTENSION_LOG_DEBUG, "Access scanner already disabled!");
3111     }
3112 }
3113
3114 void EventuallyPersistentStore::setAccessScannerSleeptime(size_t val) {
3115     LockHolder lh(accessScanner.mutex);
3116
3117     if (accessScanner.enabled) {
3118         if (accessScanner.sleeptime != 0) {
3119             ExecutorPool::get()->cancel(accessScanner.task);
3120         }
3121
3122         // store sleeptime in seconds
3123         accessScanner.sleeptime = val * 60;
3124         if (accessScanner.sleeptime != 0) {
3125             ExTask task = new AccessScanner(*this, stats,
3126                                             Priority::AccessScannerPriority,
3127                                             accessScanner.sleeptime);
3128             accessScanner.task = ExecutorPool::get()->schedule(task,
3129                                                                AUXIO_TASK_IDX);
3130
3131             struct timeval tv;
3132             gettimeofday(&tv, NULL);
3133             advance_tv(tv, accessScanner.sleeptime);
3134             stats.alogTime.store(tv.tv_sec);
3135         }
3136     }
3137 }
3138
3139 void EventuallyPersistentStore::resetAccessScannerStartTime() {
3140     LockHolder lh(accessScanner.mutex);
3141
3142     if (accessScanner.enabled) {
3143         if (accessScanner.sleeptime != 0) {
3144             ExecutorPool::get()->cancel(accessScanner.task);
3145             // re-schedule task according to the new task start hour
3146             ExTask task = new AccessScanner(*this, stats,
3147                                             Priority::AccessScannerPriority,
3148                                             accessScanner.sleeptime);
3149             accessScanner.task = ExecutorPool::get()->schedule(task,
3150                                                                AUXIO_TASK_IDX);
3151
3152             struct timeval tv;
3153             gettimeofday(&tv, NULL);
3154             advance_tv(tv, accessScanner.sleeptime);
3155             stats.alogTime.store(tv.tv_sec);
3156         }
3157     }
3158 }
3159
3160 void EventuallyPersistentStore::visit(VBucketVisitor &visitor)
3161 {
3162     size_t maxSize = vbMap.getSize();
3163     cb_assert(maxSize <= std::numeric_limits<uint16_t>::max());
3164     for (size_t i = 0; i < maxSize; ++i) {
3165         uint16_t vbid = static_cast<uint16_t>(i);
3166         RCPtr<VBucket> vb = vbMap.getBucket(vbid);
3167         if (vb) {
3168             bool wantData = visitor.visitBucket(vb);
3169             // We could've lost this along the way.
3170             if (wantData) {
3171                 vb->ht.visit(visitor);
3172             }
3173         }
3174     }
3175     visitor.complete();
3176 }
3177
3178 VBCBAdaptor::VBCBAdaptor(EventuallyPersistentStore *s,
3179                          shared_ptr<VBucketVisitor> v,
3180                          const char *l, const Priority &p, double sleep) :
3181     GlobalTask(&s->getEPEngine(), p, 0, false), store(s),
3182     visitor(v), label(l), sleepTime(sleep), currentvb(0)
3183 {
3184     const VBucketFilter &vbFilter = visitor->getVBucketFilter();
3185     size_t maxSize = store->vbMap.getSize();
3186     cb_assert(maxSize <= std::numeric_limits<uint16_t>::max());
3187     for (size_t i = 0; i < maxSize; ++i) {
3188         uint16_t vbid = static_cast<uint16_t>(i);
3189         RCPtr<VBucket> vb = store->vbMap.getBucket(vbid);
3190         if (vb && vbFilter(vbid)) {
3191             vbList.push(vbid);
3192         }
3193     }
3194 }
3195
3196 bool VBCBAdaptor::run(void) {
3197     if (!vbList.empty()) {
3198         currentvb.store(vbList.front());
3199         RCPtr<VBucket> vb = store->vbMap.getBucket(currentvb);
3200         if (vb) {
3201             if (visitor->pauseVisitor()) {
3202                 snooze(sleepTime);
3203                 return true;
3204             }
3205             if (visitor->visitBucket(vb)) {
3206                 vb->ht.visit(*visitor);
3207             }
3208         }
3209         vbList.pop();
3210     }
3211
3212     bool isdone = vbList.empty();
3213     if (isdone) {
3214         visitor->complete();
3215     }
3216     return !isdone;
3217 }
3218
3219 VBucketVisitorTask::VBucketVisitorTask(EventuallyPersistentStore *s,
3220                                        shared_ptr<VBucketVisitor> v,
3221                                        uint16_t sh, const char *l,
3222                                        double sleep, bool shutdown):
3223     GlobalTask(&(s->getEPEngine()), Priority::AccessScannerPriority,
3224                0, shutdown),
3225     store(s), visitor(v), label(l), sleepTime(sleep), currentvb(0),
3226     shardID(sh)
3227 {
3228     const VBucketFilter &vbFilter = visitor->getVBucketFilter();
3229     std::vector<int> vbs = store->vbMap.getShard(shardID)->getVBuckets();
3230     cb_assert(vbs.size() <= std::numeric_limits<uint16_t>::max());
3231     std::vector<int>::iterator it;
3232     for (it = vbs.begin(); it != vbs.end(); ++it) {
3233         uint16_t vbid = static_cast<uint16_t>(*it);
3234         RCPtr<VBucket> vb = store->vbMap.getBucket(vbid);
3235         if (vb && vbFilter(vbid)) {
3236             vbList.push(vbid);
3237         }
3238     }
3239 }
3240
3241 bool VBucketVisitorTask::run() {
3242     if (!vbList.empty()) {
3243         currentvb = vbList.front();
3244         RCPtr<VBucket> vb = store->vbMap.getBucket(currentvb);
3245         if (vb) {
3246             if (visitor->pauseVisitor()) {
3247                 snooze(sleepTime);
3248                 return true;
3249             }
3250             if (visitor->visitBucket(vb)) {
3251                 vb->ht.visit(*visitor);
3252             }
3253         }
3254         vbList.pop();
3255     }
3256
3257     bool isDone = vbList.empty();
3258     if (isDone) {
3259         visitor->complete();
3260     }
3261     return !isDone;
3262 }
3263
3264 void EventuallyPersistentStore::resetUnderlyingStats(void)
3265 {
3266     for (size_t i = 0; i < vbMap.numShards; i++) {
3267         KVShard *shard = vbMap.shards[i];
3268         shard->getRWUnderlying()->resetStats();
3269         shard->getROUnderlying()->resetStats();
3270     }
3271
3272     for (size_t i = 0; i < MAX_TYPE_ID; i++) {
3273         stats.schedulingHisto[i].reset();
3274         stats.taskRuntimeHisto[i].reset();
3275     }
3276 }
3277
3278 void EventuallyPersistentStore::addKVStoreStats(ADD_STAT add_stat,
3279                                                 const void* cookie) {
3280     for (size_t i = 0; i < vbMap.numShards; i++) {
3281         std::stringstream rwPrefix;
3282         std::stringstream roPrefix;
3283         rwPrefix << "rw_" << i;
3284         roPrefix << "ro_" << i;
3285         vbMap.shards[i]->getRWUnderlying()->addStats(rwPrefix.str(), add_stat,
3286                                                      cookie);
3287         vbMap.shards[i]->getROUnderlying()->addStats(roPrefix.str(), add_stat,
3288                                                      cookie);
3289     }
3290 }
3291
3292 void EventuallyPersistentStore::addKVStoreTimingStats(ADD_STAT add_stat,
3293                                                       const void* cookie) {
3294     for (size_t i = 0; i < vbMap.numShards; i++) {
3295         std::stringstream rwPrefix;
3296         std::stringstream roPrefix;
3297         rwPrefix << "rw_" << i;
3298         roPrefix << "ro_" << i;
3299         vbMap.shards[i]->getRWUnderlying()->addTimingStats(rwPrefix.str(),
3300                                                            add_stat,
3301                                                            cookie);
3302         vbMap.shards[i]->getROUnderlying()->addTimingStats(roPrefix.str(),
3303                                                            add_stat,
3304                                                            cookie);
3305     }
3306 }
3307
3308 KVStore *EventuallyPersistentStore::getOneROUnderlying(void) {
3309     return vbMap.getShard(EP_PRIMARY_SHARD)->getROUnderlying();
3310 }
3311
3312 KVStore *EventuallyPersistentStore::getOneRWUnderlying(void) {
3313     return vbMap.getShard(EP_PRIMARY_SHARD)->getRWUnderlying();
3314 }
3315
3316 ENGINE_ERROR_CODE
3317 EventuallyPersistentStore::rollback(uint16_t vbid,
3318                                     uint64_t rollbackSeqno) {
3319     LockHolder lh(vb_mutexes[vbid], true /*tryLock*/);
3320     if (!lh.islocked()) {
3321         return ENGINE_TMPFAIL; // Reschedule a vbucket rollback task.
3322     }
3323
3324     if (rollbackSeqno != 0) {
3325         shared_ptr<RollbackCB> cb(new RollbackCB(engine));
3326         KVStore* rwUnderlying = vbMap.getShard(vbid)->getRWUnderlying();
3327         RollbackResult result = rwUnderlying->rollback(vbid, rollbackSeqno, cb);
3328
3329         if (result.success) {
3330             RCPtr<VBucket> vb = vbMap.getBucket(vbid);
3331             vb->failovers->pruneEntries(result.highSeqno);
3332             vb->checkpointManager.clear(vb->getState());
3333             vb->checkpointManager.setBySeqno(result.highSeqno);
3334             vb->setCurrentSnapshot(result.snapStartSeqno, result.snapEndSeqno);
3335             return ENGINE_SUCCESS;
3336         }
3337     }
3338
3339     if (resetVBucket(vbid)) {
3340         return ENGINE_SUCCESS;
3341     }
3342     return ENGINE_NOT_MY_VBUCKET;
3343 }