/* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
/*
 *     Copyright 2010 Couchbase, Inc
 *
 *   Licensed under the Apache License, Version 2.0 (the "License");
 *   you may not use this file except in compliance with the License.
 *   You may obtain a copy of the License at
 *
 *       http://www.apache.org/licenses/LICENSE-2.0
 *
 *   Unless required by applicable law or agreed to in writing, software
 *   distributed under the License is distributed on an "AS IS" BASIS,
 *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 *   See the License for the specific language governing permissions and
 *   limitations under the License.
 */
#include "access_scanner.h"
#include "checkpoint_remover.h"
#include "conflict_resolution.h"
#include "defragmenter.h"
#include "ep_engine.h"
#include "failover-table.h"
#include "htresizer.h"
#include "mutation_log.h"
#include "tapthrottle.h"
class StatsValueChangeListener : public ValueChangedListener {
public:
    StatsValueChangeListener(EPStats &st, EventuallyPersistentStore &str)
        : stats(st), store(str) {
        // EMPTY
    }

    virtual void sizeValueChanged(const std::string &key, size_t value) {
        if (key.compare("max_size") == 0) {
            stats.setMaxDataSize(value);
            store.getEPEngine().getDcpConnMap().
                updateMaxActiveSnoozingBackfills(value);
            size_t low_wat = static_cast<size_t>
                (static_cast<double>(value) * 0.75);
            size_t high_wat = static_cast<size_t>(
                static_cast<double>(value) * 0.85);
            stats.mem_low_wat.store(low_wat);
            stats.mem_high_wat.store(high_wat);
        } else if (key.compare("mem_low_wat") == 0) {
            stats.mem_low_wat.store(value);
        } else if (key.compare("mem_high_wat") == 0) {
            stats.mem_high_wat.store(value);
        } else if (key.compare("tap_throttle_threshold") == 0) {
            stats.tapThrottleThreshold.store(
                static_cast<double>(value) / 100.0);
        } else if (key.compare("warmup_min_memory_threshold") == 0) {
            stats.warmupMemUsedCap.store(static_cast<double>(value) / 100.0);
        } else if (key.compare("warmup_min_items_threshold") == 0) {
            stats.warmupNumReadCap.store(static_cast<double>(value) / 100.0);
        } else {
            LOG(EXTENSION_LOG_WARNING,
                "Failed to change value for unknown variable, %s\n",
                key.c_str());
        }
    }

private:
    EPStats &stats;
    EventuallyPersistentStore &store;
};
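
/*
 * Note on the listener above: StatsValueChangeListener keeps the global
 * EPStats in sync when memory-related configuration keys are changed at
 * runtime. In particular, changing "max_size" re-derives the memory
 * watermarks from the fixed ratios used above (75% and 85% of the quota).
 * A worked example (illustrative numbers only): with max_size set to
 * 1073741824 (1 GiB), mem_low_wat becomes 805306368 (768 MiB) and
 * mem_high_wat becomes 912680550 (~870 MiB).
 */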
/**
 * A configuration value changed listener that responds to ep-engine
 * parameter changes by invoking engine-specific methods on
 * configuration change events.
 */
class EPStoreValueChangeListener : public ValueChangedListener {
public:
    EPStoreValueChangeListener(EventuallyPersistentStore &st) : store(st) {
        // EMPTY
    }

    virtual void sizeValueChanged(const std::string &key, size_t value) {
        if (key.compare("bg_fetch_delay") == 0) {
            store.setBGFetchDelay(static_cast<uint32_t>(value));
        } else if (key.compare("compaction_write_queue_cap") == 0) {
            store.setCompactionWriteQueueCap(value);
        } else if (key.compare("exp_pager_stime") == 0) {
            store.setExpiryPagerSleeptime(value);
        } else if (key.compare("alog_sleep_time") == 0) {
            store.setAccessScannerSleeptime(value);
        } else if (key.compare("alog_task_time") == 0) {
            store.resetAccessScannerStartTime();
        } else if (key.compare("mutation_mem_threshold") == 0) {
            double mem_threshold = static_cast<double>(value) / 100;
            StoredValue::setMutationMemoryThreshold(mem_threshold);
        } else if (key.compare("backfill_mem_threshold") == 0) {
            double backfill_threshold = static_cast<double>(value) / 100;
            store.setBackfillMemoryThreshold(backfill_threshold);
        } else if (key.compare("compaction_exp_mem_threshold") == 0) {
            store.setCompactionExpMemThreshold(value);
        } else if (key.compare("tap_throttle_queue_cap") == 0) {
            store.getEPEngine().getTapThrottle().setQueueCap(value);
        } else if (key.compare("tap_throttle_cap_pcnt") == 0) {
            store.getEPEngine().getTapThrottle().setCapPercent(value);
        } else {
            LOG(EXTENSION_LOG_WARNING,
                "Failed to change value for unknown variable, %s\n",
                key.c_str());
        }
    }

    virtual void booleanValueChanged(const std::string &key, bool value) {
        if (key.compare("access_scanner_enabled") == 0) {
            if (value) {
                store.enableAccessScannerTask();
            } else {
                store.disableAccessScannerTask();
            }
        } else if (key.compare("bfilter_enabled") == 0) {
            store.setAllBloomFilters(value);
        }
    }

    virtual void floatValueChanged(const std::string &key, float value) {
        if (key.compare("bfilter_residency_threshold") == 0) {
            store.setBfiltersResidencyThreshold(value);
        }
    }

private:
    EventuallyPersistentStore &store;
};
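
/*
 * The two listeners above are attached to individual configuration keys in
 * the EventuallyPersistentStore constructor and in initialize() further
 * down, following this registration pattern (sketch of calls that appear
 * below):
 *
 *     config.addValueChangedListener("mem_low_wat",
 *                             new StatsValueChangeListener(stats, *this));
 *     config.addValueChangedListener("exp_pager_stime",
 *                             new EPStoreValueChangeListener(*this));
 *
 * Configuration then invokes the matching sizeValueChanged() /
 * booleanValueChanged() / floatValueChanged() callback whenever that key is
 * updated at runtime.
 */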
class VBucketMemoryDeletionTask : public GlobalTask {
public:
    VBucketMemoryDeletionTask(EventuallyPersistentEngine &eng,
                              RCPtr<VBucket> &vb, double delay) :
        GlobalTask(&eng, TaskId::VBucketMemoryDeletionTask, delay, true),
        e(eng), vbucket(vb), vbid(vb->getId()) { }

    std::string getDescription() {
        std::stringstream ss;
        ss << "Removing (dead) vbucket " << vbid << " from memory";
        return ss.str();
    }

    bool run(void) {
        vbucket->notifyAllPendingConnsFailed(e);
        return false;
    }

private:
    EventuallyPersistentEngine &e;
    RCPtr<VBucket> vbucket;
    uint16_t vbid;
};
class PendingOpsNotification : public GlobalTask {
public:
    PendingOpsNotification(EventuallyPersistentEngine &e, RCPtr<VBucket> &vb) :
        GlobalTask(&e, TaskId::PendingOpsNotification, 0, false),
        engine(e), vbucket(vb) { }

    std::string getDescription() {
        std::stringstream ss;
        ss << "Notify pending operations for vbucket " << vbucket->getId();
        return ss.str();
    }

    bool run(void) {
        vbucket->fireAllOps(engine);
        return false;
    }

private:
    EventuallyPersistentEngine &engine;
    RCPtr<VBucket> vbucket;
};
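
/*
 * Both of the small GlobalTask subclasses above are one-shot helpers:
 * VBucketMemoryDeletionTask releases the in-memory state of a dead vbucket,
 * and PendingOpsNotification wakes up front-end operations that were parked
 * on a pending vbucket. setVBucketState() below schedules the latter on the
 * NONIO task queue when a vbucket transitions from pending to active.
 */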
201 EventuallyPersistentStore::EventuallyPersistentStore(
202 EventuallyPersistentEngine &theEngine) :
203 engine(theEngine), stats(engine.getEpStats()),
204 vbMap(theEngine.getConfiguration(), *this),
205 defragmenterTask(NULL),
207 diskFlushAll(false), bgFetchDelay(0),
208 backfillMemoryThreshold(0.95),
209 statsSnapshotTaskId(0), lastTransTimePerItem(0)
211 cachedResidentRatio.activeRatio.store(0);
212 cachedResidentRatio.replicaRatio.store(0);
214 Configuration &config = engine.getConfiguration();
215 MutationLog *shardlog;
216 for (uint16_t i = 0; i < config.getMaxNumShards(); i++) {
219 shardlog = new MutationLog(engine.getConfiguration().getAlogPath() +
221 engine.getConfiguration().getAlogBlockSize());
222 accessLog.push_back(shardlog);
225 storageProperties = new StorageProperties(true, true, true, true);
227 stats.schedulingHisto = new Histogram<hrtime_t>[GlobalTask::allTaskIds.size()];
228 stats.taskRuntimeHisto = new Histogram<hrtime_t>[GlobalTask::allTaskIds.size()];
230 for (size_t i = 0; i < GlobalTask::allTaskIds.size(); i++) {
231 stats.schedulingHisto[i].reset();
232 stats.taskRuntimeHisto[i].reset();
235 ExecutorPool::get()->registerBucket(ObjectRegistry::getCurrentEngine());
237 size_t num_vbs = config.getMaxVbuckets();
238 vb_mutexes = new Mutex[num_vbs];
239 schedule_vbstate_persist = new AtomicValue<bool>[num_vbs];
240 for (size_t i = 0; i < num_vbs; ++i) {
241 schedule_vbstate_persist[i] = false;
244 stats.memOverhead = sizeof(EventuallyPersistentStore);
246 if (config.getConflictResolutionType().compare("seqno") == 0) {
247 conflictResolver = new ConflictResolution();
250 stats.setMaxDataSize(config.getMaxSize());
251 config.addValueChangedListener("max_size",
252 new StatsValueChangeListener(stats, *this));
253 getEPEngine().getDcpConnMap().updateMaxActiveSnoozingBackfills(config.getMaxSize());
255 stats.mem_low_wat.store(config.getMemLowWat());
256 config.addValueChangedListener("mem_low_wat",
257 new StatsValueChangeListener(stats, *this));
259 stats.mem_high_wat.store(config.getMemHighWat());
260 config.addValueChangedListener("mem_high_wat",
261 new StatsValueChangeListener(stats, *this));
263 stats.tapThrottleThreshold.store(static_cast<double>
264 (config.getTapThrottleThreshold())
266 config.addValueChangedListener("tap_throttle_threshold",
267 new StatsValueChangeListener(stats, *this));
269 stats.tapThrottleWriteQueueCap.store(config.getTapThrottleQueueCap());
270 config.addValueChangedListener("tap_throttle_queue_cap",
271 new EPStoreValueChangeListener(*this));
272 config.addValueChangedListener("tap_throttle_cap_pcnt",
273 new EPStoreValueChangeListener(*this));
275 setBGFetchDelay(config.getBgFetchDelay());
276 config.addValueChangedListener("bg_fetch_delay",
277 new EPStoreValueChangeListener(*this));
279 stats.warmupMemUsedCap.store(static_cast<double>
280 (config.getWarmupMinMemoryThreshold()) / 100.0);
281 config.addValueChangedListener("warmup_min_memory_threshold",
282 new StatsValueChangeListener(stats, *this));
283 stats.warmupNumReadCap.store(static_cast<double>
284 (config.getWarmupMinItemsThreshold()) / 100.0);
285 config.addValueChangedListener("warmup_min_items_threshold",
286 new StatsValueChangeListener(stats, *this));
288 double mem_threshold = static_cast<double>
289 (config.getMutationMemThreshold()) / 100;
290 StoredValue::setMutationMemoryThreshold(mem_threshold);
291 config.addValueChangedListener("mutation_mem_threshold",
292 new EPStoreValueChangeListener(*this));
294 double backfill_threshold = static_cast<double>
295 (config.getBackfillMemThreshold()) / 100;
296 setBackfillMemoryThreshold(backfill_threshold);
297 config.addValueChangedListener("backfill_mem_threshold",
298 new EPStoreValueChangeListener(*this));
300 config.addValueChangedListener("bfilter_enabled",
301 new EPStoreValueChangeListener(*this));
303 bfilterResidencyThreshold = config.getBfilterResidencyThreshold();
304 config.addValueChangedListener("bfilter_residency_threshold",
305 new EPStoreValueChangeListener(*this));
307 compactionExpMemThreshold = config.getCompactionExpMemThreshold();
308 config.addValueChangedListener("compaction_exp_mem_threshold",
309 new EPStoreValueChangeListener(*this));
311 compactionWriteQueueCap = config.getCompactionWriteQueueCap();
312 config.addValueChangedListener("compaction_write_queue_cap",
313 new EPStoreValueChangeListener(*this));
315 const std::string &policy = config.getItemEvictionPolicy();
316 if (policy.compare("value_only") == 0) {
317 eviction_policy = VALUE_ONLY;
319 eviction_policy = FULL_EVICTION;
322 warmupTask = new Warmup(this);
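
    /*
     * Summary of the wiring done above: the constructor sizes one access
     * (mutation) log per shard, allocates the per-vbucket mutexes and
     * vbstate-persist flags, registers the Stats/EPStore value-changed
     * listeners for the tunables handled by the listener classes at the top
     * of this file, picks the eviction policy (value_only vs full eviction)
     * and, when conflict_resolution_type is "seqno", creates the seqno-based
     * ConflictResolution object before finally constructing the Warmup task.
     */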
325 bool EventuallyPersistentStore::initialize() {
326 // We should nuke everything unless we want warmup
327 Configuration &config = engine.getConfiguration();
328 if (!config.isWarmup()) {
332 if (!startFlusher()) {
333 LOG(EXTENSION_LOG_WARNING,
334 "FATAL: Failed to create and start flushers");
337 if (!startBgFetcher()) {
338 LOG(EXTENSION_LOG_WARNING,
339 "FATAL: Failed to create and start bgfetchers");
345 if (config.isFailpartialwarmup() && stats.warmOOM > 0) {
346 LOG(EXTENSION_LOG_WARNING,
347 "Warmup failed to load %d records due to OOM, exiting.\n",
348 static_cast<unsigned int>(stats.warmOOM));
352 itmpTask = new ItemPager(&engine, stats);
353 ExecutorPool::get()->schedule(itmpTask, NONIO_TASK_IDX);
355 size_t expiryPagerSleeptime = config.getExpPagerStime();
356 setExpiryPagerSleeptime(expiryPagerSleeptime);
357 config.addValueChangedListener("exp_pager_stime",
358 new EPStoreValueChangeListener(*this));
360 ExTask htrTask = new HashtableResizerTask(this, 10);
361 ExecutorPool::get()->schedule(htrTask, NONIO_TASK_IDX);
363 size_t checkpointRemoverInterval = config.getChkRemoverStime();
364 chkTask = new ClosedUnrefCheckpointRemoverTask(&engine, stats,
365 checkpointRemoverInterval);
366 ExecutorPool::get()->schedule(chkTask, NONIO_TASK_IDX);
368 ExTask vbSnapshotTask = new DaemonVBSnapshotTask(&engine);
369 ExecutorPool::get()->schedule(vbSnapshotTask, WRITER_TASK_IDX);
371 ExTask workloadMonitorTask = new WorkLoadMonitor(&engine, false);
372 ExecutorPool::get()->schedule(workloadMonitorTask, NONIO_TASK_IDX);
    /* Only create the defragmenter task if we have an underlying memory
     * allocator which can facilitate defragmenting memory.
     */
378 defragmenterTask = new DefragmenterTask(&engine, stats);
379 ExecutorPool::get()->schedule(defragmenterTask, NONIO_TASK_IDX);
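
    /*
     * At this point initialize() has started the per-shard flushers and
     * bgfetchers and scheduled the recurring background work visible above:
     * the ItemPager, the expiry pager (via setExpiryPagerSleeptime), the
     * HashtableResizerTask, the closed/unreferenced checkpoint remover, the
     * daemon vbucket snapshot task, the workload monitor and, when the
     * memory allocator can support it, the defragmenter. The vbucket
     * snapshot task runs on the WRITER queue; the rest use the NONIO queue.
     */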
385 EventuallyPersistentStore::~EventuallyPersistentStore() {
388 ExecutorPool::get()->stopTaskGroup(&engine, NONIO_TASK_IDX,
389 stats.forceShutdown);
391 ExecutorPool::get()->cancel(statsSnapshotTaskId);
392 LockHolder lh(accessScanner.mutex);
393 ExecutorPool::get()->cancel(accessScanner.task);
397 ExecutorPool::get()->unregisterBucket(ObjectRegistry::getCurrentEngine(),
398 stats.forceShutdown);
400 delete [] vb_mutexes;
401 delete [] schedule_vbstate_persist;
402 delete [] stats.schedulingHisto;
403 delete [] stats.taskRuntimeHisto;
404 delete conflictResolver;
406 delete storageProperties;
407 defragmenterTask.reset();
409 std::vector<MutationLog*>::iterator it;
410 for (it = accessLog.begin(); it != accessLog.end(); it++) {
415 const Flusher* EventuallyPersistentStore::getFlusher(uint16_t shardId) {
416 return vbMap.getShard(shardId)->getFlusher();
419 Warmup* EventuallyPersistentStore::getWarmup(void) const {
423 bool EventuallyPersistentStore::startFlusher() {
424 for (uint16_t i = 0; i < vbMap.numShards; ++i) {
425 Flusher *flusher = vbMap.shards[i]->getFlusher();
431 void EventuallyPersistentStore::stopFlusher() {
432 for (uint16_t i = 0; i < vbMap.numShards; i++) {
433 Flusher *flusher = vbMap.shards[i]->getFlusher();
434 LOG(EXTENSION_LOG_WARNING, "Attempting to stop the flusher for "
435 "shard:%" PRIu16, i);
436 bool rv = flusher->stop(stats.forceShutdown);
437 if (rv && !stats.forceShutdown) {
443 bool EventuallyPersistentStore::pauseFlusher() {
445 for (uint16_t i = 0; i < vbMap.numShards; i++) {
446 Flusher *flusher = vbMap.shards[i]->getFlusher();
447 if (!flusher->pause()) {
448 LOG(EXTENSION_LOG_WARNING, "Attempted to pause flusher in state "
449 "[%s], shard = %d", flusher->stateName(), i);
456 bool EventuallyPersistentStore::resumeFlusher() {
458 for (uint16_t i = 0; i < vbMap.numShards; i++) {
459 Flusher *flusher = vbMap.shards[i]->getFlusher();
460 if (!flusher->resume()) {
461 LOG(EXTENSION_LOG_WARNING,
462 "Warning: attempted to resume flusher in state [%s], "
463 "shard = %d", flusher->stateName(), i);
470 void EventuallyPersistentStore::wakeUpFlusher() {
471 if (stats.diskQueueSize.load() == 0) {
472 for (uint16_t i = 0; i < vbMap.numShards; i++) {
473 Flusher *flusher = vbMap.shards[i]->getFlusher();
479 bool EventuallyPersistentStore::startBgFetcher() {
480 for (uint16_t i = 0; i < vbMap.numShards; i++) {
481 BgFetcher *bgfetcher = vbMap.shards[i]->getBgFetcher();
482 if (bgfetcher == NULL) {
483 LOG(EXTENSION_LOG_WARNING,
484 "Failed to start bg fetcher for shard %d", i);
492 void EventuallyPersistentStore::stopBgFetcher() {
493 for (uint16_t i = 0; i < vbMap.numShards; i++) {
494 BgFetcher *bgfetcher = vbMap.shards[i]->getBgFetcher();
495 if (multiBGFetchEnabled() && bgfetcher->pendingJob()) {
496 LOG(EXTENSION_LOG_WARNING,
497 "Shutting down engine while there are still pending data "
498 "read for shard %d from database storage", i);
500 LOG(EXTENSION_LOG_WARNING, "Stopping bg fetcher for shard:%" PRIu16, i);
506 EventuallyPersistentStore::deleteExpiredItem(uint16_t vbid, std::string &key,
509 RCPtr<VBucket> vb = getVBucket(vbid);
511 // Obtain reader access to the VB state change lock so that
512 // the VB can't switch state whilst we're processing
513 ReaderLockHolder rlh(vb->getStateLock());
514 if (vb->getState() == vbucket_state_active) {
516 incExpirationStat(vb);
517 LockHolder lh = vb->ht.getLockedBucket(key, &bucket_num);
518 StoredValue *v = vb->ht.unlocked_find(key, bucket_num, true, false);
520 if (v->isTempNonExistentItem() || v->isTempDeletedItem()) {
            // This is a temporary item whose background fetch for metadata
            // has completed.
523 bool deleted = vb->ht.unlocked_del(key, bucket_num);
525 } else if (v->isExpired(startTime) && !v->isDeleted()) {
526 vb->ht.unlocked_softDelete(v, 0, getItemEvictionPolicy());
527 queueDirty(vb, v, &lh, NULL, false);
530 if (eviction_policy == FULL_EVICTION) {
                // Create a temp item, mark it as deleted, and push it into
                // the checkpoint queue, but only if the bloom filter
                // predicts that the item may exist on disk.
534 if (vb->maybeKeyExistsInFilter(key)) {
535 add_type_t rv = vb->ht.unlocked_addTempItem(bucket_num, key,
537 if (rv == ADD_NOMEM) {
540 v = vb->ht.unlocked_find(key, bucket_num, true, false);
541 v->setStoredValueState(StoredValue::state_deleted_key);
542 v->setRevSeqno(revSeqno);
543 vb->ht.unlocked_softDelete(v, 0, eviction_policy);
544 queueDirty(vb, v, &lh, NULL, false);
553 EventuallyPersistentStore::deleteExpiredItems(std::list<std::pair<uint16_t,
554 std::string> > &keys) {
555 std::list<std::pair<uint16_t, std::string> >::iterator it;
556 time_t startTime = ep_real_time();
557 for (it = keys.begin(); it != keys.end(); it++) {
558 deleteExpiredItem(it->first, it->second, startTime, 0);
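
    /*
     * Expiry handling sketch (based on deleteExpiredItem() above): keys that
     * compaction reports as expired (see ExpiredItemsCallback further down)
     * are re-checked here under the hashtable lock of an active vbucket.
     * Items still present and expired are soft-deleted and queued dirty;
     * temp items left over from a metadata bgfetch are simply removed; and
     * under full eviction a temp deleted item is created and queued only
     * when the vbucket's bloom filter says the key may exist on disk.
     */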
562 StoredValue *EventuallyPersistentStore::fetchValidValue(RCPtr<VBucket> &vb,
563 const std::string &key,
568 StoredValue *v = vb->ht.unlocked_find(key, bucket_num, wantDeleted,
570 if (v && !v->isDeleted() && !v->isTempItem()) {
571 // In the deleted case, we ignore expiration time.
572 if (v->isExpired(ep_real_time())) {
573 if (vb->getState() != vbucket_state_active) {
574 return wantDeleted ? v : NULL;
577 // queueDirty only allowed on active VB
578 if (queueExpired && vb->getState() == vbucket_state_active) {
579 incExpirationStat(vb, false);
580 vb->ht.unlocked_softDelete(v, 0, eviction_policy);
581 queueDirty(vb, v, NULL, NULL, false, true);
583 return wantDeleted ? v : NULL;
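
/*
 * fetchValidValue() implements lazy expiration: a read that lands on an
 * expired, non-temp item soft-deletes it and (only on an active vbucket,
 * and only when queueExpired is set) queues the deletion, then returns NULL
 * unless the caller asked for deleted values.
 */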
589 bool EventuallyPersistentStore::isMetaDataResident(RCPtr<VBucket> &vb,
590 const std::string &key) {
594 LockHolder lh = vb->ht.getLockedBucket(key, &bucket_num);
595 StoredValue *v = vb->ht.unlocked_find(key, bucket_num, false, false);
597 if (v && !v->isTempItem()) {
604 protocol_binary_response_status EventuallyPersistentStore::evictKey(
605 const std::string &key,
610 RCPtr<VBucket> vb = getVBucket(vbucket);
611 if (!vb || (vb->getState() != vbucket_state_active && !force)) {
612 return PROTOCOL_BINARY_RESPONSE_NOT_MY_VBUCKET;
616 LockHolder lh = vb->ht.getLockedBucket(key, &bucket_num);
617 StoredValue *v = fetchValidValue(vb, key, bucket_num, force, false);
619 protocol_binary_response_status rv(PROTOCOL_BINARY_RESPONSE_SUCCESS);
626 if (v->isResident()) {
627 if (vb->ht.unlocked_ejectItem(v, eviction_policy)) {
            // Add the key to the bloom filter in case of full eviction mode
631 if (getItemEvictionPolicy() == FULL_EVICTION) {
632 vb->addToFilter(key);
635 *msg = "Can't eject: Dirty object.";
636 rv = PROTOCOL_BINARY_RESPONSE_KEY_EEXISTS;
639 *msg = "Already ejected.";
642 if (eviction_policy == VALUE_ONLY) {
644 rv = PROTOCOL_BINARY_RESPONSE_KEY_ENOENT;
646 *msg = "Already ejected.";
653 ENGINE_ERROR_CODE EventuallyPersistentStore::addTempItemForBgFetch(
656 const std::string &key,
660 bool isReplication) {
662 add_type_t rv = vb->ht.unlocked_addTempItem(bucket_num, key,
667 return ENGINE_ENOMEM;
671 case ADD_TMP_AND_BG_FETCH:
672 // Since the hashtable bucket is locked, we shouldn't get here
676 bgFetch(key, vb->getId(), cookie, metadataOnly);
678 return ENGINE_EWOULDBLOCK;
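
/*
 * addTempItemForBgFetch() is the common full-eviction fallback used by the
 * front-end operations below: it inserts a temporary placeholder StoredValue
 * for the key, drops the hashtable lock, schedules a (metadata-only, if
 * requested) background fetch and returns ENGINE_EWOULDBLOCK so that the
 * operation is retried once the fetch completes; ADD_NOMEM maps to
 * ENGINE_ENOMEM instead.
 */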
681 ENGINE_ERROR_CODE EventuallyPersistentStore::set(const Item &itm,
686 RCPtr<VBucket> vb = getVBucket(itm.getVBucketId());
688 ++stats.numNotMyVBuckets;
689 return ENGINE_NOT_MY_VBUCKET;
692 // Obtain read-lock on VB state to ensure VB state changes are interlocked
694 ReaderLockHolder rlh(vb->getStateLock());
695 if (vb->getState() == vbucket_state_dead) {
696 ++stats.numNotMyVBuckets;
697 return ENGINE_NOT_MY_VBUCKET;
698 } else if (vb->getState() == vbucket_state_replica && !force) {
699 ++stats.numNotMyVBuckets;
700 return ENGINE_NOT_MY_VBUCKET;
701 } else if (vb->getState() == vbucket_state_pending && !force) {
702 if (vb->addPendingOp(cookie)) {
703 return ENGINE_EWOULDBLOCK;
707 bool cas_op = (itm.getCas() != 0);
709 LockHolder lh = vb->ht.getLockedBucket(itm.getKey(), &bucket_num);
710 StoredValue *v = vb->ht.unlocked_find(itm.getKey(), bucket_num, true,
712 if (v && v->isLocked(ep_current_time()) &&
713 (vb->getState() == vbucket_state_replica ||
714 vb->getState() == vbucket_state_pending)) {
718 bool maybeKeyExists = true;
719 // Check Bloomfilter's prediction if in full eviction policy
720 // and for a CAS operation only.
721 if (eviction_policy == FULL_EVICTION && itm.getCas() != 0) {
722 // Check Bloomfilter's prediction
723 if (!vb->maybeKeyExistsInFilter(itm.getKey())) {
724 maybeKeyExists = false;
728 mutation_type_t mtype = vb->ht.unlocked_set(v, itm, itm.getCas(), true, false,
729 eviction_policy, nru,
732 Item& it = const_cast<Item&>(itm);
734 ENGINE_ERROR_CODE ret = ENGINE_SUCCESS;
741 ret = ENGINE_KEY_EEXISTS;
745 ret = ENGINE_KEY_ENOENT;
750 // Even if the item was dirty, push it into the vbucket's open
753 it.setCas(vb->nextHLCCas());
754 v->setCas(it.getCas());
755 queueDirty(vb, v, &lh, &seqno);
756 it.setBySeqno(seqno);
759 { // CAS operation with non-resident item + full eviction.
761 // temp item is already created. Simply schedule a bg fetch job
763 bgFetch(itm.getKey(), vb->getId(), cookie, true);
764 return ENGINE_EWOULDBLOCK;
766 ret = addTempItemForBgFetch(lh, bucket_num, itm.getKey(), vb,
770 case INVALID_VBUCKET:
771 ret = ENGINE_NOT_MY_VBUCKET;
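
    /*
     * The switch above (and the equivalent ones in add(), replace() and
     * setWithMeta() below) maps HashTable mutation results onto engine
     * error codes roughly as follows:
     *
     *   NOMEM                   -> ENGINE_ENOMEM
     *   INVALID_CAS / IS_LOCKED -> ENGINE_KEY_EEXISTS
     *   NOT_FOUND               -> ENGINE_KEY_ENOENT
     *   WAS_CLEAN / WAS_DIRTY   -> assign a new HLC CAS, queueDirty(), and
     *                              report the item's by-seqno to the caller
     *   NEED_BG_FETCH           -> schedule a bgfetch (or temp item + fetch)
     *                              and return ENGINE_EWOULDBLOCK
     *   INVALID_VBUCKET         -> ENGINE_NOT_MY_VBUCKET
     */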
778 ENGINE_ERROR_CODE EventuallyPersistentStore::add(const Item &itm,
781 RCPtr<VBucket> vb = getVBucket(itm.getVBucketId());
783 ++stats.numNotMyVBuckets;
784 return ENGINE_NOT_MY_VBUCKET;
787 // Obtain read-lock on VB state to ensure VB state changes are interlocked
789 ReaderLockHolder rlh(vb->getStateLock());
790 if (vb->getState() == vbucket_state_dead ||
791 vb->getState() == vbucket_state_replica) {
792 ++stats.numNotMyVBuckets;
793 return ENGINE_NOT_MY_VBUCKET;
794 } else if(vb->getState() == vbucket_state_pending) {
795 if (vb->addPendingOp(cookie)) {
796 return ENGINE_EWOULDBLOCK;
800 if (itm.getCas() != 0) {
801 // Adding with a cas value doesn't make sense..
802 return ENGINE_NOT_STORED;
806 LockHolder lh = vb->ht.getLockedBucket(itm.getKey(), &bucket_num);
807 StoredValue *v = vb->ht.unlocked_find(itm.getKey(), bucket_num, true,
810 bool maybeKeyExists = true;
811 if (eviction_policy == FULL_EVICTION) {
812 // Check bloomfilter's prediction
813 if (!vb->maybeKeyExistsInFilter(itm.getKey())) {
814 maybeKeyExists = false;
818 add_type_t atype = vb->ht.unlocked_add(bucket_num, v, itm,
823 Item& it = const_cast<Item&>(itm);
827 return ENGINE_ENOMEM;
829 return ENGINE_NOT_STORED;
830 case ADD_TMP_AND_BG_FETCH:
831 return addTempItemForBgFetch(lh, bucket_num, it.getKey(), vb,
835 bgFetch(it.getKey(), vb->getId(), cookie, true);
836 return ENGINE_EWOULDBLOCK;
839 it.setCas(vb->nextHLCCas());
840 v->setCas(it.getCas());
841 queueDirty(vb, v, &lh, &seqno);
842 it.setBySeqno(seqno);
846 return ENGINE_SUCCESS;
849 ENGINE_ERROR_CODE EventuallyPersistentStore::replace(const Item &itm,
850 const void *cookie) {
851 RCPtr<VBucket> vb = getVBucket(itm.getVBucketId());
853 ++stats.numNotMyVBuckets;
854 return ENGINE_NOT_MY_VBUCKET;
857 // Obtain read-lock on VB state to ensure VB state changes are interlocked
859 ReaderLockHolder rlh(vb->getStateLock());
860 if (vb->getState() == vbucket_state_dead ||
861 vb->getState() == vbucket_state_replica) {
862 ++stats.numNotMyVBuckets;
863 return ENGINE_NOT_MY_VBUCKET;
864 } else if (vb->getState() == vbucket_state_pending) {
865 if (vb->addPendingOp(cookie)) {
866 return ENGINE_EWOULDBLOCK;
871 LockHolder lh = vb->ht.getLockedBucket(itm.getKey(), &bucket_num);
872 StoredValue *v = vb->ht.unlocked_find(itm.getKey(), bucket_num, true,
875 if (v->isDeleted() || v->isTempDeletedItem() ||
876 v->isTempNonExistentItem()) {
877 return ENGINE_KEY_ENOENT;
880 mutation_type_t mtype;
881 if (eviction_policy == FULL_EVICTION && v->isTempInitialItem()) {
882 mtype = NEED_BG_FETCH;
884 mtype = vb->ht.unlocked_set(v, itm, 0, true, false, eviction_policy,
888 Item& it = const_cast<Item&>(itm);
890 ENGINE_ERROR_CODE ret = ENGINE_SUCCESS;
896 ret = ENGINE_KEY_EEXISTS;
900 ret = ENGINE_NOT_STORED;
904 // Even if the item was dirty, push it into the vbucket's open
907 it.setCas(vb->nextHLCCas());
908 v->setCas(it.getCas());
909 queueDirty(vb, v, &lh, &seqno);
910 it.setBySeqno(seqno);
914 // temp item is already created. Simply schedule a bg fetch job
916 bgFetch(it.getKey(), vb->getId(), cookie, true);
917 ret = ENGINE_EWOULDBLOCK;
920 case INVALID_VBUCKET:
921 ret = ENGINE_NOT_MY_VBUCKET;
927 if (eviction_policy == VALUE_ONLY) {
928 return ENGINE_KEY_ENOENT;
931 if (vb->maybeKeyExistsInFilter(itm.getKey())) {
932 return addTempItemForBgFetch(lh, bucket_num, itm.getKey(), vb,
        // As the bloom filter predicts that the item doesn't exist
        // on disk, return ENOENT for replace().
937 return ENGINE_KEY_ENOENT;
942 ENGINE_ERROR_CODE EventuallyPersistentStore::addTAPBackfillItem(
946 ExtendedMetaData *emd) {
948 RCPtr<VBucket> vb = getVBucket(itm.getVBucketId());
950 ++stats.numNotMyVBuckets;
951 return ENGINE_NOT_MY_VBUCKET;
954 // Obtain read-lock on VB state to ensure VB state changes are interlocked
955 // with this add-tapbackfill
956 ReaderLockHolder rlh(vb->getStateLock());
957 if (vb->getState() == vbucket_state_dead ||
958 vb->getState() == vbucket_state_active) {
959 ++stats.numNotMyVBuckets;
960 return ENGINE_NOT_MY_VBUCKET;
    // Check the incoming item's CAS validity
964 if (!Item::isValidCas(itm.getCas())) {
965 return ENGINE_KEY_EEXISTS;
969 LockHolder lh = vb->ht.getLockedBucket(itm.getKey(), &bucket_num);
970 StoredValue *v = vb->ht.unlocked_find(itm.getKey(), bucket_num, true,
973 // Note that this function is only called on replica or pending vbuckets.
974 if (v && v->isLocked(ep_current_time())) {
977 mutation_type_t mtype = vb->ht.unlocked_set(v, itm, 0, true, true,
978 eviction_policy, nru);
980 ENGINE_ERROR_CODE ret = ENGINE_SUCCESS;
987 ret = ENGINE_KEY_EEXISTS;
990 // FALLTHROUGH, to ensure the bySeqno for the hashTable item is
991 // set correctly, and also the sequence numbers are ordered correctly.
996 /* set the conflict resolution mode from the extended meta data *
997 * Given that the mode is already set, we don't need to set the *
998 * conflict resolution mode in queueDirty */
1000 v->setConflictResMode(
1001 static_cast<enum conflict_resolution_mode>(
1002 emd->getConflictResMode()));
1004 vb->setMaxCas(v->getCas());
        queueDirty(vb, v, &lh, NULL, true, true, genBySeqno, false);
1007 case INVALID_VBUCKET:
1008 ret = ENGINE_NOT_MY_VBUCKET;
1011 // SET on a non-active vbucket should not require a bg_metadata_fetch.
    // Update the drift counter for the vbucket only upon success
1016 if (ret == ENGINE_SUCCESS && emd) {
1017 vb->setDriftCounter(emd->getAdjustedTime());
1023 class KVStatsCallback : public Callback<kvstats_ctx> {
1025 KVStatsCallback(EventuallyPersistentStore *store)
1026 : epstore(store) { }
1028 void callback(kvstats_ctx &ctx) {
1029 RCPtr<VBucket> vb = epstore->getVBucket(ctx.vbucket);
1031 vb->fileSpaceUsed = ctx.fileSpaceUsed;
1032 vb->fileSize = ctx.fileSize;
1037 EventuallyPersistentStore *epstore;
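
/*
 * KVStatsCallback above is handed to the KVStore layer (snapshotVBuckets,
 * persistVBState and compactVBucket all pass one) so that the on-disk file
 * size / space-used figures reported in kvstats_ctx get copied back onto
 * the corresponding VBucket object.
 */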
1040 void EventuallyPersistentStore::snapshotVBuckets(VBSnapshotTask::Priority prio,
1043 class VBucketStateVisitor : public VBucketVisitor {
1045 VBucketStateVisitor(VBucketMap &vb_map, uint16_t sid)
1046 : vbuckets(vb_map), shardId(sid) { }
1047 bool visitBucket(RCPtr<VBucket> &vb) {
1048 if (vbuckets.getShard(vb->getId())->getId() == shardId) {
1049 snapshot_range_t range;
1050 vb->getPersistedSnapshot(range);
1051 std::string failovers = vb->failovers->toJSON();
1052 uint64_t chkId = vbuckets.getPersistenceCheckpointId(vb->getId());
1054 vbucket_state vb_state(vb->getState(), chkId, 0,
1055 vb->getHighSeqno(), vb->getPurgeSeqno(),
1056 range.start, range.end, vb->getMaxCas(),
1057 vb->getDriftCounter() ,failovers);
1058 states.insert(std::pair<uint16_t, vbucket_state>(vb->getId(), vb_state));
1063 void visit(StoredValue*) {
1064 cb_assert(false); // this does not happen
1067 std::map<uint16_t, vbucket_state> states;
1070 VBucketMap &vbuckets;
1074 KVShard *shard = vbMap.shards[shardId];
1075 if (prio == VBSnapshotTask::Priority::LOW) {
1076 shard->setLowPriorityVbSnapshotFlag(false);
1078 shard->setHighPriorityVbSnapshotFlag(false);
1081 KVStatsCallback kvcb(this);
1082 VBucketStateVisitor v(vbMap, shard->getId());
1084 hrtime_t start = gethrtime();
1086 bool success = true;
1087 vbucket_map_t::reverse_iterator iter = v.states.rbegin();
1088 for (; iter != v.states.rend(); ++iter) {
1089 LockHolder lh(vb_mutexes[iter->first], true /*tryLock*/);
1090 if (!lh.islocked()) {
1093 KVStore *rwUnderlying = getRWUnderlying(iter->first);
1094 if (!rwUnderlying->snapshotVBucket(iter->first, iter->second,
1096 LOG(EXTENSION_LOG_WARNING,
1097 "VBucket snapshot task failed!!! Rescheduling");
1102 if (prio == VBSnapshotTask::Priority::HIGH) {
1103 if (vbMap.setBucketCreation(iter->first, false)) {
1104 LOG(EXTENSION_LOG_INFO, "VBucket %d created", iter->first);
1110 scheduleVBSnapshot(prio, shard->getId());
1112 stats.snapshotVbucketHisto.add((gethrtime() - start) / 1000);
1116 bool EventuallyPersistentStore::persistVBState(uint16_t vbid) {
1117 schedule_vbstate_persist[vbid] = false;
1119 RCPtr<VBucket> vb = getVBucket(vbid);
        LOG(EXTENSION_LOG_WARNING,
            "VBucket %d does not exist!!! vb_state persistence task failed!!!",
            vbid);
1126 bool inverse = false;
1127 LockHolder lh(vb_mutexes[vbid], true /*tryLock*/);
1128 if (!lh.islocked()) {
1129 if (schedule_vbstate_persist[vbid].compare_exchange_strong(inverse,
1137 const hrtime_t start = gethrtime();
1139 KVStatsCallback kvcb(this);
1140 uint64_t chkId = vbMap.getPersistenceCheckpointId(vbid);
1141 std::string failovers = vb->failovers->toJSON();
1143 snapshot_range_t range;
1144 vb->getPersistedSnapshot(range);
1145 vbucket_state vb_state(vb->getState(), chkId, 0, vb->getHighSeqno(),
1146 vb->getPurgeSeqno(), range.start, range.end,
1147 vb->getMaxCas(), vb->getDriftCounter(),
1150 KVStore *rwUnderlying = getRWUnderlying(vbid);
1151 if (rwUnderlying->snapshotVBucket(vbid, vb_state, &kvcb)) {
1152 stats.persistVBStateHisto.add((gethrtime() - start) / 1000);
1153 if (vbMap.setBucketCreation(vbid, false)) {
1154 LOG(EXTENSION_LOG_INFO, "VBucket %d created", vbid);
1157 LOG(EXTENSION_LOG_WARNING,
1158 "VBucket %d: vb_state persistence task failed!!! Rescheduling", vbid);
1160 if (schedule_vbstate_persist[vbid].compare_exchange_strong(inverse,
1170 ENGINE_ERROR_CODE EventuallyPersistentStore::setVBucketState(uint16_t vbid,
1174 // Lock to prevent a race condition between a failed update and add.
1175 LockHolder lh(vbsetMutex);
1176 RCPtr<VBucket> vb = vbMap.getBucket(vbid);
1177 if (vb && to == vb->getState()) {
1178 return ENGINE_SUCCESS;
1182 vbucket_state_t oldstate = vb->getState();
1183 if (oldstate != to && notify_dcp) {
1184 engine.getDcpConnMap().vbucketStateChanged(vbid, to);
1187 vb->setState(to, engine.getServerApi());
1189 if (to == vbucket_state_active && oldstate == vbucket_state_replica) {
1191 * Update snapshot range when vbucket goes from being a replica
1192 * to active, to maintain the correct snapshot sequence numbers
1193 * even in a failover scenario.
1195 vb->checkpointManager.resetSnapshotRange();
1198 if (to == vbucket_state_active && !transfer) {
1199 snapshot_range_t range;
1200 vb->getPersistedSnapshot(range);
1201 if (range.end == vbMap.getPersistenceSeqno(vbid)) {
1202 vb->failovers->createEntry(range.end);
1204 vb->failovers->createEntry(range.start);
1209 if (oldstate == vbucket_state_pending &&
1210 to == vbucket_state_active) {
1211 ExTask notifyTask = new PendingOpsNotification(engine, vb);
1212 ExecutorPool::get()->schedule(notifyTask, NONIO_TASK_IDX);
1214 scheduleVBStatePersist(VBStatePersistTask::Priority::LOW, vbid);
1216 FailoverTable* ft = new FailoverTable(engine.getMaxFailoverEntries());
1217 KVShard* shard = vbMap.getShard(vbid);
1218 shared_ptr<Callback<uint16_t> > cb(new NotifyFlusherCB(shard));
1219 RCPtr<VBucket> newvb(new VBucket(vbid, to, stats,
1220 engine.getCheckpointConfig(),
1221 shard, 0, 0, 0, ft, cb));
1222 // The first checkpoint for active vbucket should start with id 2.
1223 uint64_t start_chk_id = (to == vbucket_state_active) ? 2 : 0;
1224 newvb->checkpointManager.setOpenCheckpointId(start_chk_id);
1225 if (vbMap.addBucket(newvb) == ENGINE_ERANGE) {
1227 return ENGINE_ERANGE;
1229 vbMap.setPersistenceCheckpointId(vbid, 0);
1230 vbMap.setPersistenceSeqno(vbid, 0);
1231 vbMap.setBucketCreation(vbid, true);
1233 scheduleVBStatePersist(VBStatePersistTask::Priority::HIGH, vbid);
1235 return ENGINE_SUCCESS;
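
/*
 * setVBucketState() sketch: for an existing vbucket it notifies DCP of the
 * change, resets the checkpoint snapshot range on a replica->active
 * transition, creates a failover-table entry when the change is not a
 * takeover transfer, wakes any parked operations on pending->active via
 * PendingOpsNotification, and schedules a LOW priority vbucket-state
 * persistence. A brand new vbucket instead gets a fresh FailoverTable, an
 * open checkpoint id of 2 when created active (0 otherwise), and a HIGH
 * priority state persistence.
 */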
1238 bool EventuallyPersistentStore::scheduleVBSnapshot(VBSnapshotTask::Priority prio) {
1239 KVShard *shard = NULL;
1240 if (prio == VBSnapshotTask::Priority::HIGH) {
1241 for (size_t i = 0; i < vbMap.numShards; ++i) {
1242 shard = vbMap.shards[i];
1243 if (shard->setHighPriorityVbSnapshotFlag(true)) {
1244 ExTask task = new VBSnapshotTaskHigh(&engine, i, true);
1245 ExecutorPool::get()->schedule(task, WRITER_TASK_IDX);
1249 for (size_t i = 0; i < vbMap.numShards; ++i) {
1250 shard = vbMap.shards[i];
1251 if (shard->setLowPriorityVbSnapshotFlag(true)) {
1252 ExTask task = new VBSnapshotTaskLow(&engine, i, true);
1253 ExecutorPool::get()->schedule(task, WRITER_TASK_IDX);
1257 if (stats.isShutdown) {
1263 void EventuallyPersistentStore::scheduleVBSnapshot(VBSnapshotTask::Priority prio,
1266 KVShard *shard = vbMap.shards[shardId];
1267 if (prio == VBSnapshotTask::Priority::HIGH) {
1268 if (force || shard->setHighPriorityVbSnapshotFlag(true)) {
1269 ExTask task = new VBSnapshotTaskHigh(&engine, shardId, true);
1270 ExecutorPool::get()->schedule(task, WRITER_TASK_IDX);
1273 if (force || shard->setLowPriorityVbSnapshotFlag(true)) {
1274 ExTask task = new VBSnapshotTaskLow(&engine, shardId, true);
1275 ExecutorPool::get()->schedule(task, WRITER_TASK_IDX);
1280 void EventuallyPersistentStore::scheduleVBStatePersist(VBStatePersistTask::Priority priority,
1283 bool inverse = false;
1285 schedule_vbstate_persist[vbid].compare_exchange_strong(inverse, true)) {
1286 if (priority == VBStatePersistTask::Priority::HIGH) {
1287 ExecutorPool::get()->schedule(new VBStatePersistTaskHigh(&engine, vbid, true), WRITER_TASK_IDX);
1289 ExecutorPool::get()->schedule(new VBStatePersistTaskLow(&engine, vbid, true), WRITER_TASK_IDX);
1294 bool EventuallyPersistentStore::completeVBucketDeletion(uint16_t vbid,
1295 const void* cookie) {
1296 LockHolder lh(vbsetMutex);
1298 hrtime_t start_time(gethrtime());
1299 RCPtr<VBucket> vb = vbMap.getBucket(vbid);
1300 if (!vb || vb->getState() == vbucket_state_dead ||
1301 vbMap.isBucketDeletion(vbid)) {
1303 LockHolder vlh(vb_mutexes[vbid]);
1304 getRWUnderlying(vbid)->delVBucket(vbid);
1305 vbMap.setBucketDeletion(vbid, false);
1306 vbMap.setPersistenceSeqno(vbid, 0);
1307 ++stats.vbucketDeletions;
1310 hrtime_t spent(gethrtime() - start_time);
1311 hrtime_t wall_time = spent / 1000;
1312 BlockTimer::log(spent, "disk_vb_del", stats.timingLog);
1313 stats.diskVBDelHisto.add(wall_time);
1314 atomic_setIfBigger(stats.vbucketDelMaxWalltime, wall_time);
1315 stats.vbucketDelTotWalltime.fetch_add(wall_time);
1317 engine.notifyIOComplete(cookie, ENGINE_SUCCESS);
1323 void EventuallyPersistentStore::scheduleVBDeletion(RCPtr<VBucket> &vb,
1326 ExTask delTask = new VBucketMemoryDeletionTask(engine, vb, delay);
1327 ExecutorPool::get()->schedule(delTask, NONIO_TASK_IDX);
1329 if (vbMap.setBucketDeletion(vb->getId(), true)) {
1330 ExTask task = new VBDeleteTask(&engine, vb->getId(), cookie);
1331 ExecutorPool::get()->schedule(task, WRITER_TASK_IDX);
1335 ENGINE_ERROR_CODE EventuallyPersistentStore::deleteVBucket(uint16_t vbid,
1337 // Lock to prevent a race condition between a failed update and add
1339 LockHolder lh(vbsetMutex);
1341 RCPtr<VBucket> vb = vbMap.getBucket(vbid);
1343 return ENGINE_NOT_MY_VBUCKET;
1346 engine.getDcpConnMap().vbucketStateChanged(vbid, vbucket_state_dead);
1347 vbMap.removeBucket(vbid);
1349 scheduleVBDeletion(vb, c);
1351 return ENGINE_EWOULDBLOCK;
1353 return ENGINE_SUCCESS;
1356 ENGINE_ERROR_CODE EventuallyPersistentStore::compactDB(uint16_t vbid,
1358 const void *cookie) {
1359 RCPtr<VBucket> vb = vbMap.getBucket(vbid);
1361 return ENGINE_NOT_MY_VBUCKET;
1364 LockHolder lh(compactionLock);
1365 ExTask task = new CompactVBucketTask(&engine, vbid, c, cookie);
1366 compactionTasks.push_back(std::make_pair(vbid, task));
1367 if (compactionTasks.size() > 1) {
1368 if ((stats.diskQueueSize > compactionWriteQueueCap &&
1369 compactionTasks.size() > (vbMap.getNumShards() / 2)) ||
1370 engine.getWorkLoadPolicy().getWorkLoadPattern() == READ_HEAVY) {
1371 // Snooze a new compaction task.
1372 // We will wake it up when one of the existing compaction tasks is done.
1377 ExecutorPool::get()->schedule(task, WRITER_TASK_IDX);
1379 LOG(EXTENSION_LOG_DEBUG, "Scheduled compaction task %d on vbucket %d,"
1380 "purge_before_ts = %lld, purge_before_seq = %lld, dropdeletes = %d",
1381 task->getId(), vbid, c.purge_before_ts,
1382 c.purge_before_seq, c.drop_deletes);
1384 return ENGINE_EWOULDBLOCK;
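
/*
 * Compaction scheduling note: a newly created CompactVBucketTask is put on
 * the writer queue immediately unless other compactions are already queued
 * and either (a) the disk queue exceeds compaction_write_queue_cap while
 * more than half the shards already have a compaction pending, or (b) the
 * workload pattern is READ_HEAVY; in those cases the task is snoozed and is
 * woken by compactVBucket() when an earlier compaction finishes.
 */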
1387 class ExpiredItemsCallback : public Callback<compaction_ctx> {
1389 ExpiredItemsCallback(EventuallyPersistentStore *store, uint16_t vbid)
1390 : epstore(store), vbucket(vbid) { }
1392 void callback(compaction_ctx &ctx) {
1393 std::list<expiredItemCtx>::iterator it;
1394 for (it = ctx.expiredItems.begin();
1395 it != ctx.expiredItems.end(); it++) {
1396 if (epstore->compactionCanExpireItems()) {
1397 epstore->deleteExpiredItem(vbucket, it->keyStr,
1405 EventuallyPersistentStore *epstore;
1409 bool EventuallyPersistentStore::compactVBucket(const uint16_t vbid,
1410 compaction_ctx *ctx,
1411 const void *cookie) {
1412 ENGINE_ERROR_CODE err = ENGINE_SUCCESS;
1413 RCPtr<VBucket> vb = vbMap.getBucket(vbid);
1415 LockHolder lh(vb_mutexes[vbid], true /*tryLock*/);
1416 if (!lh.islocked()) {
1417 return true; // Schedule a compaction task again.
1420 Configuration &config = getEPEngine().getConfiguration();
1421 if (config.isBfilterEnabled()) {
1422 size_t initial_estimation = config.getBfilterKeyCount();
1423 size_t estimated_count;
1424 size_t num_deletes =
1425 getROUnderlying(vbid)->getNumPersistedDeletes(vbid);
1426 if (eviction_policy == VALUE_ONLY) {
1428 * VALUE-ONLY EVICTION POLICY
1429 * Obtain number of persisted deletes from underlying kvstore.
1430 * Bloomfilter's estimated_key_count = 1.25 * deletes
1433 estimated_count = round(1.25 * num_deletes);
1434 ctx->bfcb = new BfilterCB(this, vbid, false);
1437 * FULL EVICTION POLICY
1438 * First determine if the resident ratio of vbucket is less than
1439 * the threshold from configuration.
1442 bool residentRatioAlert = vb->isResidentRatioUnderThreshold(
1443 getBfiltersResidencyThreshold(),
1445 ctx->bfcb = new BfilterCB(this, vbid, residentRatioAlert);
1448 * Based on resident ratio against threshold, estimate count.
1450 * 1. If resident ratio is greater than the threshold:
1451 * Obtain number of persisted deletes from underlying kvstore.
1452 * Obtain number of non-resident-items for vbucket.
1453 * Bloomfilter's estimated_key_count =
1454 * 1.25 * (deletes + non-resident)
1457 * Obtain number of items for vbucket.
1458 * Bloomfilter's estimated_key_count =
1459 * 1.25 * (num_items)
1462 if (residentRatioAlert) {
1463 estimated_count = round(1.25 *
1464 vb->getNumItems(eviction_policy));
1466 estimated_count = round(1.25 * (num_deletes +
1467 vb->getNumNonResidentItems(eviction_policy)));
1470 if (estimated_count < initial_estimation) {
1471 estimated_count = initial_estimation;
1473 vb->initTempFilter(estimated_count, config.getBfilterFpProb());
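
        /*
         * Bloom filter sizing example (illustrative numbers only): with
         * value-only eviction and 10,000 persisted deletes the temp filter
         * is sized for round(1.25 * 10000) = 12,500 keys; under full
         * eviction with a healthy resident ratio the non-resident item
         * count is added to the deletes before applying the same 1.25
         * factor, and if the resident ratio has dropped below
         * bfilter_residency_threshold the whole item count is used instead.
         * The result is never allowed to fall below the configured
         * bfilter_key_count.
         */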
1476 if (vb->getState() == vbucket_state_active) {
1477 // Set the current time ONLY for active vbuckets.
1478 ctx->curr_time = ep_real_time();
1482 ExpiredItemsCallback cb(this, vbid);
1483 KVStatsCallback kvcb(this);
1484 if (getRWUnderlying(vbid)->compactVBucket(vbid, ctx, cb, kvcb)) {
1485 if (config.isBfilterEnabled()) {
1491 LOG(EXTENSION_LOG_WARNING, "Compaction: Not successful for vb %u, "
1492 "clearing bloom filter, if any.", vb->getId());
1495 vb->setPurgeSeqno(ctx->max_purged_seq);
1497 err = ENGINE_NOT_MY_VBUCKET;
1498 engine.storeEngineSpecific(cookie, NULL);
        // Decrement session counter here, as the memcached thread wouldn't
        // visit the engine interface in case of a NOT_MY_VB notification
1501 engine.decrementSessionCtr();
1505 LockHolder lh(compactionLock);
1506 bool erased = false, woke = false;
1507 std::list<CompTaskEntry>::iterator it = compactionTasks.begin();
1508 while (it != compactionTasks.end()) {
1509 if ((*it).first == vbid) {
1510 it = compactionTasks.erase(it);
1513 ExTask &task = (*it).second;
1514 if (task->getState() == TASK_SNOOZED) {
1515 ExecutorPool::get()->wake(task->getId());
1520 if (erased && woke) {
1527 engine.notifyIOComplete(cookie, err);
1529 --stats.pendingCompactions;
1533 bool EventuallyPersistentStore::resetVBucket(uint16_t vbid) {
1534 LockHolder lh(vbsetMutex);
1537 RCPtr<VBucket> vb = vbMap.getBucket(vbid);
1539 vbucket_state_t vbstate = vb->getState();
1541 vbMap.removeBucket(vbid);
1544 std::list<std::string> tap_cursors = vb->checkpointManager.
1546 // Delete and recreate the vbucket database file
1547 scheduleVBDeletion(vb, NULL, 0);
1548 setVBucketState(vbid, vbstate, false);
    // Copy all the cursors from the old vbucket into the new vbucket
1551 RCPtr<VBucket> newvb = vbMap.getBucket(vbid);
1552 newvb->checkpointManager.resetCursors(tap_cursors);
1562 EventuallyPersistentEngine* engine;
1563 std::map<std::string, std::string> smap;
1566 static void add_stat(const char *key, const uint16_t klen,
1567 const char *val, const uint32_t vlen,
1568 const void *cookie) {
1570 void *ptr = const_cast<void *>(cookie);
1571 snapshot_stats_t* snap = static_cast<snapshot_stats_t*>(ptr);
1572 ObjectRegistry::onSwitchThread(snap->engine);
1574 std::string k(key, klen);
1575 std::string v(val, vlen);
1576 snap->smap.insert(std::pair<std::string, std::string>(k, v));
1580 void EventuallyPersistentStore::snapshotStats() {
1581 snapshot_stats_t snap;
1582 snap.engine = &engine;
1583 std::map<std::string, std::string> smap;
1584 bool rv = engine.getStats(&snap, NULL, 0, add_stat) == ENGINE_SUCCESS &&
1585 engine.getStats(&snap, "tap", 3, add_stat) == ENGINE_SUCCESS &&
1586 engine.getStats(&snap, "dcp", 3, add_stat) == ENGINE_SUCCESS;
1588 if (rv && stats.isShutdown) {
1589 snap.smap["ep_force_shutdown"] = stats.forceShutdown ?
1591 std::stringstream ss;
1592 ss << ep_real_time();
1593 snap.smap["ep_shutdown_time"] = ss.str();
1595 getOneRWUnderlying()->snapshotStats(snap.smap);
1598 void EventuallyPersistentStore::updateBGStats(const hrtime_t init,
1599 const hrtime_t start,
1600 const hrtime_t stop) {
1601 if (stop >= start && start >= init) {
1602 // skip the measurement if the counter wrapped...
1603 ++stats.bgNumOperations;
1604 hrtime_t w = (start - init) / 1000;
1605 BlockTimer::log(start - init, "bgwait", stats.timingLog);
1606 stats.bgWaitHisto.add(w);
1607 stats.bgWait.fetch_add(w);
1608 atomic_setIfLess(stats.bgMinWait, w);
1609 atomic_setIfBigger(stats.bgMaxWait, w);
1611 hrtime_t l = (stop - start) / 1000;
1612 BlockTimer::log(stop - start, "bgload", stats.timingLog);
1613 stats.bgLoadHisto.add(l);
1614 stats.bgLoad.fetch_add(l);
1615 atomic_setIfLess(stats.bgMinLoad, l);
1616 atomic_setIfBigger(stats.bgMaxLoad, l);
1620 void EventuallyPersistentStore::completeBGFetch(const std::string &key,
1625 hrtime_t start(gethrtime());
1627 RememberingCallback<GetValue> gcb;
1629 gcb.val.setPartial();
1630 ++stats.bg_meta_fetched;
1634 getROUnderlying(vbucket)->get(key, vbucket, gcb);
1636 cb_assert(gcb.fired);
1637 ENGINE_ERROR_CODE status = gcb.val.getStatus();
1639 // Lock to prevent a race condition between a fetch for restore and delete
1640 LockHolder lh(vbsetMutex);
1642 RCPtr<VBucket> vb = getVBucket(vbucket);
1644 ReaderLockHolder rlh(vb->getStateLock());
1646 LockHolder hlh = vb->ht.getLockedBucket(key, &bucket_num);
1647 StoredValue *v = fetchValidValue(vb, key, bucket_num, true);
1649 if ((v && v->unlocked_restoreMeta(gcb.val.getValue(),
1650 gcb.val.getStatus(), vb->ht))
1651 || ENGINE_KEY_ENOENT == status) {
1652 /* If ENGINE_KEY_ENOENT is the status from storage and the temp
1653 key is removed from hash table by the time bgfetch returns
1654 (in case multiple bgfetch is scheduled for a key), we still
1655 need to return ENGINE_SUCCESS to the memcached worker thread,
1656 so that the worker thread can visit the ep-engine and figure
1657 out the correct flow */
1658 status = ENGINE_SUCCESS;
1661 bool restore = false;
1662 if (v && v->isResident()) {
1663 status = ENGINE_SUCCESS;
1665 switch (eviction_policy) {
1667 if (v && !v->isResident() && !v->isDeleted()) {
1673 if (v->isTempInitialItem() ||
1674 (!v->isResident() && !v->isDeleted())) {
1680 throw std::logic_error("Unknown eviction policy");
1685 if (gcb.val.getStatus() == ENGINE_SUCCESS) {
1686 v->unlocked_restoreValue(gcb.val.getValue(), vb->ht);
1687 cb_assert(v->isResident());
1688 if (vb->getState() == vbucket_state_active &&
1689 v->getExptime() != gcb.val.getValue()->getExptime() &&
1690 v->getCas() == gcb.val.getValue()->getCas()) {
1691 // MB-9306: It is possible that by the time bgfetcher
1692 // returns, the item may have been updated and queued
1693 // Hence test the CAS value to be the same first.
1694 // exptime mutated, schedule it into new checkpoint
1695 queueDirty(vb, v, &hlh, NULL);
1697 } else if (gcb.val.getStatus() == ENGINE_KEY_ENOENT) {
1698 v->setStoredValueState(
1699 StoredValue::state_non_existent_key);
1700 if (eviction_policy == FULL_EVICTION) {
1701 // For the full eviction, we should notify
1702 // ENGINE_SUCCESS to the memcached worker thread, so
1703 // that the worker thread can visit the ep-engine and
1704 // figure out the correct error code.
1705 status = ENGINE_SUCCESS;
1708 // underlying kvstore couldn't fetch requested data
1709 // log returned error and notify TMPFAIL to client
1710 LOG(EXTENSION_LOG_WARNING,
1711 "Warning: failed background fetch for vb=%d "
1712 "seq=%" PRId64 " key=%s", vbucket, v->getBySeqno(),
1714 status = ENGINE_TMPFAIL;
1719 LOG(EXTENSION_LOG_INFO, "VBucket %d's file was deleted in the middle of"
1720 " a bg fetch for key %s\n", vbucket, key.c_str());
1721 status = ENGINE_NOT_MY_VBUCKET;
1726 hrtime_t stop = gethrtime();
1727 updateBGStats(init, start, stop);
1730 delete gcb.val.getValue();
1731 engine.notifyIOComplete(cookie, status);
1734 void EventuallyPersistentStore::completeBGFetchMulti(uint16_t vbId,
1735 std::vector<bgfetched_item_t> &fetchedItems,
1738 RCPtr<VBucket> vb = getVBucket(vbId);
1740 std::vector<bgfetched_item_t>::iterator itemItr = fetchedItems.begin();
1741 for (; itemItr != fetchedItems.end(); ++itemItr) {
1742 engine.notifyIOComplete((*itemItr).second->cookie,
1743 ENGINE_NOT_MY_VBUCKET);
1745 LOG(EXTENSION_LOG_WARNING,
1746 "EP Store completes %d of batched background fetch for "
1747 "for vBucket = %d that is already deleted\n",
1748 (int)fetchedItems.size(), vbId);
1752 std::vector<bgfetched_item_t>::iterator itemItr = fetchedItems.begin();
1753 for (; itemItr != fetchedItems.end(); ++itemItr) {
1754 VBucketBGFetchItem *bgitem = (*itemItr).second;
1755 ENGINE_ERROR_CODE status = bgitem->value.getStatus();
1756 Item *fetchedValue = bgitem->value.getValue();
1757 const std::string &key = (*itemItr).first;
1759 ReaderLockHolder rlh(vb->getStateLock());
1762 LockHolder blh = vb->ht.getLockedBucket(key, &bucket);
1763 StoredValue *v = fetchValidValue(vb, key, bucket, true);
1764 if (bgitem->metaDataOnly) {
1765 if ((v && v->unlocked_restoreMeta(fetchedValue, status, vb->ht))
1766 || ENGINE_KEY_ENOENT == status) {
1767 /* If ENGINE_KEY_ENOENT is the status from storage and the temp
1768 key is removed from hash table by the time bgfetch returns
1769 (in case multiple bgfetch is scheduled for a key), we still
1770 need to return ENGINE_SUCCESS to the memcached worker thread,
1771 so that the worker thread can visit the ep-engine and figure
1772 out the correct flow */
1773 status = ENGINE_SUCCESS;
1776 bool restore = false;
1777 if (v && v->isResident()) {
1778 status = ENGINE_SUCCESS;
1780 switch (eviction_policy) {
1782 if (v && !v->isResident() && !v->isDeleted()) {
1788 if (v->isTempInitialItem() ||
1789 (!v->isResident() && !v->isDeleted())) {
1795 throw std::logic_error("Unknown eviction policy");
1800 if (status == ENGINE_SUCCESS) {
1801 v->unlocked_restoreValue(fetchedValue, vb->ht);
1802 cb_assert(v->isResident());
1803 ReaderLockHolder(vb->getStateLock());
1804 if (vb->getState() == vbucket_state_active &&
1805 v->getExptime() != fetchedValue->getExptime() &&
1806 v->getCas() == fetchedValue->getCas()) {
1807 // MB-9306: It is possible that by the time
1808 // bgfetcher returns, the item may have been
1809 // updated and queued
1810 // Hence test the CAS value to be the same first.
1811 // exptime mutated, schedule it into new checkpoint
1812 queueDirty(vb, v, &blh, NULL);
1814 } else if (status == ENGINE_KEY_ENOENT) {
1815 v->setStoredValueState(StoredValue::state_non_existent_key);
1816 if (eviction_policy == FULL_EVICTION) {
1817 // For the full eviction, we should notify
1818 // ENGINE_SUCCESS to the memcached worker thread,
1819 // so that the worker thread can visit the
1820 // ep-engine and figure out the correct error
1822 status = ENGINE_SUCCESS;
1825 // underlying kvstore couldn't fetch requested data
1826 // log returned error and notify TMPFAIL to client
1827 LOG(EXTENSION_LOG_WARNING,
1828 "Warning: failed background fetch for vb=%d "
1829 "key=%s", vbId, key.c_str());
1830 status = ENGINE_TMPFAIL;
1834 } // locking scope ends
1836 if (bgitem->metaDataOnly) {
1837 ++stats.bg_meta_fetched;
1842 hrtime_t endTime = gethrtime();
1843 updateBGStats(bgitem->initTime, startTime, endTime);
1844 engine.notifyIOComplete(bgitem->cookie, status);
1847 LOG(EXTENSION_LOG_DEBUG,
1848 "EP Store completes %d of batched background fetch "
1849 "for vBucket = %d endTime = %lld\n",
1850 fetchedItems.size(), vbId, gethrtime()/1000000);
1853 void EventuallyPersistentStore::bgFetch(const std::string &key,
1857 std::stringstream ss;
1859 if (multiBGFetchEnabled()) {
1860 RCPtr<VBucket> vb = getVBucket(vbucket);
1862 KVShard *myShard = vbMap.getShard(vbucket);
1864 // schedule to the current batch of background fetch of the given
1866 VBucketBGFetchItem * fetchThis = new VBucketBGFetchItem(cookie,
1868 vb->queueBGFetchItem(key, fetchThis, myShard->getBgFetcher());
1869 myShard->getBgFetcher()->notifyBGEvent();
1870 ss << "Queued a background fetch, now at "
1871 << vb->numPendingBGFetchItems() << std::endl;
1872 LOG(EXTENSION_LOG_DEBUG, "%s", ss.str().c_str());
1875 stats.maxRemainingBgJobs = std::max(stats.maxRemainingBgJobs,
1876 bgFetchQueue.load());
1877 ExecutorPool* iom = ExecutorPool::get();
1878 ExTask task = new SingleBGFetcherTask(&engine, key, vbucket, cookie,
1879 isMeta, bgFetchDelay, false);
1880 iom->schedule(task, READER_TASK_IDX);
1881 ss << "Queued a background fetch, now at " << bgFetchQueue.load()
1883 LOG(EXTENSION_LOG_DEBUG, "%s", ss.str().c_str());
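
/*
 * bgFetch() dispatch note: when the underlying KVStore supports batched
 * reads (multiBGFetchEnabled()), the request is queued on the owning
 * shard's BgFetcher and the fetcher is nudged; otherwise a dedicated
 * SingleBGFetcherTask is scheduled on the READER task queue, optionally
 * delayed by bg_fetch_delay.
 */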
1887 GetValue EventuallyPersistentStore::getInternal(const std::string &key,
1892 vbucket_state_t allowedState,
1893 bool trackReference,
1894 bool deleteTempItem,
1895 bool hideLockedCAS) {
1897 vbucket_state_t disallowedState = (allowedState == vbucket_state_active) ?
1898 vbucket_state_replica : vbucket_state_active;
1899 RCPtr<VBucket> vb = getVBucket(vbucket);
1901 ++stats.numNotMyVBuckets;
1902 return GetValue(NULL, ENGINE_NOT_MY_VBUCKET);
1905 ReaderLockHolder rlh(vb->getStateLock());
1906 if (honorStates && vb->getState() == vbucket_state_dead) {
1907 ++stats.numNotMyVBuckets;
1908 return GetValue(NULL, ENGINE_NOT_MY_VBUCKET);
1909 } else if (honorStates && vb->getState() == disallowedState) {
1910 ++stats.numNotMyVBuckets;
1911 return GetValue(NULL, ENGINE_NOT_MY_VBUCKET);
1912 } else if (honorStates && vb->getState() == vbucket_state_pending) {
1913 if (vb->addPendingOp(cookie)) {
1914 return GetValue(NULL, ENGINE_EWOULDBLOCK);
1919 LockHolder lh = vb->ht.getLockedBucket(key, &bucket_num);
1920 StoredValue *v = fetchValidValue(vb, key, bucket_num, true,
1923 if (v->isDeleted()) {
1927 if (v->isTempDeletedItem() || v->isTempNonExistentItem()) {
1928 // Delete a temp non-existent item to ensure that
1929 // if the get were issued over an item that doesn't
            // exist, then we don't preserve a temp item.
1931 if (deleteTempItem) {
1932 vb->ht.unlocked_del(key, bucket_num);
1938 // If the value is not resident, wait for it...
1939 if (!v->isResident()) {
1941 bgFetch(key, vbucket, cookie);
1943 return GetValue(NULL, ENGINE_EWOULDBLOCK, v->getBySeqno(),
1944 true, v->getNRUValue());
1947 // Should we hide (return -1) for the items' CAS?
1948 const bool hide_cas = hideLockedCAS &&
1949 v->isLocked(ep_current_time());
1950 GetValue rv(v->toItem(hide_cas, vbucket), ENGINE_SUCCESS,
1951 v->getBySeqno(), false, v->getNRUValue());
1954 if (eviction_policy == VALUE_ONLY || diskFlushAll) {
1959 if (vb->maybeKeyExistsInFilter(key)) {
1960 ENGINE_ERROR_CODE ec = ENGINE_EWOULDBLOCK;
1961 if (queueBG) { // Full eviction and need a bg fetch.
1962 ec = addTempItemForBgFetch(lh, bucket_num, key, vb,
1965 return GetValue(NULL, ec, -1, true);
        // As the bloom filter predicts that the item doesn't exist
        // on disk, return ENOENT for getInternal().
1975 GetValue EventuallyPersistentStore::getRandomKey() {
1976 long max = vbMap.getSize();
1978 long start = random() % max;
1982 while (itm == NULL) {
1983 RCPtr<VBucket> vb = getVBucket(curr++);
1984 while (!vb || vb->getState() != vbucket_state_active) {
1985 if (curr == start) {
1986 return GetValue(NULL, ENGINE_KEY_ENOENT);
1992 vb = getVBucket(curr++);
1995 if ((itm = vb->ht.getRandomKey(random())) != NULL) {
1996 GetValue rv(itm, ENGINE_SUCCESS);
2004 if (curr == start) {
2005 return GetValue(NULL, ENGINE_KEY_ENOENT);
2007 // Search next vbucket
2010 return GetValue(NULL, ENGINE_KEY_ENOENT);
2014 ENGINE_ERROR_CODE EventuallyPersistentStore::getMetaData(
2015 const std::string &key,
2018 ItemMetaData &metadata,
2020 uint8_t &confResMode,
2021 bool trackReferenced)
2024 RCPtr<VBucket> vb = getVBucket(vbucket);
2026 ++stats.numNotMyVBuckets;
2027 return ENGINE_NOT_MY_VBUCKET;
2030 ReaderLockHolder rlh(vb->getStateLock());
2031 if (vb->getState() == vbucket_state_dead ||
2032 vb->getState() == vbucket_state_replica) {
2033 ++stats.numNotMyVBuckets;
2034 return ENGINE_NOT_MY_VBUCKET;
2039 LockHolder lh = vb->ht.getLockedBucket(key, &bucket_num);
2040 StoredValue *v = vb->ht.unlocked_find(key, bucket_num, true,
2044 stats.numOpsGetMeta++;
2046 if (v->isTempInitialItem()) { // Need bg meta fetch.
2047 bgFetch(key, vbucket, cookie, true);
2048 return ENGINE_EWOULDBLOCK;
2049 } else if (v->isTempNonExistentItem()) {
2050 metadata.cas = v->getCas();
2051 return ENGINE_KEY_ENOENT;
2053 if (v->isTempDeletedItem() || v->isDeleted() ||
2054 v->isExpired(ep_real_time())) {
2055 deleted |= GET_META_ITEM_DELETED_FLAG;
2058 if (v->isLocked(ep_current_time())) {
2059 metadata.cas = static_cast<uint64_t>(-1);
2061 metadata.cas = v->getCas();
2063 metadata.flags = v->getFlags();
2064 metadata.exptime = v->getExptime();
2065 metadata.revSeqno = v->getRevSeqno();
2066 confResMode = v->getConflictResMode();
2067 return ENGINE_SUCCESS;
2070 // The key wasn't found. However, this may be because it was previously
2071 // deleted or evicted with the full eviction strategy.
2072 // So, add a temporary item corresponding to the key to the hash table
2073 // and schedule a background fetch for its metadata from the persistent
2074 // store. The item's state will be updated after the fetch completes.
    // Schedule this bgFetch only if the bloom filter predicts that the key
    // may exist on disk.
2079 if (vb->maybeKeyExistsInFilter(key)) {
2080 return addTempItemForBgFetch(lh, bucket_num, key, vb, cookie, true);
2082 return ENGINE_KEY_ENOENT;
2087 ENGINE_ERROR_CODE EventuallyPersistentStore::setWithMeta(
2096 ExtendedMetaData *emd,
2099 RCPtr<VBucket> vb = getVBucket(itm.getVBucketId());
2101 ++stats.numNotMyVBuckets;
2102 return ENGINE_NOT_MY_VBUCKET;
2105 ReaderLockHolder rlh(vb->getStateLock());
2106 if (vb->getState() == vbucket_state_dead) {
2107 ++stats.numNotMyVBuckets;
2108 return ENGINE_NOT_MY_VBUCKET;
2109 } else if (vb->getState() == vbucket_state_replica && !force) {
2110 ++stats.numNotMyVBuckets;
2111 return ENGINE_NOT_MY_VBUCKET;
2112 } else if (vb->getState() == vbucket_state_pending && !force) {
2113 if (vb->addPendingOp(cookie)) {
2114 return ENGINE_EWOULDBLOCK;
    // Check the incoming item's CAS validity
2119 if (!Item::isValidCas(itm.getCas())) {
2120 return ENGINE_KEY_EEXISTS;
2124 LockHolder lh = vb->ht.getLockedBucket(itm.getKey(), &bucket_num);
2125 StoredValue *v = vb->ht.unlocked_find(itm.getKey(), bucket_num, true,
2128 bool maybeKeyExists = true;
2131 if (v->isTempInitialItem()) {
2132 bgFetch(itm.getKey(), itm.getVBucketId(), cookie, true);
2133 return ENGINE_EWOULDBLOCK;
2136 enum conflict_resolution_mode confResMode = revision_seqno;
2138 confResMode = static_cast<enum conflict_resolution_mode>(
2139 emd->getConflictResMode());
2142 if (!conflictResolver->resolve(vb, v, itm.getMetaData(), false,
2144 ++stats.numOpsSetMetaResolutionFailed;
2145 return ENGINE_KEY_EEXISTS;
2148 if (vb->maybeKeyExistsInFilter(itm.getKey())) {
2149 return addTempItemForBgFetch(lh, bucket_num, itm.getKey(), vb,
2150 cookie, true, isReplication);
2152 maybeKeyExists = false;
2156 if (eviction_policy == FULL_EVICTION) {
2157 // Check Bloomfilter's prediction
2158 if (!vb->maybeKeyExistsInFilter(itm.getKey())) {
2159 maybeKeyExists = false;
2164 if (v && v->isLocked(ep_current_time()) &&
2165 (vb->getState() == vbucket_state_replica ||
2166 vb->getState() == vbucket_state_pending)) {
2170 mutation_type_t mtype = vb->ht.unlocked_set(v, itm, cas, allowExisting,
2171 true, eviction_policy, nru,
2172 maybeKeyExists, isReplication);
2174 ENGINE_ERROR_CODE ret = ENGINE_SUCCESS;
2177 ret = ENGINE_ENOMEM;
2181 ret = ENGINE_KEY_EEXISTS;
2183 case INVALID_VBUCKET:
2184 ret = ENGINE_NOT_MY_VBUCKET;
2188     /* Set the conflict resolution mode from the extended metadata.
2189      * Since the mode is set here, there is no need to set it again
2190      * in queueDirty. */
2192 v->setConflictResMode(
2193 static_cast<enum conflict_resolution_mode>(
2194 emd->getConflictResMode()));
2196 vb->setMaxCas(v->getCas());
2197 queueDirty(vb, v, &lh, seqno, false, true, genBySeqno, false);
2200 ret = ENGINE_KEY_ENOENT;
2203 { // CAS operation with non-resident item + full eviction.
2204 if (v) { // temp item is already created. Simply schedule a
2205 lh.unlock(); // bg fetch job.
2206 bgFetch(itm.getKey(), vb->getId(), cookie, true);
2207 return ENGINE_EWOULDBLOCK;
2210 ret = addTempItemForBgFetch(lh, bucket_num, itm.getKey(), vb,
2211 cookie, true, isReplication);
2215     // Update the vbucket's drift counter only upon success.
2216 if (ret == ENGINE_SUCCESS && emd) {
2217 vb->setDriftCounter(emd->getAdjustedTime());
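// Illustrative sketch (not part of the original source): both setWithMeta()
// and deleteWithMeta() choose the conflict resolution mode the same way --
// default to revision-based resolution and let the extended metadata, if
// present, override it. The free-standing helper below is an assumption
// made purely for illustration.
static enum conflict_resolution_mode
exampleConflictResMode(ExtendedMetaData *emd) {
    enum conflict_resolution_mode mode = revision_seqno;
    if (emd) {
        mode = static_cast<enum conflict_resolution_mode>(
                                                emd->getConflictResMode());
    }
    return mode;
}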
2223 GetValue EventuallyPersistentStore::getAndUpdateTtl(const std::string &key,
2228 RCPtr<VBucket> vb = getVBucket(vbucket);
2230 ++stats.numNotMyVBuckets;
2231 return GetValue(NULL, ENGINE_NOT_MY_VBUCKET);
2234 ReaderLockHolder rlh(vb->getStateLock());
2235 if (vb->getState() == vbucket_state_dead) {
2236 ++stats.numNotMyVBuckets;
2237 return GetValue(NULL, ENGINE_NOT_MY_VBUCKET);
2238 } else if (vb->getState() == vbucket_state_replica) {
2239 ++stats.numNotMyVBuckets;
2240 return GetValue(NULL, ENGINE_NOT_MY_VBUCKET);
2241 } else if (vb->getState() == vbucket_state_pending) {
2242 if (vb->addPendingOp(cookie)) {
2243 return GetValue(NULL, ENGINE_EWOULDBLOCK);
2248 LockHolder lh = vb->ht.getLockedBucket(key, &bucket_num);
2249 StoredValue *v = fetchValidValue(vb, key, bucket_num, true);
2252 if (v->isDeleted() || v->isTempDeletedItem() ||
2253 v->isTempNonExistentItem()) {
2258 if (!v->isResident()) {
2259 bgFetch(key, vbucket, cookie);
2260 return GetValue(NULL, ENGINE_EWOULDBLOCK, v->getBySeqno());
2262 if (v->isLocked(ep_current_time())) {
2263 GetValue rv(NULL, ENGINE_KEY_EEXISTS, 0);
2267     bool exptime_mutated = (exptime != v->getExptime());
2268 if (exptime_mutated) {
2270 v->setExptime(exptime);
2273 GetValue rv(v->toItem(v->isLocked(ep_current_time()), vbucket),
2274 ENGINE_SUCCESS, v->getBySeqno());
2276 if (exptime_mutated) {
2277 if (vb->getState() == vbucket_state_active) {
2278                 // Persist the item to the underlying storage to capture
2279                 // the mutated exptime, but only if the vbucket is active.
2280 queueDirty(vb, v, &lh, NULL);
2285 if (eviction_policy == VALUE_ONLY) {
2289 if (vb->maybeKeyExistsInFilter(key)) {
2290 ENGINE_ERROR_CODE ec = addTempItemForBgFetch(lh, bucket_num,
2293 return GetValue(NULL, ec, -1, true);
2295             // The bloom filter predicts that the item does not exist
2296             // on disk, so return ENOENT for getAndUpdateTtl().
2305 EventuallyPersistentStore::statsVKey(const std::string &key,
2307 const void *cookie) {
2308 RCPtr<VBucket> vb = getVBucket(vbucket);
2310 return ENGINE_NOT_MY_VBUCKET;
2314 LockHolder lh = vb->ht.getLockedBucket(key, &bucket_num);
2315 StoredValue *v = fetchValidValue(vb, key, bucket_num, true);
2318 if (v->isDeleted() || v->isTempDeletedItem() ||
2319 v->isTempNonExistentItem()) {
2320 return ENGINE_KEY_ENOENT;
2323 cb_assert(bgFetchQueue > 0);
2324 ExecutorPool* iom = ExecutorPool::get();
2325 ExTask task = new VKeyStatBGFetchTask(&engine, key, vbucket,
2326 v->getBySeqno(), cookie,
2327 bgFetchDelay, false);
2328 iom->schedule(task, READER_TASK_IDX);
2329 return ENGINE_EWOULDBLOCK;
2331 if (eviction_policy == VALUE_ONLY) {
2332 return ENGINE_KEY_ENOENT;
2334 add_type_t rv = vb->ht.unlocked_addTempItem(bucket_num, key,
2338 return ENGINE_ENOMEM;
2342 case ADD_TMP_AND_BG_FETCH:
2343 // Since the hashtable bucket is locked, we shouldn't get here
2348 cb_assert(bgFetchQueue > 0);
2349 ExecutorPool* iom = ExecutorPool::get();
2350 ExTask task = new VKeyStatBGFetchTask(&engine, key,
2351 vbucket, -1, cookie,
2352 bgFetchDelay, false);
2353 iom->schedule(task, READER_TASK_IDX);
2356 return ENGINE_EWOULDBLOCK;
2361 void EventuallyPersistentStore::completeStatsVKey(const void* cookie,
2364 uint64_t bySeqNum) {
2365 RememberingCallback<GetValue> gcb;
2367 getROUnderlying(vbid)->get(key, vbid, gcb);
2369 cb_assert(gcb.fired);
2371 if (eviction_policy == FULL_EVICTION) {
2372 RCPtr<VBucket> vb = getVBucket(vbid);
2375 LockHolder hlh = vb->ht.getLockedBucket(key, &bucket_num);
2376 StoredValue *v = fetchValidValue(vb, key, bucket_num, true);
2377 if (v && v->isTempInitialItem()) {
2378 if (gcb.val.getStatus() == ENGINE_SUCCESS) {
2379 v->unlocked_restoreValue(gcb.val.getValue(), vb->ht);
2380 cb_assert(v->isResident());
2381 } else if (gcb.val.getStatus() == ENGINE_KEY_ENOENT) {
2382 v->setStoredValueState(
2383 StoredValue::state_non_existent_key);
2385                 // The underlying kvstore couldn't fetch the requested
2386                 // data; log the error and notify the client with TMPFAIL.
2387 LOG(EXTENSION_LOG_WARNING,
2388 "Warning: failed background fetch for vb=%d "
2389 "seq=%" PRId64 " key=%s", vbid, v->getBySeqno(),
2396 if (gcb.val.getStatus() == ENGINE_SUCCESS) {
2397 engine.addLookupResult(cookie, gcb.val.getValue());
2399 engine.addLookupResult(cookie, NULL);
2403 engine.notifyIOComplete(cookie, ENGINE_SUCCESS);
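/*
 * Illustrative sketch (assumption, not from the original file): the
 * synchronous-read pattern used by completeStatsVKey() above. A
 * RememberingCallback blocks until the KVStore fires it; waitForValue()
 * below is an assumed blocking helper on that callback type.
 *
 *     RememberingCallback<GetValue> gcb;
 *     getROUnderlying(vbid)->get(key, vbid, gcb);
 *     gcb.waitForValue();
 *     if (gcb.val.getStatus() == ENGINE_SUCCESS) {
 *         // inspect gcb.val.getValue() ...
 *     }
 */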
2406 bool EventuallyPersistentStore::getLocked(const std::string &key,
2408 Callback<GetValue> &cb,
2409 rel_time_t currentTime,
2410 uint32_t lockTimeout,
2411 const void *cookie) {
2412 RCPtr<VBucket> vb = getVBucket(vbucket);
2413 if (!vb || vb->getState() != vbucket_state_active) {
2414 ++stats.numNotMyVBuckets;
2415 GetValue rv(NULL, ENGINE_NOT_MY_VBUCKET);
2421 LockHolder lh = vb->ht.getLockedBucket(key, &bucket_num);
2422 StoredValue *v = fetchValidValue(vb, key, bucket_num, true);
2425 if (v->isDeleted() || v->isTempNonExistentItem() ||
2426 v->isTempDeletedItem()) {
2432         // If the item is locked, return an error.
2433 if (v->isLocked(currentTime)) {
2439 // If the value is not resident, wait for it...
2440 if (!v->isResident()) {
2442 bgFetch(key, vbucket, cookie);
2444 GetValue rv(NULL, ENGINE_EWOULDBLOCK, -1, true);
2449         // Acquire the lock and assign a new CAS value.
2450 v->lock(currentTime + lockTimeout);
2452 Item *it = v->toItem(false, vbucket);
2453 it->setCas(vb->nextHLCCas());
2454 v->setCas(it->getCas());
2460 if (eviction_policy == VALUE_ONLY) {
2465 if (vb->maybeKeyExistsInFilter(key)) {
2466 ENGINE_ERROR_CODE ec = addTempItemForBgFetch(lh, bucket_num,
2469 GetValue rv(NULL, ec, -1, true);
2473             // The bloom filter predicts that the item does not exist
2474             // on disk, so return ENOENT for getLocked().
2484 EventuallyPersistentStore::unlockKey(const std::string &key,
2487 rel_time_t currentTime)
2490 RCPtr<VBucket> vb = getVBucket(vbucket);
2491 if (!vb || vb->getState() != vbucket_state_active) {
2492 ++stats.numNotMyVBuckets;
2493 return ENGINE_NOT_MY_VBUCKET;
2497 LockHolder lh = vb->ht.getLockedBucket(key, &bucket_num);
2498 StoredValue *v = fetchValidValue(vb, key, bucket_num, true);
2501 if (v->isDeleted() || v->isTempNonExistentItem() ||
2502 v->isTempDeletedItem()) {
2503 return ENGINE_KEY_ENOENT;
2505 if (v->isLocked(currentTime)) {
2506 if (v->getCas() == cas) {
2508 return ENGINE_SUCCESS;
2511 return ENGINE_TMPFAIL;
2513 if (eviction_policy == VALUE_ONLY) {
2514 return ENGINE_KEY_ENOENT;
2516             // With full eviction, an item's lock is automatically
2517             // released when the item is evicted from memory. Therefore,
2518             // we simply return ENGINE_TMPFAIL when we receive unlockKey
2519             // for an item that is not in the memory cache. Note that we
2520             // don't spawn any bg fetch job to find out whether the item
2521             // actually exists on disk.
2522 return ENGINE_TMPFAIL;
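// Illustrative sketch (not part of the original source): the decision made
// by unlockKey() above, ignoring the deleted/temp-item cases. The helper
// name and parameters are assumptions for illustration only.
static ENGINE_ERROR_CODE exampleUnlockDecision(bool foundInMemory,
                                               bool locked,
                                               bool casMatches,
                                               bool fullEviction) {
    if (!foundInMemory) {
        // Value eviction: a missing item simply doesn't exist. Full
        // eviction: eviction already released the lock, and no bg fetch
        // is spawned just to answer an unlock request.
        return fullEviction ? ENGINE_TMPFAIL : ENGINE_KEY_ENOENT;
    }
    if (locked && casMatches) {
        return ENGINE_SUCCESS;   // lock released
    }
    return ENGINE_TMPFAIL;       // wrong CAS, or the item was not locked
}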
2528 ENGINE_ERROR_CODE EventuallyPersistentStore::getKeyStats(
2529 const std::string &key,
2532 struct key_stats &kstats,
2536 RCPtr<VBucket> vb = getVBucket(vbucket);
2538 return ENGINE_NOT_MY_VBUCKET;
2542 LockHolder lh = vb->ht.getLockedBucket(key, &bucket_num);
2543 StoredValue *v = fetchValidValue(vb, key, bucket_num, true);
2546 if ((v->isDeleted() && !wantsDeleted) ||
2547 v->isTempNonExistentItem() || v->isTempDeletedItem()) {
2548 return ENGINE_KEY_ENOENT;
2550 if (eviction_policy == FULL_EVICTION &&
2551 v->isTempInitialItem() && bgfetch) {
2553 bgFetch(key, vbucket, cookie, true);
2554 return ENGINE_EWOULDBLOCK;
2556 kstats.logically_deleted = v->isDeleted();
2557 kstats.dirty = v->isDirty();
2558 kstats.exptime = v->getExptime();
2559 kstats.flags = v->getFlags();
2560 kstats.cas = v->getCas();
2561 kstats.vb_state = vb->getState();
2562 return ENGINE_SUCCESS;
2564 if (eviction_policy == VALUE_ONLY) {
2565 return ENGINE_KEY_ENOENT;
2567 if (bgfetch && vb->maybeKeyExistsInFilter(key)) {
2568 return addTempItemForBgFetch(lh, bucket_num, key, vb,
2571         // If bgfetch is false, or the bloom filter predicts that the
2572         // item does not exist on disk, return ENOENT for getKeyStats().
2574 return ENGINE_KEY_ENOENT;
2580 std::string EventuallyPersistentStore::validateKey(const std::string &key,
2584 RCPtr<VBucket> vb = getVBucket(vbucket);
2585 LockHolder lh = vb->ht.getLockedBucket(key, &bucket_num);
2586 StoredValue *v = fetchValidValue(vb, key, bucket_num, true,
2590 if (v->isDeleted() || v->isTempNonExistentItem() ||
2591 v->isTempDeletedItem()) {
2592 return "item_deleted";
2595 if (diskItem.getFlags() != v->getFlags()) {
2596 return "flags_mismatch";
2597 } else if (v->isResident() && memcmp(diskItem.getData(),
2598 v->getValue()->getData(),
2599 diskItem.getNBytes())) {
2600 return "data_mismatch";
2605 return "item_deleted";
2610 ENGINE_ERROR_CODE EventuallyPersistentStore::deleteItem(const std::string &key,
2615 ItemMetaData *itemMeta,
2616 mutation_descr_t *mutInfo,
2619 RCPtr<VBucket> vb = getVBucket(vbucket);
2620 if (!vb || (vb->getState() == vbucket_state_dead && !force)) {
2621 ++stats.numNotMyVBuckets;
2622 return ENGINE_NOT_MY_VBUCKET;
2623 } else if(vb->getState() == vbucket_state_replica && !force) {
2624 ++stats.numNotMyVBuckets;
2625 return ENGINE_NOT_MY_VBUCKET;
2626 } else if(vb->getState() == vbucket_state_pending && !force) {
2627 if (vb->addPendingOp(cookie)) {
2628 return ENGINE_EWOULDBLOCK;
2633 LockHolder lh = vb->ht.getLockedBucket(key, &bucket_num);
2634 StoredValue *v = vb->ht.unlocked_find(key, bucket_num, true, false);
2635 if (!v || v->isDeleted() || v->isTempItem()) {
2636 if (eviction_policy == VALUE_ONLY) {
2637 return ENGINE_KEY_ENOENT;
2638 } else { // Full eviction.
2640 if (!v) { // Item might be evicted from cache.
2641 if (vb->maybeKeyExistsInFilter(key)) {
2642 return addTempItemForBgFetch(lh, bucket_num, key, vb,
2645                     // The bloom filter predicts that the item does not
2646                     // exist on disk, so return ENOENT for deleteItem().
2647 return ENGINE_KEY_ENOENT;
2649 } else if (v->isTempInitialItem()) {
2651 bgFetch(key, vbucket, cookie, true);
2652 return ENGINE_EWOULDBLOCK;
2653 } else { // Non-existent or deleted key.
2654 if (v->isTempNonExistentItem() || v->isTempDeletedItem()) {
2655                 // Delete the temporary non-existent item so that a
2656                 // delete issued against a key that doesn't exist does
2657                 // not leave a temp item behind in the hash table.
2658 vb->ht.unlocked_del(key, bucket_num);
2660 return ENGINE_KEY_ENOENT;
2663 if (!v) { // Item might be evicted from cache.
2664             // This is a forced deletion, so create a temp item and
2665             // delete it below, but only if the bloom filter predicts
2666             // that the item may exist on disk.
2667 if (vb->maybeKeyExistsInFilter(key)) {
2668 add_type_t rv = vb->ht.unlocked_addTempItem(
2672 if (rv == ADD_NOMEM) {
2673 return ENGINE_ENOMEM;
2675 v = vb->ht.unlocked_find(key, bucket_num, true, false);
2676 v->setStoredValueState(StoredValue::state_deleted_key);
2678 return ENGINE_KEY_ENOENT;
2680 } else if (v->isTempInitialItem()) {
2681 v->setStoredValueState(StoredValue::state_deleted_key);
2682 } else { // Non-existent or deleted key.
2683 if (v->isTempNonExistentItem() || v->isTempDeletedItem()) {
2684                 // Delete the temporary non-existent item so that a
2685                 // delete issued against a key that doesn't exist does
2686                 // not leave a temp item behind in the hash table.
2687 vb->ht.unlocked_del(key, bucket_num);
2689 return ENGINE_KEY_ENOENT;
2695 if (v && v->isLocked(ep_current_time()) &&
2696 (vb->getState() == vbucket_state_replica ||
2697 vb->getState() == vbucket_state_pending)) {
2700 mutation_type_t delrv;
2701 delrv = vb->ht.unlocked_softDelete(v, *cas, eviction_policy);
2703 if (itemMeta && v) {
2704 itemMeta->revSeqno = v->getRevSeqno();
2705 itemMeta->cas = v->getCas();
2706 itemMeta->flags = v->getFlags();
2707 itemMeta->exptime = v->getExptime();
2709 *cas = v ? v->getCas() : 0;
2712 ENGINE_ERROR_CODE ret = ENGINE_SUCCESS;
2715 ret = ENGINE_ENOMEM;
2717 case INVALID_VBUCKET:
2718 ret = ENGINE_NOT_MY_VBUCKET;
2721 ret = ENGINE_KEY_EEXISTS;
2724 ret = ENGINE_TMPFAIL;
2727 ret = ENGINE_KEY_ENOENT;
2729 queueDirty(vb, v, &lh, NULL, tapBackfill);
2734 queueDirty(vb, v, &lh, &seqno, tapBackfill);
2735 mutInfo->seqno = seqno;
2736 mutInfo->vbucket_uuid = vb->failovers->getLatestUUID();
2739     // We already figured out if a bg fetch is required for a full-evicted
2746 ENGINE_ERROR_CODE EventuallyPersistentStore::deleteWithMeta(
2747 const std::string &key,
2753 ItemMetaData *itemMeta,
2757 ExtendedMetaData *emd,
2760 RCPtr<VBucket> vb = getVBucket(vbucket);
2761 if (!vb || (vb->getState() == vbucket_state_dead && !force)) {
2762 ++stats.numNotMyVBuckets;
2763 return ENGINE_NOT_MY_VBUCKET;
2764 } else if(vb->getState() == vbucket_state_replica && !force) {
2765 ++stats.numNotMyVBuckets;
2766 return ENGINE_NOT_MY_VBUCKET;
2767 } else if(vb->getState() == vbucket_state_pending && !force) {
2768 if (vb->addPendingOp(cookie)) {
2769 return ENGINE_EWOULDBLOCK;
2773     // Check the incoming item's CAS validity.
2774 if (!Item::isValidCas(itemMeta->cas)) {
2775 return ENGINE_KEY_EEXISTS;
2779 LockHolder lh = vb->ht.getLockedBucket(key, &bucket_num);
2780 StoredValue *v = vb->ht.unlocked_find(key, bucket_num, true, false);
2781 if (!force) { // Need conflict resolution.
2783 if (v->isTempInitialItem()) {
2784 bgFetch(key, vbucket, cookie, true);
2785 return ENGINE_EWOULDBLOCK;
2788 enum conflict_resolution_mode confResMode = revision_seqno;
2790 confResMode = static_cast<enum conflict_resolution_mode>(
2791 emd->getConflictResMode());
2794 if (!conflictResolver->resolve(vb, v, *itemMeta, true, confResMode)) {
2795 ++stats.numOpsDelMetaResolutionFailed;
2796 return ENGINE_KEY_EEXISTS;
2799         // The item is 1) deleted or non-existent under value eviction, or
2800         // 2) deleted or evicted under full eviction.
2801 if (vb->maybeKeyExistsInFilter(key)) {
2802 return addTempItemForBgFetch(lh, bucket_num, key, vb,
2803 cookie, true, isReplication);
2805             // Even though the bloom filter predicts that the item doesn't
2806             // exist on disk, we must persist this delete if the CAS is valid.
2807 add_type_t rv = vb->ht.unlocked_addTempItem(bucket_num, key,
2810 if (rv == ADD_NOMEM) {
2811 return ENGINE_ENOMEM;
2813 v = vb->ht.unlocked_find(key, bucket_num, true, false);
2814 v->setStoredValueState(StoredValue::state_deleted_key);
2819 // We should always try to persist a delete here.
2820 add_type_t rv = vb->ht.unlocked_addTempItem(bucket_num, key,
2823 if (rv == ADD_NOMEM) {
2824 return ENGINE_ENOMEM;
2826 v = vb->ht.unlocked_find(key, bucket_num, true, false);
2827 v->setStoredValueState(StoredValue::state_deleted_key);
2829 } else if (v->isTempInitialItem()) {
2830 v->setStoredValueState(StoredValue::state_deleted_key);
2835 if (v && v->isLocked(ep_current_time()) &&
2836 (vb->getState() == vbucket_state_replica ||
2837 vb->getState() == vbucket_state_pending)) {
2840 mutation_type_t delrv;
2841 delrv = vb->ht.unlocked_softDelete(v, *cas, *itemMeta,
2842 eviction_policy, true);
2843 *cas = v ? v->getCas() : 0;
2845 ENGINE_ERROR_CODE ret = ENGINE_SUCCESS;
2848 ret = ENGINE_ENOMEM;
2850 case INVALID_VBUCKET:
2851 ret = ENGINE_NOT_MY_VBUCKET;
2854 ret = ENGINE_KEY_EEXISTS;
2857 ret = ENGINE_TMPFAIL;
2860 ret = ENGINE_KEY_ENOENT;
2865 v->setBySeqno(bySeqno);
2868         /* Set the conflict resolution mode from the extended metadata.
2869          * Since the mode is set here, there is no need to set it again
2870          * in queueDirty. */
2872 v->setConflictResMode(
2873 static_cast<enum conflict_resolution_mode>(
2874 emd->getConflictResMode()));
2876 vb->setMaxCas(v->getCas());
2877 queueDirty(vb, v, &lh, seqno, tapBackfill, true, genBySeqno, false);
2881 bgFetch(key, vbucket, cookie, true);
2882 ret = ENGINE_EWOULDBLOCK;
2885     // Update the vbucket's drift counter only upon success.
2886 if (ret == ENGINE_SUCCESS && emd) {
2887 vb->setDriftCounter(emd->getAdjustedTime());
2893 void EventuallyPersistentStore::reset() {
2894 std::vector<int> buckets = vbMap.getBuckets();
2895 std::vector<int>::iterator it;
2896 for (it = buckets.begin(); it != buckets.end(); ++it) {
2897 RCPtr<VBucket> vb = getVBucket(*it);
2899 LockHolder lh(vb_mutexes[vb->getId()]);
2901 vb->checkpointManager.clear(vb->getState());
2903 vb->setPersistedSnapshot(0, 0);
2907 ++stats.diskQueueSize;
2908 bool inverse = true;
2909 flushAllTaskCtx.delayFlushAll.compare_exchange_strong(inverse, false);
2910 // Waking up (notifying) one flusher is good enough for diskFlushAll
2911 vbMap.shards[EP_PRIMARY_SHARD]->getFlusher()->notifyFlushEvent();
2915 * Callback invoked after persisting an item from memory to disk.
2917 * This class exists to create a closure around a few variables within
2918 * EventuallyPersistentStore::flushOne so that an object can be
2919 * requeued in case of failure to store in the underlying layer.
2921 class PersistenceCallback : public Callback<mutation_result>,
2922 public Callback<int> {
2925 PersistenceCallback(const queued_item &qi, RCPtr<VBucket> &vb,
2926 EventuallyPersistentStore *st, EPStats *s, uint64_t c)
2927 : queuedItem(qi), vbucket(vb), store(st), stats(s), cas(c) {
2932     // This callback is invoked for sets only.
2933 void callback(mutation_result &value) {
2934 if (value.first == 1) {
2936 LockHolder lh = vbucket->ht.getLockedBucket(queuedItem->getKey(),
2938 StoredValue *v = store->fetchValidValue(vbucket,
2939 queuedItem->getKey(),
2940 bucket_num, true, false);
2942 if (v->getCas() == cas) {
2943                 // Mark this item clean only if the current and stored
                     // CAS values match.
2947 if (v->isNewCacheItem()) {
2949 // Insert in value-only or full eviction mode.
2950 ++vbucket->opsCreate;
2951 vbucket->incrMetaDataDisk(*queuedItem);
2952 } else { // Update in full eviction mode.
2953 vbucket->ht.decrNumTotalItems();
2954 ++vbucket->opsUpdate;
2956 v->setNewCacheItem(false);
2957 } else { // Update in value-only or full eviction mode.
2958 ++vbucket->opsUpdate;
2962 vbucket->doStatsForFlushing(*queuedItem, queuedItem->size());
2963 stats->decrDiskQueueSize(1);
2964 stats->totalPersisted++;
2966 // If the return was 0 here, we're in a bad state because
2967 // we do not know the rowid of this object.
2968 if (value.first == 0) {
2970 LockHolder lh = vbucket->ht.getLockedBucket(
2971 queuedItem->getKey(), &bucket_num);
2972 StoredValue *v = store->fetchValidValue(vbucket,
2973 queuedItem->getKey(),
2977 std::stringstream ss;
2978 ss << "Persisting ``" << queuedItem->getKey() << "'' on vb"
2979 << queuedItem->getVBucketId() << " (rowid="
2980 << v->getBySeqno() << ") returned 0 updates\n";
2981 LOG(EXTENSION_LOG_WARNING, "%s", ss.str().c_str());
2983 LOG(EXTENSION_LOG_WARNING,
2984 "Error persisting now missing ``%s'' from vb%d",
2985 queuedItem->getKey().c_str(),
2986 queuedItem->getVBucketId());
2989 vbucket->doStatsForFlushing(*queuedItem, queuedItem->size());
2990 stats->decrDiskQueueSize(1);
2992 std::stringstream ss;
2994 "Fatal error in persisting SET ``" <<
2995 queuedItem->getKey() << "'' on vb "
2996 << queuedItem->getVBucketId() << "!!! Requeue it...\n";
2997 LOG(EXTENSION_LOG_WARNING, "%s", ss.str().c_str());
3003 // This callback is invoked for deletions only.
3005 // The boolean indicates whether the underlying storage
3006 // successfully deleted the item.
3007 void callback(int &value) {
3008 // > 1 would be bad. We were only trying to delete one row.
3009 cb_assert(value < 2);
3011 // 1 means we deleted one row
3012 // 0 means we did not delete a row, but did not fail (did not exist)
3014             // We have successfully removed the item from disk, so we
3015             // may now remove it from the hash table.
3017 LockHolder lh = vbucket->ht.getLockedBucket(queuedItem->getKey(),
3019 StoredValue *v = store->fetchValidValue(vbucket,
3020 queuedItem->getKey(),
3021 bucket_num, true, false);
3022 if (v && v->isDeleted()) {
3023 bool newCacheItem = v->isNewCacheItem();
3024 bool deleted = vbucket->ht.unlocked_del(queuedItem->getKey(),
3027 if (newCacheItem && value > 0) {
3028                     // Decrement the item counter again for an item that exists
3029                     // in the DB file but not in memory (i.e., full eviction):
3030                     // we created a temp item in memory and incremented the
3031                     // counter when the deletion was pushed into the queue.
3032 vbucket->ht.decrNumTotalItems();
3037              * Deleted items are added to the bloom filter
3038              * under either eviction policy.
3040 vbucket->addToFilter(queuedItem->getKey());
3043 ++stats->totalPersisted;
3044 ++vbucket->opsDelete;
3046 vbucket->doStatsForFlushing(*queuedItem, queuedItem->size());
3047 stats->decrDiskQueueSize(1);
3048 vbucket->decrMetaDataDisk(*queuedItem);
3050 std::stringstream ss;
3051 ss << "Fatal error in persisting DELETE ``" <<
3052 queuedItem->getKey() << "'' on vb "
3053 << queuedItem->getVBucketId() << "!!! Requeue it...\n";
3054 LOG(EXTENSION_LOG_WARNING, "%s", ss.str().c_str());
3062 if (store->vbMap.isBucketDeletion(vbucket->getId())) {
3063 vbucket->doStatsForFlushing(*queuedItem, queuedItem->size());
3064 stats->decrDiskQueueSize(1);
3067 ++stats->flushFailed;
3068 store->invokeOnLockedStoredValue(queuedItem->getKey(),
3069 queuedItem->getVBucketId(),
3070 &StoredValue::reDirty);
3071 vbucket->rejectQueue.push(queuedItem);
3074 const queued_item queuedItem;
3075 RCPtr<VBucket> &vbucket;
3076 EventuallyPersistentStore *store;
3079 DISALLOW_COPY_AND_ASSIGN(PersistenceCallback);
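/*
 * Illustrative usage (mirrors flushOneDelOrSet() later in this file): one
 * PersistenceCallback is allocated per queued item and handed to the
 * KVStore, which invokes the set- or delete-flavoured callback() once the
 * underlying store has processed the item.
 *
 *     PersistenceCallback *cb =
 *         new PersistenceCallback(qi, vb, this, &stats, qi->getCas());
 *     rwUnderlying->set(*qi, *cb);   // or rwUnderlying->del(*qi, *cb)
 */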
3082 bool EventuallyPersistentStore::scheduleFlushAllTask(const void* cookie,
3084 bool inverse = false;
3085 if (diskFlushAll.compare_exchange_strong(inverse, true)) {
3086 flushAllTaskCtx.cookie = cookie;
3087 flushAllTaskCtx.delayFlushAll.compare_exchange_strong(inverse, true);
3088 ExTask task = new FlushAllTask(&engine, static_cast<double>(when));
3089 ExecutorPool::get()->schedule(task, NONIO_TASK_IDX);
3096 void EventuallyPersistentStore::setFlushAllComplete() {
3097     // Notify memcached about flushAll task completion, and
3098     // set the diskFlushAll flag to false.
3099 if (flushAllTaskCtx.cookie) {
3100 engine.notifyIOComplete(flushAllTaskCtx.cookie, ENGINE_SUCCESS);
3102 bool inverse = false;
3103 flushAllTaskCtx.delayFlushAll.compare_exchange_strong(inverse, true);
3105 diskFlushAll.compare_exchange_strong(inverse, false);
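// Illustrative sketch (not part of the original source): the
// compare_exchange_strong() idiom used by scheduleFlushAllTask() and
// setFlushAllComplete() above. 'expected' is both the value the flag must
// currently hold and, on failure, an output reporting what was actually
// observed, so the flag is flipped by exactly one concurrent caller.
// std::atomic<bool> (requires <atomic>) is assumed here; the engine's own
// atomic wrapper exposes the same interface.
static bool exampleClaimFlag(std::atomic<bool> &flag) {
    bool expected = false;
    // Returns true for exactly one caller: the one that flips false->true.
    return flag.compare_exchange_strong(expected, true);
}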
3108 void EventuallyPersistentStore::flushOneDeleteAll() {
3109 for (size_t i = 0; i < vbMap.getSize(); ++i) {
3110 RCPtr<VBucket> vb = getVBucket(i);
3112 LockHolder lh(vb_mutexes[vb->getId()]);
3113 getRWUnderlying(vb->getId())->reset(i);
3117 stats.decrDiskQueueSize(1);
3118 setFlushAllComplete();
3121 int EventuallyPersistentStore::flushVBucket(uint16_t vbid) {
3122 KVShard *shard = vbMap.getShard(vbid);
3123 if (diskFlushAll && !flushAllTaskCtx.delayFlushAll) {
3124 if (shard->getId() == EP_PRIMARY_SHARD) {
3125 flushOneDeleteAll();
3127         // A disk flush is pending; just return.
3132 if (vbMap.isBucketCreation(vbid)) {
3133 return RETRY_FLUSH_VBUCKET;
3136 int items_flushed = 0;
3137 rel_time_t flush_start = ep_current_time();
3139 RCPtr<VBucket> vb = vbMap.getBucket(vbid);
3141 LockHolder lh(vb_mutexes[vbid], true /*tryLock*/);
3142 if (!lh.islocked()) { // Try another bucket if this one is locked
3143 return RETRY_FLUSH_VBUCKET; // to avoid blocking flusher
3146 KVStatsCallback cb(this);
3147 std::vector<queued_item> items;
3148 KVStore *rwUnderlying = getRWUnderlying(vbid);
3150 while (!vb->rejectQueue.empty()) {
3151 items.push_back(vb->rejectQueue.front());
3152 vb->rejectQueue.pop();
3155 const std::string cursor(CheckpointManager::pCursorName);
3156 vb->getBackfillItems(items);
3158 snapshot_range_t range;
3159 range = vb->checkpointManager.getAllItemsForCursor(cursor, items);
3161 if (!items.empty()) {
3162 while (!rwUnderlying->begin()) {
3163 ++stats.beginFailed;
3164 LOG(EXTENSION_LOG_WARNING, "Failed to start a transaction!!! "
3165 "Retry in 1 sec ...");
3168 rwUnderlying->optimizeWrites(items);
3171 uint64_t maxSeqno = 0;
3172 uint64_t maxCas = 0;
3173 std::list<PersistenceCallback*> pcbs;
3174 std::vector<queued_item>::iterator it = items.begin();
3175 for(; it != items.end(); ++it) {
3176 if ((*it)->getOperation() != queue_op_set &&
3177 (*it)->getOperation() != queue_op_del) {
3179 } else if (!prev || prev->getKey() != (*it)->getKey()) {
3182 PersistenceCallback *cb = flushOneDelOrSet(*it, vb);
3187 maxSeqno = std::max(maxSeqno, (uint64_t)(*it)->getBySeqno());
3188 maxCas = std::max(maxCas, (uint64_t)(*it)->getCas());
3189 ++stats.flusher_todo;
3191 stats.decrDiskQueueSize(1);
3192 vb->doStatsForFlushing(*(*it), (*it)->size());
3196 BlockTimer timer(&stats.diskCommitHisto, "disk_commit",
3198 hrtime_t start = gethrtime();
3200 if (vb->getState() == vbucket_state_active) {
3201 range.start = maxSeqno;
3202 range.end = maxSeqno;
3205 while (!rwUnderlying->commit(&cb, range.start, range.end, maxCas,
3206 vb->getDriftCounter())) {
3207 ++stats.commitFailed;
3208 LOG(EXTENSION_LOG_WARNING, "Flusher commit failed!!! Retry in "
3214 if (vb->rejectQueue.empty()) {
3215 vb->setPersistedSnapshot(range.start, range.end);
3216 uint64_t highSeqno = rwUnderlying->getLastPersistedSeqno(vbid);
3217 if (highSeqno > 0 &&
3218 highSeqno != vbMap.getPersistenceSeqno(vbid)) {
3219 vbMap.setPersistenceSeqno(vbid, highSeqno);
3220 vb->notifySeqnoPersisted(highSeqno);
3224 while (!pcbs.empty()) {
3225 delete pcbs.front();
3229 ++stats.flusherCommits;
3230 hrtime_t end = gethrtime();
3231 uint64_t commit_time = (end - start) / 1000000;
3232 uint64_t trans_time = (end - flush_start) / 1000000;
3234 lastTransTimePerItem.store((items_flushed == 0) ? 0 :
3235 static_cast<double>(trans_time) /
3236 static_cast<double>(items_flushed));
3237 stats.commit_time.store(commit_time);
3238 stats.cumulativeCommitTime.fetch_add(commit_time);
3239 stats.cumulativeFlushTime.fetch_add(ep_current_time()
3241 stats.flusher_todo.store(0);
3244 rwUnderlying->pendingTasks();
3246 if (vb->checkpointManager.getNumCheckpoints() > 1) {
3247 wakeUpCheckpointRemover();
3250 if (vb->rejectQueue.empty()) {
3251 vb->checkpointManager.itemsPersisted();
3252 uint64_t seqno = vbMap.getPersistenceSeqno(vbid);
3253 uint64_t chkid = vb->checkpointManager.getPersistenceCursorPreChkId();
3254 vb->notifyOnPersistence(engine, seqno, true);
3255 vb->notifyOnPersistence(engine, chkid, false);
3256 if (chkid > 0 && chkid != vbMap.getPersistenceCheckpointId(vbid)) {
3257 vbMap.setPersistenceCheckpointId(vbid, chkid);
3260 return RETRY_FLUSH_VBUCKET;
3264 return items_flushed;
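// Illustrative sketch (not part of the original source): the per-item
// flush latency recorded in flushVBucket() above is simply the whole
// transaction time divided by the number of items flushed, guarded
// against division by zero. The helper name is an assumption made for
// illustration only.
static double exampleTransTimePerItem(uint64_t transTime, int itemsFlushed) {
    return (itemsFlushed == 0) ? 0 :
           static_cast<double>(transTime) /
           static_cast<double>(itemsFlushed);
}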
3267 PersistenceCallback*
3268 EventuallyPersistentStore::flushOneDelOrSet(const queued_item &qi,
3269 RCPtr<VBucket> &vb) {
3272 stats.decrDiskQueueSize(1);
3276 int64_t bySeqno = qi->getBySeqno();
3277 bool deleted = qi->isDeleted();
3278 rel_time_t queued(qi->getQueuedTime());
3280 int dirtyAge = ep_current_time() - queued;
3281 stats.dirtyAgeHisto.add(dirtyAge * 1000000);
3282 stats.dirtyAge.store(dirtyAge);
3283 stats.dirtyAgeHighWat.store(std::max(stats.dirtyAge.load(),
3284 stats.dirtyAgeHighWat.load()));
3286     // Wait until the vbucket database is created by the vbucket state
         // snapshot task.
3288 if (vbMap.isBucketCreation(qi->getVBucketId()) ||
3289 vbMap.isBucketDeletion(qi->getVBucketId())) {
3290 vb->rejectQueue.push(qi);
3295 KVStore *rwUnderlying = getRWUnderlying(qi->getVBucketId());
3297 // TODO: Need to separate disk_insert from disk_update because
3298 // bySeqno doesn't give us that information.
3299 BlockTimer timer(bySeqno == -1 ?
3300 &stats.diskInsertHisto : &stats.diskUpdateHisto,
3301 bySeqno == -1 ? "disk_insert" : "disk_update",
3303 PersistenceCallback *cb =
3304 new PersistenceCallback(qi, vb, this, &stats, qi->getCas());
3305 rwUnderlying->set(*qi, *cb);
3308 BlockTimer timer(&stats.diskDelHisto, "disk_delete",
3310 PersistenceCallback *cb =
3311 new PersistenceCallback(qi, vb, this, &stats, 0);
3312 rwUnderlying->del(*qi, *cb);
3317 void EventuallyPersistentStore::queueDirty(RCPtr<VBucket> &vb,
3322 bool notifyReplicator,
3324 bool setConflictMode) {
3326 if (setConflictMode && (v->getConflictResMode() != last_write_wins) &&
3327 vb->isTimeSyncEnabled()) {
3328 v->setConflictResMode(last_write_wins);
3331 queued_item qi(v->toItem(false, vb->getId()));
3333 bool rv = tapBackfill ? vb->queueBackfillItem(qi, genBySeqno) :
3334 vb->checkpointManager.queueDirty(vb, qi,
3336 v->setBySeqno(qi->getBySeqno());
3339 *seqno = v->getBySeqno();
3347 KVShard* shard = vbMap.getShard(vb->getId());
3348 shard->getFlusher()->notifyFlushEvent();
3351 if (!tapBackfill && notifyReplicator) {
3352 engine.getTapConnMap().notifyVBConnections(vb->getId());
3353 engine.getDcpConnMap().notifyVBConnections(vb->getId(),
3359 std::vector<vbucket_state *> EventuallyPersistentStore::loadVBucketState()
3361 return getOneROUnderlying()->listPersistedVbuckets();
3364 void EventuallyPersistentStore::warmupCompleted() {
3365 // Run the vbucket state snapshot job once after the warmup
3366 scheduleVBSnapshot(VBSnapshotTask::Priority::HIGH);
3368 if (engine.getConfiguration().getAlogPath().length() > 0) {
3370 if (engine.getConfiguration().isAccessScannerEnabled()) {
3371 LockHolder lh(accessScanner.mutex);
3372 accessScanner.enabled = true;
3374 LOG(EXTENSION_LOG_WARNING, "Access Scanner task enabled");
3375 size_t smin = engine.getConfiguration().getAlogSleepTime();
3376 setAccessScannerSleeptime(smin);
3378 LockHolder lh(accessScanner.mutex);
3379 accessScanner.enabled = false;
3380 LOG(EXTENSION_LOG_WARNING, "Access Scanner task disabled");