1 /* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
3 * Copyright 2010 Couchbase, Inc
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
9 * http://www.apache.org/licenses/LICENSE-2.0
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
32 #include "access_scanner.h"
33 #include "checkpoint_remover.h"
34 #include "conflict_resolution.h"
36 #include "ep_engine.h"
37 #include "failover-table.h"
39 #include "htresizer.h"
43 #include "mutation_log.h"
46 #include "tapthrottle.h"
// Configuration listener that copies changed size-valued settings into the
// EPStats object it was constructed with.
48 class StatsValueChangeListener : public ValueChangedListener {
// st - stats object that receives the updated values.
50 StatsValueChangeListener(EPStats &st) : stats(st) {
// Selects the EPStats field to update by comparing `key` against the
// parameter names below.
//   key   - name of the configuration parameter that changed
//   value - new value of that parameter
54 virtual void sizeValueChanged(const std::string &key, size_t value) {
55 if (key.compare("max_size") == 0) {
56 stats.setMaxDataSize(value);
// A new max_size also overwrites both watermarks: low = 60% and
// high = 75% of the new maximum.
57 size_t low_wat = static_cast<size_t>
58 (static_cast<double>(value) * 0.6);
59 size_t high_wat = static_cast<size_t>(
60 static_cast<double>(value) * 0.75);
61 stats.mem_low_wat.store(low_wat);
62 stats.mem_high_wat.store(high_wat);
63 } else if (key.compare("mem_low_wat") == 0) {
64 stats.mem_low_wat.store(value);
65 } else if (key.compare("mem_high_wat") == 0) {
66 stats.mem_high_wat.store(value);
// The next three settings are divided by 100.0 before being stored
// (presumably percentages converted to fractions -- TODO confirm).
67 } else if (key.compare("tap_throttle_threshold") == 0) {
68 stats.tapThrottleThreshold.store(
69 static_cast<double>(value) / 100.0);
70 } else if (key.compare("warmup_min_memory_threshold") == 0) {
71 stats.warmupMemUsedCap.store(static_cast<double>(value) / 100.0);
72 } else if (key.compare("warmup_min_items_threshold") == 0) {
73 stats.warmupNumReadCap.store(static_cast<double>(value) / 100.0);
// Warning for a key this listener does not handle.
// NOTE(review): the branch introducer and the %s argument are not visible
// in this excerpt; presumably this is the final else branch.
75 LOG(EXTENSION_LOG_WARNING,
76 "Failed to change value for unknown variable, %s\n",
86 * A configuration value changed listener that responds to ep-engine
87 * parameter changes by invoking engine-specific methods on
88 * configuration change events.
90 class EPStoreValueChangeListener : public ValueChangedListener {
// st - store whose setters are invoked when a parameter changes.
92 EPStoreValueChangeListener(EventuallyPersistentStore &st) : store(st) {
// Routes a changed size-valued parameter to the matching setter, selected
// by comparing `key` against the parameter names below.
//   key   - name of the configuration parameter that changed
//   value - new value of that parameter
95 virtual void sizeValueChanged(const std::string &key, size_t value) {
96 if (key.compare("bg_fetch_delay") == 0) {
97 store.setBGFetchDelay(static_cast<uint32_t>(value));
98 } else if (key.compare("compaction_write_queue_cap") == 0) {
99 store.setCompactionWriteQueueCap(value);
100 } else if (key.compare("exp_pager_stime") == 0) {
101 store.setExpiryPagerSleeptime(value);
102 } else if (key.compare("alog_sleep_time") == 0) {
103 store.setAccessScannerSleeptime(value);
// alog_task_time: the new value itself is not passed on; the store is only
// asked to reset the access scanner start time.
104 } else if (key.compare("alog_task_time") == 0) {
105 store.resetAccessScannerStartTime();
// Both memory thresholds are divided by 100 before being applied
// (presumably percentages converted to fractions -- TODO confirm).
106 } else if (key.compare("mutation_mem_threshold") == 0) {
107 double mem_threshold = static_cast<double>(value) / 100;
108 StoredValue::setMutationMemoryThreshold(mem_threshold);
109 } else if (key.compare("backfill_mem_threshold") == 0) {
110 double backfill_threshold = static_cast<double>(value) / 100;
111 store.setBackfillMemoryThreshold(backfill_threshold);
112 } else if (key.compare("compaction_exp_mem_threshold") == 0) {
113 store.setCompactionExpMemThreshold(value);
// TAP throttle settings are applied to the throttle object owned by the
// engine, reached through the store.
114 } else if (key.compare("tap_throttle_queue_cap") == 0) {
115 store.getEPEngine().getTapThrottle().setQueueCap(value);
116 } else if (key.compare("tap_throttle_cap_pcnt") == 0) {
117 store.getEPEngine().getTapThrottle().setCapPercent(value);
// Warning for a key this listener does not handle.
// NOTE(review): the branch introducer and the %s argument are not visible
// in this excerpt; presumably this is the final else branch.
119 LOG(EXTENSION_LOG_WARNING,
120 "Failed to change value for unknown variable, %s\n",
// Handles boolean parameters. Only "access_scanner_enabled" is recognised
// here; it enables or disables the access scanner task.
// NOTE(review): the test that chooses between the two calls is not visible;
// presumably it is `value` -- confirm.
125 virtual void booleanValueChanged(const std::string &key, bool value) {
126 if (key.compare("access_scanner_enabled") == 0) {
128 store.enableAccessScannerTask();
130 store.disableAccessScannerTask();
// Store that receives the configuration updates.
136 EventuallyPersistentStore &store;
// Task that describes itself as "Removing (dead) vbucket <id> from memory".
// The visible part of its body notifies the vbucket's pending connections
// that their operations failed.
139 class VBucketMemoryDeletionTask : public GlobalTask {
// eng   - engine passed to notifyAllPendingConnsFailed()
// vb    - vbucket being removed; its id is cached in vbid for the description
// delay - passed with the task priority in the initializer list
//         (the base-class call itself is not visible in this excerpt)
141 VBucketMemoryDeletionTask(EventuallyPersistentEngine &eng,
142 RCPtr<VBucket> &vb, double delay) :
144 Priority::VBMemoryDeletionPriority, delay, true),
145 e(eng), vbucket(vb), vbid(vb->getId()) { }
// Builds the human-readable task description.
147 std::string getDescription() {
148 std::stringstream ss;
149 ss << "Removing (dead) vbucket " << vbid << " from memory";
// Fail every connection still waiting on this vbucket.
154 vbucket->notifyAllPendingConnsFailed(e);
161 EventuallyPersistentEngine &e;
// Reference-counted handle to the vbucket this task works on.
162 RCPtr<VBucket> vbucket;
// Task that fires all operations queued on a vbucket by calling
// VBucket::fireAllOps() with the owning engine.
166 class PendingOpsNotification : public GlobalTask {
// e  - engine the task belongs to; also handed to fireAllOps()
// vb - vbucket whose pending operations are fired
// The task is created with VBMemoryDeletionPriority and a 0 delay argument.
168 PendingOpsNotification(EventuallyPersistentEngine &e, RCPtr<VBucket> &vb) :
169 GlobalTask(&e, Priority::VBMemoryDeletionPriority, 0, false),
170 engine(e), vbucket(vb) { }
// Builds the human-readable task description, including the vbucket id.
172 std::string getDescription() {
173 std::stringstream ss;
174 ss << "Notify pending operations for vbucket " << vbucket->getId();
179 vbucket->fireAllOps(engine);
184 EventuallyPersistentEngine &engine;
185 RCPtr<VBucket> vbucket;
// Constructs the store for `theEngine`: creates per-shard access logs,
// per-vbucket mutexes and flags, scheduling/runtime histograms, applies the
// current configuration values and registers listeners so later
// configuration changes are propagated.
188 EventuallyPersistentStore::EventuallyPersistentStore(
189 EventuallyPersistentEngine &theEngine) :
190 engine(theEngine), stats(engine.getEpStats()),
191 vbMap(theEngine.getConfiguration(), *this),
193 diskFlushAll(false), bgFetchDelay(0), backfillMemoryThreshold(0.95),
194 statsSnapshotTaskId(0), lastTransTimePerItem(0)
196 cachedResidentRatio.activeRatio.store(0);
197 cachedResidentRatio.replicaRatio.store(0);
199 Configuration &config = engine.getConfiguration();
// One MutationLog (access log) per shard, up to config.getMaxNumShards().
// NOTE(review): the per-shard path suffix is built on lines not visible here.
200 MutationLog *shardlog;
201 for (uint16_t i = 0; i < config.getMaxNumShards(); i++) {
204 shardlog = new MutationLog(engine.getConfiguration().getAlogPath() +
206 engine.getConfiguration().getAlogBlockSize());
207 accessLog.push_back(shardlog);
210 storageProperties = new StorageProperties(true, true, true, true);
// Allocate and reset one scheduling and one runtime histogram per task type.
212 stats.schedulingHisto = new Histogram<hrtime_t>[MAX_TYPE_ID];
213 stats.taskRuntimeHisto = new Histogram<hrtime_t>[MAX_TYPE_ID];
215 for (size_t i = 0; i < MAX_TYPE_ID; i++) {
216 stats.schedulingHisto[i].reset();
217 stats.taskRuntimeHisto[i].reset();
220 ExecutorPool::get()->registerBucket(ObjectRegistry::getCurrentEngine());
// Per-vbucket mutexes and "vbstate persist scheduled" flags, all initially
// false.
222 size_t num_vbs = config.getMaxVbuckets();
223 vb_mutexes = new Mutex[num_vbs];
224 schedule_vbstate_persist = new AtomicValue<bool>[num_vbs];
225 for (size_t i = 0; i < num_vbs; ++i) {
226 schedule_vbstate_persist[i] = false;
229 stats.memOverhead = sizeof(EventuallyPersistentStore);
// A "seqno" conflict resolution type selects SeqBasedResolution.
231 if (config.getConflictResolutionType().compare("seqno") == 0) {
232 conflictResolver = new SeqBasedResolution();
// From here on each setting is applied from the current configuration and a
// freshly allocated listener is registered under its key.
235 stats.setMaxDataSize(config.getMaxSize());
236 config.addValueChangedListener("max_size",
237 new StatsValueChangeListener(stats));
239 stats.mem_low_wat.store(config.getMemLowWat());
240 config.addValueChangedListener("mem_low_wat",
241 new StatsValueChangeListener(stats));
243 stats.mem_high_wat.store(config.getMemHighWat());
244 config.addValueChangedListener("mem_high_wat",
245 new StatsValueChangeListener(stats));
247 stats.tapThrottleThreshold.store(static_cast<double>
248 (config.getTapThrottleThreshold())
250 config.addValueChangedListener("tap_throttle_threshold",
251 new StatsValueChangeListener(stats));
253 stats.tapThrottleWriteQueueCap.store(config.getTapThrottleQueueCap());
254 config.addValueChangedListener("tap_throttle_queue_cap",
255 new EPStoreValueChangeListener(*this));
256 config.addValueChangedListener("tap_throttle_cap_pcnt",
257 new EPStoreValueChangeListener(*this));
259 setBGFetchDelay(config.getBgFetchDelay());
260 config.addValueChangedListener("bg_fetch_delay",
261 new EPStoreValueChangeListener(*this));
// Warmup thresholds are stored after dividing the configured value by 100.0.
263 stats.warmupMemUsedCap.store(static_cast<double>
264 (config.getWarmupMinMemoryThreshold()) / 100.0);
265 config.addValueChangedListener("warmup_min_memory_threshold",
266 new StatsValueChangeListener(stats));
267 stats.warmupNumReadCap.store(static_cast<double>
268 (config.getWarmupMinItemsThreshold()) / 100.0);
269 config.addValueChangedListener("warmup_min_items_threshold",
270 new StatsValueChangeListener(stats));
// Mutation and backfill memory thresholds are likewise divided by 100.
272 double mem_threshold = static_cast<double>
273 (config.getMutationMemThreshold()) / 100;
274 StoredValue::setMutationMemoryThreshold(mem_threshold);
275 config.addValueChangedListener("mutation_mem_threshold",
276 new EPStoreValueChangeListener(*this));
278 double backfill_threshold = static_cast<double>
279 (config.getBackfillMemThreshold()) / 100;
280 setBackfillMemoryThreshold(backfill_threshold);
281 config.addValueChangedListener("backfill_mem_threshold",
282 new EPStoreValueChangeListener(*this));
284 compactionExpMemThreshold = config.getCompactionExpMemThreshold();
285 config.addValueChangedListener("compaction_exp_mem_threshold",
286 new EPStoreValueChangeListener(*this));
288 compactionWriteQueueCap = config.getCompactionWriteQueueCap();
289 config.addValueChangedListener("compaction_write_queue_cap",
290 new EPStoreValueChangeListener(*this));
// Eviction policy: "value_only" selects VALUE_ONLY; FULL_EVICTION is the
// other assignment (its branch introducer is not visible here, presumably
// the else branch).
292 const std::string &policy = config.getItemEvictionPolicy();
293 if (policy.compare("value_only") == 0) {
294 eviction_policy = VALUE_ONLY;
296 eviction_policy = FULL_EVICTION;
299 warmupTask = new Warmup(this);
// Starts the flushers and background fetchers and schedules the store's
// recurring tasks (item pager, hashtable resizer, checkpoint remover,
// vbucket snapshot daemon, workload monitor).
// Returns bool; the return statements are not visible in this excerpt
// (presumably false after each "FATAL" log -- confirm).
302 bool EventuallyPersistentStore::initialize() {
303 // We should nuke everything unless we want warmup
304 Configuration &config = engine.getConfiguration();
305 if (!config.isWarmup()) {
309 if (!startFlusher()) {
310 LOG(EXTENSION_LOG_WARNING,
311 "FATAL: Failed to create and start flushers");
314 if (!startBgFetcher()) {
315 LOG(EXTENSION_LOG_WARNING,
316 "FATAL: Failed to create and start bgfetchers");
// With failpartialwarmup configured, any warmup OOM count is reported.
322 if (config.isFailpartialwarmup() && stats.warmOOM > 0) {
323 LOG(EXTENSION_LOG_WARNING,
324 "Warmup failed to load %d records due to OOM, exiting.\n",
325 static_cast<unsigned int>(stats.warmOOM));
329 itmpTask = new ItemPager(&engine, stats);
330 ExecutorPool::get()->schedule(itmpTask, NONIO_TASK_IDX);
// Apply the configured expiry pager sleep time and track later changes.
332 size_t expiryPagerSleeptime = config.getExpPagerStime();
333 setExpiryPagerSleeptime(expiryPagerSleeptime);
334 config.addValueChangedListener("exp_pager_stime",
335 new EPStoreValueChangeListener(*this));
337 ExTask htrTask = new HashtableResizerTask(this, 10);
338 ExecutorPool::get()->schedule(htrTask, NONIO_TASK_IDX);
340 size_t checkpointRemoverInterval = config.getChkRemoverStime();
341 chkTask = new ClosedUnrefCheckpointRemoverTask(&engine, stats,
342 checkpointRemoverInterval);
343 ExecutorPool::get()->schedule(chkTask, NONIO_TASK_IDX);
// The vbucket snapshot daemon is the only task here scheduled on the writer
// task group; the others use the non-IO group.
345 ExTask vbSnapshotTask = new DaemonVBSnapshotTask(&engine);
346 ExecutorPool::get()->schedule(vbSnapshotTask, WRITER_TASK_IDX);
348 ExTask workloadMonitorTask = new WorkLoadMonitor(&engine, false);
349 ExecutorPool::get()->schedule(workloadMonitorTask, NONIO_TASK_IDX);
// Stops/cancels the store's tasks, unregisters the bucket from the executor
// pool and frees the arrays and objects allocated by the constructor.
354 EventuallyPersistentStore::~EventuallyPersistentStore() {
357 ExecutorPool::get()->stopTaskGroup(&engine, NONIO_TASK_IDX);
359 ExecutorPool::get()->cancel(statsSnapshotTaskId);
// The access scanner task is cancelled while holding accessScanner.mutex.
360 LockHolder lh(accessScanner.mutex);
361 ExecutorPool::get()->cancel(accessScanner.task);
365 ExecutorPool::get()->unregisterBucket(ObjectRegistry::getCurrentEngine());
367 delete [] vb_mutexes;
368 delete [] schedule_vbstate_persist;
369 delete [] stats.schedulingHisto;
370 delete [] stats.taskRuntimeHisto;
371 delete conflictResolver;
373 delete storageProperties;
// Walk the per-shard access logs (loop body not visible in this excerpt;
// presumably each MutationLog is deleted -- confirm).
375 std::vector<MutationLog*>::iterator it;
376 for (it = accessLog.begin(); it != accessLog.end(); it++) {
// Returns the flusher of the shard that vbMap.getShard() yields for shardId.
381 const Flusher* EventuallyPersistentStore::getFlusher(uint16_t shardId) {
382 return vbMap.getShard(shardId)->getFlusher();
// Accessor returning a Warmup pointer (body not visible in this excerpt;
// presumably the warmupTask created by the constructor -- confirm).
385 Warmup* EventuallyPersistentStore::getWarmup(void) const {
// Iterates over every shard and obtains its flusher.
// NOTE(review): the call made on each flusher and the return value are not
// visible in this excerpt; presumably each flusher is started.
389 bool EventuallyPersistentStore::startFlusher() {
390 for (uint16_t i = 0; i < vbMap.numShards; ++i) {
391 Flusher *flusher = vbMap.shards[i]->getFlusher();
// Stops the flusher of every shard, passing stats.forceShutdown to stop().
397 void EventuallyPersistentStore::stopFlusher() {
398 for (uint16_t i = 0; i < vbMap.numShards; i++) {
399 Flusher *flusher = vbMap.shards[i]->getFlusher();
400 bool rv = flusher->stop(stats.forceShutdown);
// Extra step only when stop() succeeded and shutdown is not forced
// (the step itself is not visible; presumably waits for the flusher).
401 if (rv && !stats.forceShutdown) {
// Asks the flusher of every shard to pause and logs a warning, including
// the flusher's state name and shard index, for each one that refuses.
// Returns bool (how the result is accumulated is not visible here).
407 bool EventuallyPersistentStore::pauseFlusher() {
409 for (uint16_t i = 0; i < vbMap.numShards; i++) {
410 Flusher *flusher = vbMap.shards[i]->getFlusher();
411 if (!flusher->pause()) {
412 LOG(EXTENSION_LOG_WARNING, "Attempted to pause flusher in state "
413 "[%s], shard = %d", flusher->stateName(), i);
// Asks the flusher of every shard to resume and logs a warning, including
// the flusher's state name and shard index, for each one that refuses.
// Returns bool (how the result is accumulated is not visible here).
420 bool EventuallyPersistentStore::resumeFlusher() {
422 for (uint16_t i = 0; i < vbMap.numShards; i++) {
423 Flusher *flusher = vbMap.shards[i]->getFlusher();
424 if (!flusher->resume()) {
425 LOG(EXTENSION_LOG_WARNING,
426 "Warning: attempted to resume flusher in state [%s], "
427 "shard = %d", flusher->stateName(), i);
// Visits the flusher of every shard, but only when the disk queue size stat
// is currently zero.
// NOTE(review): the call made on each flusher is not visible in this
// excerpt; presumably it wakes the flusher.
434 void EventuallyPersistentStore::wakeUpFlusher() {
435 if (stats.diskQueueSize.load() == 0) {
436 for (uint16_t i = 0; i < vbMap.numShards; i++) {
437 Flusher *flusher = vbMap.shards[i]->getFlusher();
// Obtains the background fetcher of every shard and logs a warning naming
// the shard when one is NULL.
// NOTE(review): the start call and return statements are not visible here.
// NOTE(review): the log text below misspells "Failed" as "Falied".
443 bool EventuallyPersistentStore::startBgFetcher() {
444 for (uint16_t i = 0; i < vbMap.numShards; i++) {
445 BgFetcher *bgfetcher = vbMap.shards[i]->getBgFetcher();
446 if (bgfetcher == NULL) {
447 LOG(EXTENSION_LOG_WARNING,
448 "Falied to start bg fetcher for shard %d", i);
// Visits the background fetcher of every shard during shutdown.
456 void EventuallyPersistentStore::stopBgFetcher() {
457 for (uint16_t i = 0; i < vbMap.numShards; i++) {
458 BgFetcher *bgfetcher = vbMap.shards[i]->getBgFetcher();
// Warn when multi-bgfetch is enabled and this fetcher still has a pending
// job.
459 if (multiBGFetchEnabled() && bgfetcher->pendingJob()) {
460 LOG(EXTENSION_LOG_WARNING,
461 "Shutting down engine while there are still pending data "
462 "read for shard %d from database storage", i);
// NOTE(review): the stop call that follows this message is not visible here.
464 LOG(EXTENSION_LOG_INFO, "Stopping bg fetcher for underlying storage");
// Looks up vbucket `vbid` and compares its state with `wanted_state`.
// A missing vbucket is treated as being in vbucket_state_dead for that
// comparison. The values returned on match/mismatch are on lines not
// visible in this excerpt.
469 RCPtr<VBucket> EventuallyPersistentStore::getVBucket(uint16_t vbid,
470 vbucket_state_t wanted_state) {
471 RCPtr<VBucket> vb = vbMap.getBucket(vbid);
472 vbucket_state_t found_state(vb ? vb->getState() : vbucket_state_dead);
473 if (found_state == wanted_state) {
// Deletes the expired item `key` from vbucket `vbid`, but only while the
// vbucket is active. startTime and revSeqno (declared on lines not visible
// here) are used for the expiry check and for the temp-item path.
482 EventuallyPersistentStore::deleteExpiredItem(uint16_t vbid, std::string &key,
485 RCPtr<VBucket> vb = getVBucket(vbid);
487 // Obtain reader access to the VB state change lock so that
488 // the VB can't switch state whilst we're processing
489 ReaderLockHolder rlh(vb->getStateLock());
490 if (vb->getState() == vbucket_state_active) {
492 incExpirationStat(vb);
// Lock the hash bucket for the key, then look the key up.
493 LockHolder lh = vb->ht.getLockedBucket(key, &bucket_num);
494 StoredValue *v = vb->ht.unlocked_find(key, bucket_num, true, false);
496 if (v->isTempNonExistentItem() || v->isTempDeletedItem()) {
497 // This is a temporary item whose background fetch for metadata
499 bool deleted = vb->ht.unlocked_del(key, bucket_num);
// A real item that is expired as of startTime and not already deleted is
// soft-deleted and queued.
501 } else if (v->isExpired(startTime) && !v->isDeleted()) {
502 vb->ht.unlocked_softDelete(v, 0, getItemEvictionPolicy());
503 queueDirty(vb, v, &lh, false);
// NOTE(review): presumably reached when the key is not in the hash table;
// the enclosing branch is not visible in this excerpt.
506 if (eviction_policy == FULL_EVICTION) {
507 // Create a temp item and delete and push it
508 // into the checkpoint queue.
509 add_type_t rv = vb->ht.unlocked_addTempItem(bucket_num, key,
511 if (rv == ADD_NOMEM) {
// Re-find the temp item just added, mark it as a deleted key carrying the
// supplied revSeqno, then soft-delete and queue it.
514 v = vb->ht.unlocked_find(key, bucket_num, true, false);
515 v->setStoredValueState(StoredValue::state_deleted_key);
516 v->setRevSeqno(revSeqno);
517 vb->ht.unlocked_softDelete(v, 0, eviction_policy);
518 queueDirty(vb, v, &lh, false);
// Calls deleteExpiredItem() for every (vbucket id, key) pair in `keys`.
// All calls share one start time taken from ep_real_time() before the loop,
// and pass 0 as the last argument.
526 EventuallyPersistentStore::deleteExpiredItems(std::list<std::pair<uint16_t,
527 std::string> > &keys) {
528 std::list<std::pair<uint16_t, std::string> >::iterator it;
529 time_t startTime = ep_real_time();
530 for (it = keys.begin(); it != keys.end(); it++) {
531 deleteExpiredItem(it->first, it->second, startTime, 0);
// Finds `key` in the vbucket's hash table and handles expiry for live items.
// The find call receives wantDeleted; the caller appears to already hold the
// hash-bucket lock, since only unlocked_* hash-table calls are used here
// (TODO confirm).
535 StoredValue *EventuallyPersistentStore::fetchValidValue(RCPtr<VBucket> &vb,
536 const std::string &key,
541 StoredValue *v = vb->ht.unlocked_find(key, bucket_num, wantDeleted,
// Expiry is only checked for items that are neither deleted nor temporary.
543 if (v && !v->isDeleted() && !v->isTempItem()) {
544 // In the deleted case, we ignore expiration time.
545 if (v->isExpired(ep_real_time())) {
// Non-active vbuckets do not delete the expired item: it is returned only
// when wantDeleted is set.
546 if (vb->getState() != vbucket_state_active) {
547 return wantDeleted ? v : NULL;
// Active vbucket: count the expiration, soft-delete the item and queue it.
// NOTE(review): a guard around these three calls may sit on a line not
// visible here.
550 incExpirationStat(vb, false);
551 vb->ht.unlocked_softDelete(v, 0, eviction_policy);
552 queueDirty(vb, v, NULL, false, true);
554 return wantDeleted ? v : NULL;
// Attempts to eject the value for `key` from memory and reports the outcome
// through a status code and a message written to *msg.
// Returns NOT_MY_VBUCKET when the vbucket is missing, or is not active and
// `force` is not set.
560 protocol_binary_response_status EventuallyPersistentStore::evictKey(
561 const std::string &key,
566 RCPtr<VBucket> vb = getVBucket(vbucket);
567 if (!vb || (vb->getState() != vbucket_state_active && !force)) {
568 return PROTOCOL_BINARY_RESPONSE_NOT_MY_VBUCKET;
// Lock the key's hash bucket and fetch the item (force is passed as the
// wantDeleted argument).
572 LockHolder lh = vb->ht.getLockedBucket(key, &bucket_num);
573 StoredValue *v = fetchValidValue(vb, key, bucket_num, force, false);
575 protocol_binary_response_status rv(PROTOCOL_BINARY_RESPONSE_SUCCESS);
// Resident item: try to eject it; a failed eject is reported as
// "Can't eject: Dirty object." with KEY_EEXISTS (the success branch is not
// visible in this excerpt).
582 if (v->isResident()) {
583 if (vb->ht.unlocked_ejectItem(v, eviction_policy)) {
586 *msg = "Can't eject: Dirty object.";
587 rv = PROTOCOL_BINARY_RESPONSE_KEY_EEXISTS;
590 *msg = "Already ejected.";
// NOTE(review): presumably the "item not found" path; under VALUE_ONLY it
// yields KEY_ENOENT, otherwise "Already ejected." -- branch introducers are
// not visible here.
593 if (eviction_policy == VALUE_ONLY) {
595 rv = PROTOCOL_BINARY_RESPONSE_KEY_ENOENT;
597 *msg = "Already ejected.";
// Adds a temporary item for `key` to the vbucket's hash table and schedules
// a background fetch for it, returning ENGINE_EWOULDBLOCK on that path.
// ENGINE_ENOMEM is returned on one outcome of the add (the case label is not
// visible; presumably ADD_NOMEM).
604 ENGINE_ERROR_CODE EventuallyPersistentStore::addTempItemForBgFetch(
607 const std::string &key,
611 bool isReplication) {
613 add_type_t rv = vb->ht.unlocked_addTempItem(bucket_num, key,
618 return ENGINE_ENOMEM;
622 case ADD_TMP_AND_BG_FETCH:
623 // Since the hashtable bucket is locked, we shouldn't get here
// Schedule the background fetch, passing the caller's cookie and the
// metadataOnly flag.
627 bgFetch(key, vb->getId(), -1, cookie, metadataOnly);
629 return ENGINE_EWOULDBLOCK;
// Stores `itm` in its vbucket, subject to the vbucket's state.
// Returns an ENGINE_ERROR_CODE derived from the hash-table set result; the
// visible outcomes are SUCCESS, KEY_EEXISTS, KEY_ENOENT, EWOULDBLOCK and
// NOT_MY_VBUCKET.
632 ENGINE_ERROR_CODE EventuallyPersistentStore::set(const Item &itm,
637 RCPtr<VBucket> vb = getVBucket(itm.getVBucketId());
// State gate: missing/dead vbuckets always reject; replica vbuckets reject
// unless `force`; pending vbuckets park the operation (EWOULDBLOCK) when
// addPendingOp() accepts the cookie, unless `force`.
638 if (!vb || vb->getState() == vbucket_state_dead) {
639 ++stats.numNotMyVBuckets;
640 return ENGINE_NOT_MY_VBUCKET;
641 } else if (vb->getState() == vbucket_state_replica && !force) {
642 ++stats.numNotMyVBuckets;
643 return ENGINE_NOT_MY_VBUCKET;
644 } else if (vb->getState() == vbucket_state_pending && !force) {
645 if (vb->addPendingOp(cookie)) {
646 return ENGINE_EWOULDBLOCK;
// A non-zero CAS on the item marks this as a CAS operation.
650 bool cas_op = (itm.getCas() != 0);
652 LockHolder lh = vb->ht.getLockedBucket(itm.getKey(), &bucket_num);
653 StoredValue *v = vb->ht.unlocked_find(itm.getKey(), bucket_num, true,
// Special handling for a locked item on a replica or pending vbucket
// (the action taken is on a line not visible here).
655 if (v && v->isLocked(ep_current_time()) &&
656 (vb->getState() == vbucket_state_replica ||
657 vb->getState() == vbucket_state_pending)) {
660 mutation_type_t mtype = vb->ht.unlocked_set(v, itm, itm.getCas(),
662 eviction_policy, nru);
// Translate the mutation result into an engine status. Most case labels are
// not visible in this excerpt.
664 ENGINE_ERROR_CODE ret = ENGINE_SUCCESS;
671 ret = ENGINE_KEY_EEXISTS;
675 ret = ENGINE_KEY_ENOENT;
680 // Even if the item was dirty, push it into the vbucket's open
683 queueDirty(vb, v, &lh);
686 { // CAS operation with non-resident item + full eviction.
688 // temp item is already created. Simply schedule a bg fetch job
690 bgFetch(itm.getKey(), vb->getId(), -1, cookie, true);
691 return ENGINE_EWOULDBLOCK;
693 ret = addTempItemForBgFetch(lh, bucket_num, itm.getKey(), vb,
697 case INVALID_VBUCKET:
698 ret = ENGINE_NOT_MY_VBUCKET;
// Adds `itm` to its vbucket. Dead and replica vbuckets reject the operation;
// pending vbuckets park it when addPendingOp() accepts the cookie.
705 ENGINE_ERROR_CODE EventuallyPersistentStore::add(const Item &itm,
708 RCPtr<VBucket> vb = getVBucket(itm.getVBucketId());
709 if (!vb || vb->getState() == vbucket_state_dead ||
710 vb->getState() == vbucket_state_replica) {
711 ++stats.numNotMyVBuckets;
712 return ENGINE_NOT_MY_VBUCKET;
713 } else if(vb->getState() == vbucket_state_pending) {
714 if (vb->addPendingOp(cookie)) {
715 return ENGINE_EWOULDBLOCK;
719 if (itm.getCas() != 0) {
720 // Adding with a cas value doesn't make sense..
721 return ENGINE_NOT_STORED;
// Lock the key's hash bucket, look the key up, then attempt the add.
725 LockHolder lh = vb->ht.getLockedBucket(itm.getKey(), &bucket_num);
726 StoredValue *v = vb->ht.unlocked_find(itm.getKey(), bucket_num, true,
728 add_type_t atype = vb->ht.unlocked_add(bucket_num, v, itm,
// Translate the add result into an engine status (several case labels are
// not visible in this excerpt).
733 return ENGINE_ENOMEM;
735 return ENGINE_NOT_STORED;
736 case ADD_TMP_AND_BG_FETCH:
737 return addTempItemForBgFetch(lh, bucket_num, itm.getKey(), vb,
741 bgFetch(itm.getKey(), vb->getId(), -1, cookie, true);
742 return ENGINE_EWOULDBLOCK;
// NOTE(review): presumably the successful-add outcomes; the item is queued.
745 queueDirty(vb, v, &lh);
748 return ENGINE_SUCCESS;
// Replaces the value of an existing key with `itm`. Dead and replica
// vbuckets reject the operation; pending vbuckets park it when
// addPendingOp() accepts the cookie.
751 ENGINE_ERROR_CODE EventuallyPersistentStore::replace(const Item &itm,
752 const void *cookie) {
753 RCPtr<VBucket> vb = getVBucket(itm.getVBucketId());
754 if (!vb || vb->getState() == vbucket_state_dead ||
755 vb->getState() == vbucket_state_replica) {
756 ++stats.numNotMyVBuckets;
757 return ENGINE_NOT_MY_VBUCKET;
758 } else if (vb->getState() == vbucket_state_pending) {
759 if (vb->addPendingOp(cookie)) {
760 return ENGINE_EWOULDBLOCK;
765 LockHolder lh = vb->ht.getLockedBucket(itm.getKey(), &bucket_num);
766 StoredValue *v = vb->ht.unlocked_find(itm.getKey(), bucket_num, true,
// Deleted items and temp items marked deleted/non-existent cannot be
// replaced.
769 if (v->isDeleted() || v->isTempDeletedItem() ||
770 v->isTempNonExistentItem()) {
771 return ENGINE_KEY_ENOENT;
// Under full eviction, a temp-initial item means the real item still has to
// be fetched, so the set is skipped and a bg fetch is requested.
774 mutation_type_t mtype;
775 if (eviction_policy == FULL_EVICTION && v->isTempInitialItem()) {
776 mtype = NEED_BG_FETCH;
778 mtype = vb->ht.unlocked_set(v, itm, 0, true, false, eviction_policy,
// Translate the mutation result into an engine status (most case labels are
// not visible in this excerpt).
782 ENGINE_ERROR_CODE ret = ENGINE_SUCCESS;
788 ret = ENGINE_KEY_EEXISTS;
792 ret = ENGINE_NOT_STORED;
796 // Even if the item was dirty, push it into the vbucket's open
799 queueDirty(vb, v, &lh);
803 // temp item is already created. Simply schedule a bg fetch job
805 bgFetch(itm.getKey(), vb->getId(), -1, cookie, true);
806 ret = ENGINE_EWOULDBLOCK;
809 case INVALID_VBUCKET:
810 ret = ENGINE_NOT_MY_VBUCKET;
// NOTE(review): presumably the "key not in hash table" path: value-only
// eviction reports ENOENT, otherwise a temp item is added and a bg fetch is
// scheduled.
816 if (eviction_policy == VALUE_ONLY) {
817 return ENGINE_KEY_ENOENT;
820 return addTempItemForBgFetch(lh, bucket_num, itm.getKey(), vb,
// Stores a backfill item into a non-active vbucket. Dead and active
// vbuckets are rejected with NOT_MY_VBUCKET.
825 ENGINE_ERROR_CODE EventuallyPersistentStore::addTAPBackfillItem(const Item &itm,
829 RCPtr<VBucket> vb = getVBucket(itm.getVBucketId());
831 vb->getState() == vbucket_state_dead ||
832 vb->getState() == vbucket_state_active) {
833 ++stats.numNotMyVBuckets;
834 return ENGINE_NOT_MY_VBUCKET;
838 LockHolder lh = vb->ht.getLockedBucket(itm.getKey(), &bucket_num);
839 StoredValue *v = vb->ht.unlocked_find(itm.getKey(), bucket_num, true,
842 // Note that this function is only called on replica or pending vbuckets.
843 if (v && v->isLocked(ep_current_time())) {
// The set is done with a CAS argument of 0.
846 mutation_type_t mtype = vb->ht.unlocked_set(v, itm, 0, true, true,
847 eviction_policy, nru);
// Translate the mutation result into an engine status (most case labels are
// not visible in this excerpt).
849 ENGINE_ERROR_CODE ret = ENGINE_SUCCESS;
856 ret = ENGINE_KEY_EEXISTS;
859 // FALLTHROUGH, to ensure the bySeqno for the hashTable item is
860 // set correctly, and also the sequence numbers are ordered correctly.
// Queue the item, forwarding the caller's genBySeqno flag.
865 queueDirty(vb, v, &lh, true, true, genBySeqno);
867 case INVALID_VBUCKET:
868 ret = ENGINE_NOT_MY_VBUCKET;
871 // SET on a non-active vbucket should not require a bg_metadata_fetch.
// Callback that copies file-size statistics from a kvstats_ctx onto the
// vbucket named by ctx.vbucket.
878 class KVStatsCallback : public Callback<kvstats_ctx> {
// store - store used to look up the vbucket when the callback fires.
880 KVStatsCallback(EventuallyPersistentStore *store)
// Looks up ctx.vbucket in the store and records fileSpaceUsed and fileSize
// on it (a null check may sit on the line not visible between these).
883 void callback(kvstats_ctx &ctx) {
884 RCPtr<VBucket> vb = epstore->getVBucket(ctx.vbucket);
886 vb->fileSpaceUsed = ctx.fileSpaceUsed;
887 vb->fileSize = ctx.fileSize;
892 EventuallyPersistentStore *epstore;
// Persists the state of the vbuckets belonging to one shard by collecting
// a vbucket_state for each and handing it to the shard's read-write KVStore.
895 void EventuallyPersistentStore::snapshotVBuckets(const Priority &priority,
// Local visitor that records a vbucket_state for each visited vbucket whose
// shard id equals the one given at construction.
898 class VBucketStateVisitor : public VBucketVisitor {
900 VBucketStateVisitor(VBucketMap &vb_map, uint16_t sid)
901 : vbuckets(vb_map), shardId(sid) { }
902 bool visitBucket(RCPtr<VBucket> &vb) {
903 if (vbuckets.getShard(vb->getId())->getId() == shardId) {
904 uint64_t snapStart = 0;
905 uint64_t snapEnd = 0;
// Failover table serialised as JSON, plus the persisted checkpoint id.
906 std::string failovers = vb->failovers->toJSON();
907 uint64_t chkId = vbuckets.getPersistenceCheckpointId(vb->getId());
909 vb->getCurrentSnapshot(snapStart, snapEnd);
910 vbucket_state vb_state(vb->getState(), chkId, 0,
911 vb->getHighSeqno(), vb->getPurgeSeqno(),
912 snapStart, snapEnd, failovers);
913 states.insert(std::pair<uint16_t, vbucket_state>(vb->getId(), vb_state));
// Per-item visiting is not expected for this visitor.
918 void visit(StoredValue*) {
919 cb_assert(false); // this does not happen
// Collected states, keyed by vbucket id.
922 std::map<uint16_t, vbucket_state> states;
925 VBucketMap &vbuckets;
// Clear the shard's "snapshot scheduled" flag for the priority being served.
929 KVShard *shard = vbMap.shards[shardId];
930 if (priority == Priority::VBucketPersistLowPriority) {
931 shard->setLowPriorityVbSnapshotFlag(false);
933 shard->setHighPriorityVbSnapshotFlag(false);
936 KVStatsCallback kvcb(this);
937 VBucketStateVisitor v(vbMap, shard->getId());
939 hrtime_t start = gethrtime();
// Walk the collected states in reverse key order, try-locking each
// vbucket's mutex before persisting its state.
942 vbucket_map_t::reverse_iterator iter = v.states.rbegin();
943 for (; iter != v.states.rend(); ++iter) {
944 LockHolder lh(vb_mutexes[iter->first], true /*tryLock*/);
945 if (!lh.islocked()) {
948 KVStore *rwUnderlying = getRWUnderlying(iter->first);
949 if (!rwUnderlying->snapshotVBucket(iter->first, iter->second,
951 LOG(EXTENSION_LOG_WARNING,
952 "VBucket snapshot task failed!!! Rescheduling");
// For high-priority snapshots, clear the vbucket's creation flag and log
// when setBucketCreation() reports true.
957 if (priority == Priority::VBucketPersistHighPriority) {
958 if (vbMap.setBucketCreation(iter->first, false)) {
959 LOG(EXTENSION_LOG_INFO, "VBucket %d created", iter->first);
// Schedule another snapshot for this shard (the condition guarding this
// call is not visible in this excerpt).
965 scheduleVBSnapshot(priority, shard->getId());
// Record the elapsed time, divided by 1000, in the snapshot histogram.
967 stats.snapshotVbucketHisto.add((gethrtime() - start) / 1000);
// Persists the state of a single vbucket `vbid` through its read-write
// KVStore. Returns bool (the return statements are not visible here;
// presumably true asks the caller to run the task again -- confirm).
971 bool EventuallyPersistentStore::persistVBState(const Priority &priority,
// Clear the "persist scheduled" flag first, so later requests can schedule
// a new task.
973 schedule_vbstate_persist[vbid] = false;
975 RCPtr<VBucket> vb = getVBucket(vbid);
977 LOG(EXTENSION_LOG_WARNING,
978 "VBucket %d not exist!!! vb_state persistence task failed!!!", vbid);
// Try to take the vbucket mutex without blocking. If it is busy, attempt to
// flip the scheduled flag from false back to true.
982 bool inverse = false;
983 LockHolder lh(vb_mutexes[vbid], true /*tryLock*/);
984 if (!lh.islocked()) {
985 if (schedule_vbstate_persist[vbid].compare_exchange_strong(inverse,
// Assemble the state to persist: current state, persisted checkpoint id,
// high/purge seqnos, current snapshot range and the failover table as JSON.
993 KVStatsCallback kvcb(this);
994 uint64_t chkId = vbMap.getPersistenceCheckpointId(vbid);
995 std::string failovers = vb->failovers->toJSON();
996 uint64_t snapStart = 0;
997 uint64_t snapEnd = 0;
999 vb->getCurrentSnapshot(snapStart, snapEnd);
1000 vbucket_state vb_state(vb->getState(), chkId, 0, vb->getHighSeqno(),
1001 vb->getPurgeSeqno(), snapStart, snapEnd, failovers);
1003 KVStore *rwUnderlying = getRWUnderlying(vbid);
// On success, clear the creation flag and log when setBucketCreation()
// reports true.
1004 if (rwUnderlying->snapshotVBucket(vbid, vb_state, &kvcb)) {
1005 if (vbMap.setBucketCreation(vbid, false)) {
1006 LOG(EXTENSION_LOG_INFO, "VBucket %d created", vbid);
// On failure, log and attempt to flip the scheduled flag back to true.
1009 LOG(EXTENSION_LOG_WARNING,
1010 "VBucket %d: vb_state persistence task failed!!! Rescheduling", vbid);
1012 if (schedule_vbstate_persist[vbid].compare_exchange_strong(inverse,
// Moves vbucket `vbid` to state `to`. The visible code both updates an
// existing vbucket and builds a new one (presumably the latter when none
// exists -- the branch introducer is not visible here).
// Returns ENGINE_SUCCESS, or ENGINE_ERANGE when the new vbucket cannot be
// added to the map.
1022 ENGINE_ERROR_CODE EventuallyPersistentStore::setVBucketState(uint16_t vbid,
1026 // Lock to prevent a race condition between a failed update and add.
1027 LockHolder lh(vbsetMutex);
1028 RCPtr<VBucket> vb = vbMap.getBucket(vbid);
// Nothing to do when the vbucket is already in the requested state.
1029 if (vb && to == vb->getState()) {
1030 return ENGINE_SUCCESS;
// Tell the DCP connection map about the change when notify_dcp is set.
1034 vbucket_state_t oldstate = vb->getState();
1035 if (oldstate != to && notify_dcp) {
1036 engine.getDcpConnMap().vbucketStateChanged(vbid, to);
1039 vb->setState(to, engine.getServerApi());
// Becoming active outside a transfer creates a failover entry: at snapEnd
// when that equals the persisted seqno, otherwise at snapStart.
1040 if (to == vbucket_state_active && !transfer) {
1041 uint64_t snapStart = 0;
1042 uint64_t snapEnd = 0;
1043 vb->getCurrentSnapshot(snapStart, snapEnd);
1044 if (snapEnd == vbMap.getPersistenceSeqno(vbid)) {
1045 vb->failovers->createEntry(snapEnd);
1047 vb->failovers->createEntry(snapStart);
// pending -> active: schedule a task that fires the parked operations.
1052 if (oldstate == vbucket_state_pending &&
1053 to == vbucket_state_active) {
1054 ExTask notifyTask = new PendingOpsNotification(engine, vb);
1055 ExecutorPool::get()->schedule(notifyTask, NONIO_TASK_IDX);
1057 scheduleVBStatePersist(Priority::VBucketPersistLowPriority, vbid);
// Build a new vbucket with a fresh failover table.
1059 FailoverTable* ft = new FailoverTable(engine.getMaxFailoverEntries());
1060 RCPtr<VBucket> newvb(new VBucket(vbid, to, stats,
1061 engine.getCheckpointConfig(),
1062 vbMap.getShard(vbid), 0, 0, 0, ft));
1063 // The first checkpoint for active vbucket should start with id 2.
1064 uint64_t start_chk_id = (to == vbucket_state_active) ? 2 : 0;
1065 newvb->checkpointManager.setOpenCheckpointId(start_chk_id);
1066 if (vbMap.addBucket(newvb) == ENGINE_ERANGE) {
1068 return ENGINE_ERANGE;
// Reset persistence bookkeeping and mark the vbucket as being created, then
// persist its state at high priority.
1070 vbMap.setPersistenceCheckpointId(vbid, 0);
1071 vbMap.setPersistenceSeqno(vbid, 0);
1072 vbMap.setBucketCreation(vbid, true);
1074 scheduleVBStatePersist(Priority::VBucketPersistHighPriority, vbid);
1076 return ENGINE_SUCCESS;
// Schedules a VBSnapshotTask on the writer task group for every shard whose
// snapshot flag for priority `p` is newly set (the flag setter returning
// true). High and low priority use separate per-shard flags.
// Returns bool; the return statements are not visible in this excerpt.
1079 bool EventuallyPersistentStore::scheduleVBSnapshot(const Priority &p) {
1080 KVShard *shard = NULL;
1081 if (p == Priority::VBucketPersistHighPriority) {
1082 for (size_t i = 0; i < vbMap.numShards; ++i) {
1083 shard = vbMap.shards[i];
1084 if (shard->setHighPriorityVbSnapshotFlag(true)) {
1085 ExTask task = new VBSnapshotTask(&engine, p, i, false);
1086 ExecutorPool::get()->schedule(task, WRITER_TASK_IDX);
// Same loop for the low-priority flag (presumably the else branch).
1090 for (size_t i = 0; i < vbMap.numShards; ++i) {
1091 shard = vbMap.shards[i];
1092 if (shard->setLowPriorityVbSnapshotFlag(true)) {
1093 ExTask task = new VBSnapshotTask(&engine, p, i, false);
1094 ExecutorPool::get()->schedule(task, WRITER_TASK_IDX);
// The result depends on whether the engine is shutting down (the values
// returned are not visible here).
1098 if (stats.isShutdown) {
// Single-shard variant: schedules a VBSnapshotTask for shard `shardId` when
// `force` is set or the shard's flag for priority `p` is newly set.
1104 void EventuallyPersistentStore::scheduleVBSnapshot(const Priority &p,
1107 KVShard *shard = vbMap.shards[shardId];
1108 if (p == Priority::VBucketPersistHighPriority) {
// `force` short-circuits the ||, so the flag setter is not called at all
// when force is true.
1109 if (force || shard->setHighPriorityVbSnapshotFlag(true)) {
1110 ExTask task = new VBSnapshotTask(&engine, p, shardId, false);
1111 ExecutorPool::get()->schedule(task, WRITER_TASK_IDX);
1114 if (force || shard->setLowPriorityVbSnapshotFlag(true)) {
1115 ExTask task = new VBSnapshotTask(&engine, p, shardId, false);
1116 ExecutorPool::get()->schedule(task, WRITER_TASK_IDX);
// Schedules a VBStatePersistTask for `vbid` on the writer task group.
// The per-vbucket flag is flipped from false to true with a compare-and-swap
// so that only the caller that wins the swap schedules a task
// (an additional condition on the line not visible here may bypass it).
1121 void EventuallyPersistentStore::scheduleVBStatePersist(const Priority &priority,
1124 bool inverse = false;
1126 schedule_vbstate_persist[vbid].compare_exchange_strong(inverse, true)) {
1127 ExTask task = new VBStatePersistTask(&engine, priority, vbid, false);
1128 ExecutorPool::get()->schedule(task, WRITER_TASK_IDX);
// Deletes the on-disk data of vbucket `vbid`, records timing statistics and
// notifies the waiting connection identified by `cookie`.
// Returns bool; the return statements are not visible in this excerpt.
1132 bool EventuallyPersistentStore::completeVBucketDeletion(uint16_t vbid,
1133 const void* cookie) {
1134 LockHolder lh(vbsetMutex);
1136 hrtime_t start_time(gethrtime());
1137 RCPtr<VBucket> vb = vbMap.getBucket(vbid);
// The disk deletion runs only when the vbucket is absent, dead, or flagged
// for deletion.
1138 if (!vb || vb->getState() == vbucket_state_dead ||
1139 vbMap.isBucketDeletion(vbid)) {
// Hold the per-vbucket mutex while the KVStore deletes the vbucket, then
// clear the deletion flag and persisted seqno.
1141 LockHolder vlh(vb_mutexes[vbid]);
1142 getRWUnderlying(vbid)->delVBucket(vbid);
1143 vbMap.setBucketDeletion(vbid, false);
1144 vbMap.setPersistenceSeqno(vbid, 0);
1145 ++stats.vbucketDeletions;
// Timing: `spent` is the raw gethrtime() delta; wall_time is that / 1000.
1148 hrtime_t spent(gethrtime() - start_time);
1149 hrtime_t wall_time = spent / 1000;
1150 BlockTimer::log(spent, "disk_vb_del", stats.timingLog);
1151 stats.diskVBDelHisto.add(wall_time);
1152 atomic_setIfBigger(stats.vbucketDelMaxWalltime, wall_time);
1153 stats.vbucketDelTotWalltime.fetch_add(wall_time);
// Wake the connection waiting on this deletion (a guard on the cookie may
// sit on the line not visible here).
1155 engine.notifyIOComplete(cookie, ENGINE_SUCCESS);
// Schedules removal of vbucket `vb`: a memory-deletion task on the non-IO
// group, plus a VBDeleteTask on the writer group when the vbucket's
// deletion flag is newly set.
1161 void EventuallyPersistentStore::scheduleVBDeletion(RCPtr<VBucket> &vb,
1164 ExTask delTask = new VBucketMemoryDeletionTask(engine, vb, delay);
1165 ExecutorPool::get()->schedule(delTask, NONIO_TASK_IDX);
// setBucketDeletion() returning true gates the disk task, which receives
// the caller's cookie.
1167 if (vbMap.setBucketDeletion(vb->getId(), true)) {
1168 ExTask task = new VBDeleteTask(&engine, vb->getId(), cookie,
1169 Priority::VBucketDeletionPriority);
1170 ExecutorPool::get()->schedule(task, WRITER_TASK_IDX);
// Removes vbucket `vbid` from the vbucket map and schedules its deletion.
// Visible return values: NOT_MY_VBUCKET (presumably when the vbucket does
// not exist), EWOULDBLOCK and SUCCESS (presumably depending on whether a
// cookie `c` was supplied -- conditions are not visible here).
1174 ENGINE_ERROR_CODE EventuallyPersistentStore::deleteVBucket(uint16_t vbid,
1176 // Lock to prevent a race condition between a failed update and add
1178 LockHolder lh(vbsetMutex);
1180 RCPtr<VBucket> vb = vbMap.getBucket(vbid);
1182 return ENGINE_NOT_MY_VBUCKET;
// Report the vbucket as dead to DCP connections before removing it from
// the map.
1185 engine.getDcpConnMap().vbucketStateChanged(vbid, vbucket_state_dead);
1186 vbMap.removeBucket(vbid);
1188 scheduleVBDeletion(vb, c);
1190 return ENGINE_EWOULDBLOCK;
1192 return ENGINE_SUCCESS;
// Creates a compaction task for vbucket `vbid` and records it in
// compactionTasks. Returns NOT_MY_VBUCKET (presumably when the vbucket does
// not exist) or EWOULDBLOCK once the task has been set up.
1195 ENGINE_ERROR_CODE EventuallyPersistentStore::compactDB(uint16_t vbid,
1197 const void *cookie) {
1198 RCPtr<VBucket> vb = vbMap.getBucket(vbid);
1200 return ENGINE_NOT_MY_VBUCKET;
// compactionLock guards the compactionTasks list.
1203 LockHolder lh(compactionLock);
1204 ExTask task = new CompactVBucketTask(&engine, Priority::CompactorPriority,
1206 compactionTasks.push_back(std::make_pair(vbid, task));
// With other compactions already queued, throttle the new one when the disk
// queue exceeds the write-queue cap and more than half as many compactions
// as shards are queued, or when the workload is read-heavy.
1207 if (compactionTasks.size() > 1) {
1208 if ((stats.diskQueueSize > compactionWriteQueueCap &&
1209 compactionTasks.size() > (vbMap.getNumShards() / 2)) ||
1210 engine.getWorkLoadPolicy().getWorkLoadPattern() == READ_HEAVY) {
1211 // Snooze a new compaction task.
1212 // We will wake it up when one of the existing compaction tasks is done.
1217 ExecutorPool::get()->schedule(task, WRITER_TASK_IDX);
// NOTE(review): the adjacent string literals below join as
// "...vbucket %d,purge_before_ts..." with no space after the comma.
1219 LOG(EXTENSION_LOG_DEBUG, "Scheduled compaction task %d on vbucket %d,"
1220 "purge_before_ts = %lld, purge_before_seq = %lld, dropdeletes = %d",
1221 task->getId(), vbid, c.purge_before_ts,
1222 c.purge_before_seq, c.drop_deletes);
1224 return ENGINE_EWOULDBLOCK;
// Compaction callback that asks the store to delete each expired item
// reported in a compaction_ctx.
1227 class ExpiredItemsCallback : public Callback<compaction_ctx> {
// store - store that performs the deletions
// vbid  - vbucket the expired items belong to
1229 ExpiredItemsCallback(EventuallyPersistentStore *store, uint16_t vbid)
1230 : epstore(store), vbucket(vbid) { }
// Walks ctx.expiredItems; compactionCanExpireItems() is re-checked for
// every item before deleteExpiredItem() is called with that item's key.
1232 void callback(compaction_ctx &ctx) {
1233 std::list<expiredItemCtx>::iterator it;
1234 for (it = ctx.expiredItems.begin();
1235 it != ctx.expiredItems.end(); it++) {
1236 if (epstore->compactionCanExpireItems()) {
1237 epstore->deleteExpiredItem(vbucket, it->keyStr,
1245 EventuallyPersistentStore *epstore;
// Runs compaction for `vbid` on the read-write underlying store.
// The per-vbucket mutex is acquired with a try-lock; if it cannot be taken the
// function returns true so the compaction task is scheduled again.
// Afterwards this vbucket's entry is erased from compactionTasks, a snoozed
// compaction task is woken, the cookie is notified with the resulting error
// code and stats.pendingCompactions is decremented.
1249 bool EventuallyPersistentStore::compactVBucket(const uint16_t vbid,
1250 compaction_ctx *ctx,
1251 const void *cookie) {
1252 ENGINE_ERROR_CODE err = ENGINE_SUCCESS;
1253 RCPtr<VBucket> vb = vbMap.getBucket(vbid);
// Non-blocking attempt to take the vbucket mutex.
1255 LockHolder lh(vb_mutexes[vbid], true /*tryLock*/);
1256 if (!lh.islocked()) {
1257 return true; // Schedule a compaction task again.
1260 if (vb->getState() == vbucket_state_active) {
1261 // Set the current time ONLY for active vbuckets.
1262 ctx->curr_time = ep_real_time();
// cb handles expired items; kvcb is a stats callback passed to the kvstore.
1266 ExpiredItemsCallback cb(this, vbid);
1267 KVStatsCallback kvcb(this);
1268 getRWUnderlying(vbid)->compactVBucket(vbid, ctx, cb, kvcb);
// Record the highest purged sequence number reported by the compaction.
1269 vb->setPurgeSeqno(ctx->max_purged_seq);
// NOTE(review): presumably this is the branch taken when the vbucket lookup
// returned nothing -- confirm.
1271 err = ENGINE_NOT_MY_VBUCKET;
1272 engine.storeEngineSpecific(cookie, NULL);
1273 //Decrement session counter here, as memcached thread wouldn't
1274 //visit the engine interface in case of a NOT_MY_VB notification
1275 engine.decrementSessionCtr();
// Bookkeeping on the shared task list happens under compactionLock.
1279 LockHolder lh(compactionLock);
1280 bool erased = false, woke = false;
1281 std::list<CompTaskEntry>::iterator it = compactionTasks.begin();
1282 while (it != compactionTasks.end()) {
1283 if ((*it).first == vbid) {
// erase() returns the iterator following the removed entry.
1284 it = compactionTasks.erase(it);
1287 ExTask &task = (*it).second;
1288 if (task->getState() == TASK_SNOOZED) {
1289 ExecutorPool::get()->wake(task->getId());
1294 if (erased && woke) {
// Unblock the connection that requested the compaction.
1301 engine.notifyIOComplete(cookie, err);
1303 --stats.pendingCompactions;
// Resets vbucket `vbid` while holding vbsetMutex: the current vbucket is
// removed from vbMap and scheduled for deletion, setVBucketState is called
// with the state the old vbucket had, and the TAP cursor names of the old
// vbucket are applied to the vbucket that is then found in vbMap.
1307 bool EventuallyPersistentStore::resetVBucket(uint16_t vbid) {
1308 LockHolder lh(vbsetMutex);
1311 RCPtr<VBucket> vb = vbMap.getBucket(vbid);
// Remember the state so it can be applied again after the reset.
1313 vbucket_state_t vbstate = vb->getState();
1315 vbMap.removeBucket(vbid);
// Capture the cursor names before the old vbucket goes away.
1318 std::list<std::string> tap_cursors = vb->checkpointManager.
1319 getTAPCursorNames();
1320 // Delete and recreate the vbucket database file
1321 scheduleVBDeletion(vb, NULL, 0);
1322 setVBucketState(vbid, vbstate, false);
1324 // Copy all the cursors from the old vbucket into the new vbucket
1325 RCPtr<VBucket> newvb = vbMap.getBucket(vbid);
1326 newvb->checkpointManager.resetTAPCursors(tap_cursors);
// Engine that add_stat passes to ObjectRegistry::onSwitchThread.
1336 EventuallyPersistentEngine* engine;
// Stat name -> stat value pairs collected by add_stat.
1337 std::map<std::string, std::string> smap;
// Stat callback used by snapshotStats. `cookie` is treated as a pointer to a
// snapshot_stats_t; the key/value byte ranges are copied into std::strings and
// inserted into its map.
// @param key  stat name bytes (length klen)
// @param val  stat value bytes (length vlen)
// @param cookie  pointer to the snapshot_stats_t receiving the pair
1340 static void add_stat(const char *key, const uint16_t klen,
1341 const char *val, const uint32_t vlen,
1342 const void *cookie) {
// The cookie arrives const; cast it back to the mutable snapshot struct.
1344 void *ptr = const_cast<void *>(cookie);
1345 snapshot_stats_t* snap = static_cast<snapshot_stats_t*>(ptr);
1346 ObjectRegistry::onSwitchThread(snap->engine);
// Build strings from explicit lengths rather than relying on NUL terminators.
1348 std::string k(key, klen);
1349 std::string v(val, vlen);
// map::insert leaves an existing entry for the same key untouched.
1350 snap->smap.insert(std::pair<std::string, std::string>(k, v));
// Collects the engine's default, "tap" and "dcp" stat groups into a
// snapshot_stats_t through add_stat and passes the resulting map to one
// read-write underlying store. When every group succeeded and the store is
// shutting down, shutdown markers are added to the map.
1354 void EventuallyPersistentStore::snapshotStats() {
1355 snapshot_stats_t snap;
1356 snap.engine = &engine;
// NOTE(review): this local map is not referenced by any visible line (only
// snap.smap is used) -- looks unused, confirm before removing.
1357 std::map<std::string, std::string> smap;
// Short-circuits: a failing group stops collection of the following ones.
1358 bool rv = engine.getStats(&snap, NULL, 0, add_stat) == ENGINE_SUCCESS &&
1359 engine.getStats(&snap, "tap", 3, add_stat) == ENGINE_SUCCESS &&
1360 engine.getStats(&snap, "dcp", 3, add_stat) == ENGINE_SUCCESS;
1362 if (rv && stats.isShutdown) {
1363 snap.smap["ep_force_shutdown"] = stats.forceShutdown ?
// Record the time of shutdown as a decimal string.
1365 std::stringstream ss;
1366 ss << ep_real_time();
1367 snap.smap["ep_shutdown_time"] = ss.str();
1369 getOneRWUnderlying()->snapshotStats(snap.smap);
// Records timing statistics for one background fetch.
// @param init   time the fetch was requested
// @param start  time the fetch began executing
// @param stop   time the fetch finished
// Wait time is (start - init) and load time is (stop - start); both are
// divided by 1000 before being added to the histograms and counters, while
// BlockTimer::log receives the undivided difference.
// Nothing is recorded unless init <= start <= stop.
1372 void EventuallyPersistentStore::updateBGStats(const hrtime_t init,
1373 const hrtime_t start,
1374 const hrtime_t stop) {
1375 if (stop >= start && start >= init) {
1376 // skip the measurement if the counter wrapped...
1377 ++stats.bgNumOperations;
// Time spent queued before the fetch started.
1378 hrtime_t w = (start - init) / 1000;
1379 BlockTimer::log(start - init, "bgwait", stats.timingLog);
1380 stats.bgWaitHisto.add(w);
1381 stats.bgWait.fetch_add(w);
1382 atomic_setIfLess(stats.bgMinWait, w);
1383 atomic_setIfBigger(stats.bgMaxWait, w);
// Time spent performing the fetch itself.
1385 hrtime_t l = (stop - start) / 1000;
1386 BlockTimer::log(stop - start, "bgload", stats.timingLog);
1387 stats.bgLoadHisto.add(l);
1388 stats.bgLoad.fetch_add(l);
1389 atomic_setIfLess(stats.bgMinLoad, l);
1390 atomic_setIfBigger(stats.bgMaxLoad, l);
// Completes a single background fetch for `key`. The item is read from the
// read-only underlying store; then, holding vbsetMutex and the hash-table
// bucket lock, either the metadata or the value is restored into the
// StoredValue found for the key. Finally timing is recorded via
// updateBGStats, the fetched Item is deleted and the cookie is notified with
// the resulting status.
1394 void EventuallyPersistentStore::completeBGFetch(const std::string &key,
1400 hrtime_t start(gethrtime());
// Synchronous callback object that keeps the result of the kvstore get.
1402 RememberingCallback<GetValue> gcb;
1404 gcb.val.setPartial();
1405 ++stats.bg_meta_fetched;
1409 getROUnderlying(vbucket)->get(key, rowid, vbucket, gcb);
// The callback must have fired by the time get() returns.
1411 cb_assert(gcb.fired);
1412 ENGINE_ERROR_CODE status = gcb.val.getStatus();
1414 // Lock to prevent a race condition between a fetch for restore and delete
1415 LockHolder lh(vbsetMutex);
1417 RCPtr<VBucket> vb = getVBucket(vbucket);
1420 LockHolder hlh = vb->ht.getLockedBucket(key, &bucket_num);
1421 StoredValue *v = fetchValidValue(vb, key, bucket_num, true);
// Metadata-only path: success is reported when restoreMeta accepts the result.
1423 if (v && v->unlocked_restoreMeta(gcb.val.getValue(),
1424 gcb.val.getStatus(), vb->ht)) {
1425 status = ENGINE_SUCCESS;
// Value path: decide whether the fetched value still needs to be restored.
1428 bool restore = false;
// An already-resident value needs no restore.
1429 if (v && v->isResident()) {
1430 status = ENGINE_SUCCESS;
1432 switch (eviction_policy) {
1434 if (v && !v->isResident() && !v->isDeleted()) {
1440 if (v->isTempInitialItem() ||
1441 (!v->isResident() && !v->isDeleted())) {
1447 throw std::logic_error("Unknown eviction policy");
1452 if (gcb.val.getStatus() == ENGINE_SUCCESS) {
1453 v->unlocked_restoreValue(gcb.val.getValue(), vb->ht);
1454 cb_assert(v->isResident());
1455 if (vb->getState() == vbucket_state_active &&
1456 v->getExptime() != gcb.val.getValue()->getExptime() &&
1457 v->getCas() == gcb.val.getValue()->getCas()) {
1458 // MB-9306: It is possible that by the time bgfetcher
1459 // returns, the item may have been updated and queued
1460 // Hence test the CAS value to be the same first.
1461 // exptime mutated, schedule it into new checkpoint
1462 queueDirty(vb, v, &hlh);
1464 } else if (gcb.val.getStatus() == ENGINE_KEY_ENOENT) {
// The store has no such key: mark the in-memory item as non-existent.
1465 v->setStoredValueState(
1466 StoredValue::state_non_existent_key);
1467 if (eviction_policy == FULL_EVICTION) {
1468 // For the full eviction, we should notify
1469 // ENGINE_SUCCESS to the memcached worker thread, so
1470 // that the worker thread can visit the ep-engine and
1471 // figure out the correct error code.
1472 status = ENGINE_SUCCESS;
1475 // underlying kvstore couldn't fetch requested data
1476 // log returned error and notify TMPFAIL to client
// NOTE(review): seq is printed with %d; if getBySeqno() returns a 64-bit
// value the specifier does not match -- confirm the return type.
1477 LOG(EXTENSION_LOG_WARNING,
1478 "Warning: failed background fetch for vb=%d seq=%d "
1479 "key=%s", vbucket, v->getBySeqno(), key.c_str());
1480 status = ENGINE_TMPFAIL;
// NOTE(review): presumably reached when getVBucket() returned no vbucket --
// confirm.
1485 LOG(EXTENSION_LOG_INFO, "VBucket %d's file was deleted in the middle of"
1486 " a bg fetch for key %s\n", vbucket, key.c_str());
1487 status = ENGINE_NOT_MY_VBUCKET;
1492 hrtime_t stop = gethrtime();
1493 updateBGStats(init, start, stop);
// The fetched Item is released here, then the waiting connection is notified.
1496 delete gcb.val.getValue();
1497 engine.notifyIOComplete(cookie, status);
// Completes a batch of background fetches for vbucket `vbId`.
// If the batch is for a vbucket that is already deleted, every cookie in
// `fetchedItems` is notified with ENGINE_NOT_MY_VBUCKET and a warning is
// logged. Otherwise each fetched item is applied to the hash table under its
// bucket lock (metadata-only or value restore), timing is recorded via
// updateBGStats and the item's cookie is notified with the resulting status.
1500 void EventuallyPersistentStore::completeBGFetchMulti(uint16_t vbId,
1501 std::vector<bgfetched_item_t> &fetchedItems,
1504 RCPtr<VBucket> vb = getVBucket(vbId);
// Deleted-vbucket path: fail every pending request in the batch.
1506 std::vector<bgfetched_item_t>::iterator itemItr = fetchedItems.begin();
1507 for (; itemItr != fetchedItems.end(); ++itemItr) {
1508 engine.notifyIOComplete((*itemItr).second->cookie,
1509 ENGINE_NOT_MY_VBUCKET);
1511 LOG(EXTENSION_LOG_WARNING,
1512 "EP Store completes %d of batched background fetch for "
1513 "for vBucket = %d that is already deleted\n",
1514 (int)fetchedItems.size(), vbId);
// Normal path: each entry pairs a key (first) with its fetch item (second).
1518 std::vector<bgfetched_item_t>::iterator itemItr = fetchedItems.begin();
1519 for (; itemItr != fetchedItems.end(); ++itemItr) {
1520 VBucketBGFetchItem *bgitem = (*itemItr).second;
1521 ENGINE_ERROR_CODE status = bgitem->value.getStatus();
1522 Item *fetchedValue = bgitem->value.getValue();
1523 const std::string &key = (*itemItr).first;
1526 LockHolder blh = vb->ht.getLockedBucket(key, &bucket);
1527 StoredValue *v = fetchValidValue(vb, key, bucket, true);
1528 if (bgitem->metaDataOnly) {
1529 if (v && v->unlocked_restoreMeta(fetchedValue, status, vb->ht)) {
1530 status = ENGINE_SUCCESS;
// Value path: decide whether the fetched value still needs to be restored.
1533 bool restore = false;
1534 if (v && v->isResident()) {
1535 status = ENGINE_SUCCESS;
1537 switch (eviction_policy) {
1539 if (v && !v->isResident() && !v->isDeleted()) {
1545 if (v->isTempInitialItem() ||
1546 (!v->isResident() && !v->isDeleted())) {
1552 throw std::logic_error("Unknown eviction policy");
1557 if (status == ENGINE_SUCCESS) {
1558 v->unlocked_restoreValue(fetchedValue, vb->ht);
1559 cb_assert(v->isResident());
1560 if (vb->getState() == vbucket_state_active &&
1561 v->getExptime() != fetchedValue->getExptime() &&
1562 v->getCas() == fetchedValue->getCas()) {
1563 // MB-9306: It is possible that by the time
1564 // bgfetcher returns, the item may have been
1565 // updated and queued
1566 // Hence test the CAS value to be the same first.
1567 // exptime mutated, schedule it into new checkpoint
1568 queueDirty(vb, v, &blh);
1570 } else if (status == ENGINE_KEY_ENOENT) {
1571 v->setStoredValueState(StoredValue::state_non_existent_key);
1572 if (eviction_policy == FULL_EVICTION) {
1573 // For the full eviction, we should notify
1574 // ENGINE_SUCCESS to the memcached worker thread,
1575 // so that the worker thread can visit the
1576 // ep-engine and figure out the correct error
1578 status = ENGINE_SUCCESS;
1581 // underlying kvstore couldn't fetch requested data
1582 // log returned error and notify TMPFAIL to client
1583 LOG(EXTENSION_LOG_WARNING,
1584 "Warning: failed background fetch for vb=%d "
1585 "key=%s", vbId, key.c_str());
1586 status = ENGINE_TMPFAIL;
1592 if (bgitem->metaDataOnly) {
1593 ++stats.bg_meta_fetched;
// Per-item timing uses the item's own init time and a shared start time.
1598 hrtime_t endTime = gethrtime();
1599 updateBGStats(bgitem->initTime, startTime, endTime);
1600 engine.notifyIOComplete(bgitem->cookie, status);
// NOTE(review): fetchedItems.size() is passed to %d without a cast here,
// whereas the warning earlier in this function casts the same value to int;
// a size_t argument does not match %d -- consider the same cast.
1603 LOG(EXTENSION_LOG_DEBUG,
1604 "EP Store completes %d of batched background fetch "
1605 "for vBucket = %d endTime = %lld\n",
1606 fetchedItems.size(), vbId, gethrtime()/1000000);
// Schedules a background fetch for `key`.
// With multi-BG-fetch enabled the request is queued on the vbucket as a
// VBucketBGFetchItem and the shard's BgFetcher is notified. Otherwise a
// BGFetchTask is scheduled on the reader task group. Both paths log the
// current queue depth at debug level.
1609 void EventuallyPersistentStore::bgFetch(const std::string &key,
1614 std::stringstream ss;
1616 if (multiBGFetchEnabled()) {
1617 RCPtr<VBucket> vb = getVBucket(vbucket);
1619 KVShard *myShard = vbMap.getShard(vbucket);
1621 // schedule to the current batch of background fetch of the given
// NOTE(review): no delete for fetchThis is visible here; presumably the
// vbucket/fetcher takes ownership -- confirm.
1623 VBucketBGFetchItem * fetchThis = new VBucketBGFetchItem(cookie,
1625 vb->queueBGFetchItem(key, fetchThis, myShard->getBgFetcher());
1626 myShard->getBgFetcher()->notifyBGEvent();
1627 ss << "Queued a background fetch, now at "
1628 << vb->numPendingBGFetchItems() << std::endl;
1629 LOG(EXTENSION_LOG_DEBUG, "%s", ss.str().c_str());
// Single-fetch path: track the high-water mark of outstanding fetch jobs.
1632 stats.maxRemainingBgJobs = std::max(stats.maxRemainingBgJobs,
1633 bgFetchQueue.load());
1634 ExecutorPool* iom = ExecutorPool::get();
1635 ExTask task = new BGFetchTask(&engine, key, vbucket, rowid, cookie,
1637 Priority::BgFetcherGetMetaPriority,
1638 bgFetchDelay, false);
1639 iom->schedule(task, READER_TASK_IDX);
1640 ss << "Queued a background fetch, now at " << bgFetchQueue.load()
1642 LOG(EXTENSION_LOG_DEBUG, "%s", ss.str().c_str());
// Looks up `key` in its vbucket's hash table and returns it as a GetValue.
// When honorStates is set, dead vbuckets and vbuckets in the state opposite to
// `allowedState` yield ENGINE_NOT_MY_VBUCKET, and a pending vbucket may park
// the request (ENGINE_EWOULDBLOCK). A non-resident value triggers a background
// fetch and ENGINE_EWOULDBLOCK; a resident value is returned with
// ENGINE_SUCCESS. A key missing from the hash table is handled according to
// the eviction policy (see the tail of the function).
1646 GetValue EventuallyPersistentStore::getInternal(const std::string &key,
1651 vbucket_state_t allowedState,
1652 bool trackReference) {
// Active is rejected when replica is allowed, and vice versa.
1654 vbucket_state_t disallowedState = (allowedState == vbucket_state_active) ?
1655 vbucket_state_replica : vbucket_state_active;
1656 RCPtr<VBucket> vb = getVBucket(vbucket);
1658 ++stats.numNotMyVBuckets;
1659 return GetValue(NULL, ENGINE_NOT_MY_VBUCKET);
1660 } else if (honorStates && vb->getState() == vbucket_state_dead) {
1661 ++stats.numNotMyVBuckets;
1662 return GetValue(NULL, ENGINE_NOT_MY_VBUCKET);
1663 } else if (honorStates && vb->getState() == disallowedState) {
1664 ++stats.numNotMyVBuckets;
1665 return GetValue(NULL, ENGINE_NOT_MY_VBUCKET);
1666 } else if (honorStates && vb->getState() == vbucket_state_pending) {
// The cookie is queued on the vbucket as a pending operation.
1667 if (vb->addPendingOp(cookie)) {
1668 return GetValue(NULL, ENGINE_EWOULDBLOCK);
// The hash-table bucket stays locked while the StoredValue is inspected.
1673 LockHolder lh = vb->ht.getLockedBucket(key, &bucket_num);
1674 StoredValue *v = fetchValidValue(vb, key, bucket_num, true,
1677 if (v->isDeleted() || v->isTempDeletedItem() ||
1678 v->isTempNonExistentItem()) {
1682 // If the value is not resident, wait for it...
1683 if (!v->isResident()) {
1685 bgFetch(key, vbucket, v->getBySeqno(), cookie);
1687 return GetValue(NULL, ENGINE_EWOULDBLOCK, v->getBySeqno(),
1688 true, v->getNRUValue());
// Resident value: build an Item copy for the caller.
1691 GetValue rv(v->toItem(v->isLocked(ep_current_time()), vbucket),
1692 ENGINE_SUCCESS, v->getBySeqno(), false, v->getNRUValue());
// Key not in the hash table.
1695 if (eviction_policy == VALUE_ONLY || diskFlushAll) {
1699 ENGINE_ERROR_CODE ec = ENGINE_EWOULDBLOCK;
1700 if (queueBG) { // Full eviction and need a bg fetch.
1701 ec = addTempItemForBgFetch(lh, bucket_num, key, vb,
1704 return GetValue(NULL, ec, -1, true);
// Returns a random item taken from an active vbucket. The scan begins at a
// random vbucket id and gives up with ENGINE_KEY_ENOENT when the scan position
// reaches the starting id again.
1708 GetValue EventuallyPersistentStore::getRandomKey() {
1709 long max = vbMap.getSize();
// NOTE(review): the modulo assumes max is non-zero -- confirm that
// vbMap.getSize() can never be 0 here.
1711 long start = random() % max;
1715 while (itm == NULL) {
1716 RCPtr<VBucket> vb = getVBucket(curr++);
// Skip ids with no vbucket and vbuckets that are not active.
1717 while (!vb || vb->getState() != vbucket_state_active) {
1718 if (curr == start) {
1719 return GetValue(NULL, ENGINE_KEY_ENOENT);
1725 vb = getVBucket(curr++);
// Ask the hash table of this vbucket for a random key.
1728 if ((itm = vb->ht.getRandomKey(random())) != NULL) {
1729 GetValue rv(itm, ENGINE_SUCCESS);
1737 if (curr == start) {
1738 return GetValue(NULL, ENGINE_KEY_ENOENT);
1740 // Search next vbucket
1743 return GetValue(NULL, ENGINE_KEY_ENOENT);
// Fills `metadata` (cas, flags, exptime, revSeqno) for `key`.
// Missing, dead and replica vbuckets yield ENGINE_NOT_MY_VBUCKET. A temp
// initial item triggers a metadata background fetch (ENGINE_EWOULDBLOCK); a
// temp non-existent item yields ENGINE_KEY_ENOENT. Deleted, temp-deleted and
// expired items set GET_META_ITEM_DELETED_FLAG in `deleted`. When the key is
// not in the hash table a temp item is added and a background fetch scheduled.
1747 ENGINE_ERROR_CODE EventuallyPersistentStore::getMetaData(
1748 const std::string &key,
1751 ItemMetaData &metadata,
1753 bool trackReferenced)
1756 RCPtr<VBucket> vb = getVBucket(vbucket);
1757 if (!vb || vb->getState() == vbucket_state_dead ||
1758 vb->getState() == vbucket_state_replica) {
1759 ++stats.numNotMyVBuckets;
1760 return ENGINE_NOT_MY_VBUCKET;
1765 LockHolder lh = vb->ht.getLockedBucket(key, &bucket_num);
1766 StoredValue *v = vb->ht.unlocked_find(key, bucket_num, true,
1770 stats.numOpsGetMeta++;
1772 if (v->isTempInitialItem()) { // Need bg meta fetch.
1773 bgFetch(key, vbucket, -1, cookie, true);
1774 return ENGINE_EWOULDBLOCK;
1775 } else if (v->isTempNonExistentItem()) {
// The cas is still reported for a key known not to exist.
1776 metadata.cas = v->getCas();
1777 return ENGINE_KEY_ENOENT;
1779 if (v->isTempDeletedItem() || v->isDeleted() ||
1780 v->isExpired(ep_real_time())) {
1781 deleted |= GET_META_ITEM_DELETED_FLAG;
// A locked item reports an all-ones cas instead of its real cas.
1784 if (v->isLocked(ep_current_time())) {
1785 metadata.cas = static_cast<uint64_t>(-1);
1787 metadata.cas = v->getCas();
1789 metadata.flags = v->getFlags();
1790 metadata.exptime = v->getExptime();
1791 metadata.revSeqno = v->getRevSeqno();
1792 return ENGINE_SUCCESS;
1795 // The key wasn't found. However, this may be because it was previously
1796 // deleted or evicted with the full eviction strategy.
1797 // So, add a temporary item corresponding to the key to the hash table
1798 // and schedule a background fetch for its metadata from the persistent
1799 // store. The item's state will be updated after the fetch completes.
1800 return addTempItemForBgFetch(lh, bucket_num, key, vb, cookie, true);
// Stores `itm` together with caller-supplied metadata.
// Missing or dead vbuckets yield ENGINE_NOT_MY_VBUCKET; replica and pending
// vbuckets are rejected or parked unless `force` is set. Conflict resolution
// via conflictResolver may reject the mutation with ENGINE_KEY_EEXISTS. When
// the item's metadata is not in memory, a background fetch is scheduled and
// ENGINE_EWOULDBLOCK (or the result of addTempItemForBgFetch) is returned.
// The result of unlocked_set is mapped onto an ENGINE_ERROR_CODE.
1804 ENGINE_ERROR_CODE EventuallyPersistentStore::setWithMeta(const Item &itm,
1813 RCPtr<VBucket> vb = getVBucket(itm.getVBucketId());
1814 if (!vb || vb->getState() == vbucket_state_dead) {
1815 ++stats.numNotMyVBuckets;
1816 return ENGINE_NOT_MY_VBUCKET;
1817 } else if (vb->getState() == vbucket_state_replica && !force) {
1818 ++stats.numNotMyVBuckets;
1819 return ENGINE_NOT_MY_VBUCKET;
1820 } else if (vb->getState() == vbucket_state_pending && !force) {
1821 if (vb->addPendingOp(cookie)) {
1822 return ENGINE_EWOULDBLOCK;
1827 LockHolder lh = vb->ht.getLockedBucket(itm.getKey(), &bucket_num);
1828 StoredValue *v = vb->ht.unlocked_find(itm.getKey(), bucket_num, true,
// A temp initial item has no metadata yet; fetch it before resolving.
1833 if (v->isTempInitialItem()) {
1834 bgFetch(itm.getKey(), itm.getVBucketId(), -1, cookie, true);
1835 return ENGINE_EWOULDBLOCK;
1837 if (!conflictResolver->resolve(v, itm.getMetaData(), false)) {
1838 ++stats.numOpsSetMetaResolutionFailed;
1839 return ENGINE_KEY_EEXISTS;
1842 return addTempItemForBgFetch(lh, bucket_num, itm.getKey(), vb,
1843 cookie, true, isReplication);
// Special handling for a locked item on a replica or pending vbucket.
1847 if (v && v->isLocked(ep_current_time()) &&
1848 (vb->getState() == vbucket_state_replica ||
1849 vb->getState() == vbucket_state_pending)) {
1852 mutation_type_t mtype = vb->ht.unlocked_set(v, itm, cas, allowExisting,
1853 true, eviction_policy, nru,
// Translate the hash-table mutation result into an engine error code.
1856 ENGINE_ERROR_CODE ret = ENGINE_SUCCESS;
1859 ret = ENGINE_ENOMEM;
1863 ret = ENGINE_KEY_EEXISTS;
1865 case INVALID_VBUCKET:
1866 ret = ENGINE_NOT_MY_VBUCKET;
1870 queueDirty(vb, v, &lh, false, true, genBySeqno);
1873 ret = ENGINE_KEY_ENOENT;
1876 { // CAS operation with non-resident item + full eviction.
1877 if (v) { // temp item is already created. Simply schedule a
1878 lh.unlock(); // bg fetch job.
1879 bgFetch(itm.getKey(), vb->getId(), -1, cookie, true);
1880 return ENGINE_EWOULDBLOCK;
1882 ret = addTempItemForBgFetch(lh, bucket_num, itm.getKey(), vb,
1883 cookie, true, isReplication);
// Fetches `key` and sets its expiry time to `exptime` ("get and touch").
// Missing, dead and replica vbuckets yield ENGINE_NOT_MY_VBUCKET; a pending
// vbucket may park the request. Non-resident values trigger a background
// fetch; locked items yield ENGINE_KEY_EEXISTS. When the expiry time actually
// changes, the item is queued for persistence.
1890 GetValue EventuallyPersistentStore::getAndUpdateTtl(const std::string &key,
1895 RCPtr<VBucket> vb = getVBucket(vbucket);
1897 ++stats.numNotMyVBuckets;
1898 return GetValue(NULL, ENGINE_NOT_MY_VBUCKET);
1899 } else if (vb->getState() == vbucket_state_dead) {
1900 ++stats.numNotMyVBuckets;
1901 return GetValue(NULL, ENGINE_NOT_MY_VBUCKET);
1902 } else if (vb->getState() == vbucket_state_replica) {
1903 ++stats.numNotMyVBuckets;
1904 return GetValue(NULL, ENGINE_NOT_MY_VBUCKET);
1905 } else if (vb->getState() == vbucket_state_pending) {
1906 if (vb->addPendingOp(cookie)) {
1907 return GetValue(NULL, ENGINE_EWOULDBLOCK);
1912 LockHolder lh = vb->ht.getLockedBucket(key, &bucket_num);
1913 StoredValue *v = fetchValidValue(vb, key, bucket_num, true);
1916 if (v->isDeleted() || v->isTempDeletedItem() ||
1917 v->isTempNonExistentItem()) {
// The value must be in memory before it can be returned.
1922 if (!v->isResident()) {
1923 bgFetch(key, vbucket, v->getBySeqno(), cookie);
1924 return GetValue(NULL, ENGINE_EWOULDBLOCK, v->getBySeqno());
1926 if (v->isLocked(ep_current_time())) {
1927 GetValue rv(NULL, ENGINE_KEY_EEXISTS, 0);
// Only touch the item (and later persist it) when the expiry really differs.
1931 bool exptime_mutated = exptime != v->getExptime() ? true : false;
1932 if (exptime_mutated) {
1934 v->setExptime(exptime);
1937 GetValue rv(v->toItem(v->isLocked(ep_current_time()), vbucket),
1938 ENGINE_SUCCESS, v->getBySeqno());
1940 if (exptime_mutated) {
1941 // persist the item in the underlying storage for
1943 queueDirty(vb, v, &lh);
// Key not in the hash table.
1947 if (eviction_policy == VALUE_ONLY) {
1951 ENGINE_ERROR_CODE ec = addTempItemForBgFetch(lh, bucket_num, key,
1953 return GetValue(NULL, ec, -1, true);
// Starts a "vkey" stats lookup for `key` by scheduling a VKeyStatBGFetchTask
// on the reader task group. Returns ENGINE_EWOULDBLOCK once the task is
// scheduled. Other visible return codes are ENGINE_NOT_MY_VBUCKET,
// ENGINE_KEY_ENOENT (deleted/non-existent item, or key absent under
// VALUE_ONLY eviction) and ENGINE_ENOMEM (from adding a temp item).
1959 EventuallyPersistentStore::statsVKey(const std::string &key,
1961 const void *cookie) {
1962 RCPtr<VBucket> vb = getVBucket(vbucket);
1964 return ENGINE_NOT_MY_VBUCKET;
1968 LockHolder lh = vb->ht.getLockedBucket(key, &bucket_num);
1969 StoredValue *v = fetchValidValue(vb, key, bucket_num, true);
1972 if (v->isDeleted() || v->isTempDeletedItem() ||
1973 v->isTempNonExistentItem()) {
1974 return ENGINE_KEY_ENOENT;
// Sanity check on the outstanding background-fetch counter.
1977 cb_assert(bgFetchQueue > 0);
1978 ExecutorPool* iom = ExecutorPool::get();
1979 ExTask task = new VKeyStatBGFetchTask(&engine, key, vbucket,
1980 v->getBySeqno(), cookie,
1981 Priority::VKeyStatBgFetcherPriority,
1982 bgFetchDelay, false);
1983 iom->schedule(task, READER_TASK_IDX);
1984 return ENGINE_EWOULDBLOCK;
// Key not in the hash table.
1986 if (eviction_policy == VALUE_ONLY) {
1987 return ENGINE_KEY_ENOENT;
// Otherwise add a temp item and fetch from disk.
1989 add_type_t rv = vb->ht.unlocked_addTempItem(bucket_num, key,
1993 return ENGINE_ENOMEM;
1997 case ADD_TMP_AND_BG_FETCH:
1998 // Since the hashtable bucket is locked, we shouldn't get here
2003 cb_assert(bgFetchQueue > 0);
2004 ExecutorPool* iom = ExecutorPool::get();
// No sequence number is known for the temp item, so -1 is passed.
2005 ExTask task = new VKeyStatBGFetchTask(&engine, key,
2006 vbucket, -1, cookie,
2007 Priority::VKeyStatBgFetcherPriority,
2008 bgFetchDelay, false);
2009 iom->schedule(task, READER_TASK_IDX);
2012 return ENGINE_EWOULDBLOCK;
// Completes a vkey stats lookup: reads the item from the read-only underlying
// store, under FULL_EVICTION updates a temp-initial StoredValue with the
// outcome, hands the fetched item (or NULL on failure) to the engine as the
// lookup result and notifies the cookie with ENGINE_SUCCESS.
2017 void EventuallyPersistentStore::completeStatsVKey(const void* cookie,
2020 uint64_t bySeqNum) {
2021 RememberingCallback<GetValue> gcb;
2023 getROUnderlying(vbid)->get(key, bySeqNum, vbid, gcb);
// The callback must have fired by the time get() returns.
2025 cb_assert(gcb.fired);
2027 if (eviction_policy == FULL_EVICTION) {
2028 RCPtr<VBucket> vb = getVBucket(vbid);
2031 LockHolder hlh = vb->ht.getLockedBucket(key, &bucket_num);
2032 StoredValue *v = fetchValidValue(vb, key, bucket_num, true);
// Only a temp initial item is updated with the fetch result.
2033 if (v && v->isTempInitialItem()) {
2034 if (gcb.val.getStatus() == ENGINE_SUCCESS) {
2035 v->unlocked_restoreValue(gcb.val.getValue(), vb->ht);
2036 cb_assert(v->isResident());
2037 } else if (gcb.val.getStatus() == ENGINE_KEY_ENOENT) {
2038 v->setStoredValueState(
2039 StoredValue::state_non_existent_key);
2041 // underlying kvstore couldn't fetch requested data
2042 // log returned error and notify TMPFAIL to client
2043 LOG(EXTENSION_LOG_WARNING,
2044 "Warning: failed background fetch for vb=%d seq=%d "
2045 "key=%s", vbid, v->getBySeqno(), key.c_str());
// Publish the result for the waiting connection.
2051 if (gcb.val.getStatus() == ENGINE_SUCCESS) {
2052 engine.addLookupResult(cookie, gcb.val.getValue());
2054 engine.addLookupResult(cookie, NULL);
// The notification status is ENGINE_SUCCESS on every path.
2058 engine.notifyIOComplete(cookie, ENGINE_SUCCESS);
// Fetches `key` from an active vbucket and locks it until
// currentTime + lockTimeout. Results are delivered as GetValue objects
// (presumably through `cb` -- the delivering calls are not among the lines
// shown here, TODO confirm).
// @param currentTime  current relative time used for the lock checks
// @param lockTimeout  lock duration added to currentTime
2061 bool EventuallyPersistentStore::getLocked(const std::string &key,
2063 Callback<GetValue> &cb,
2064 rel_time_t currentTime,
2065 uint32_t lockTimeout,
2066 const void *cookie) {
// Only an active vbucket is accepted.
2067 RCPtr<VBucket> vb = getVBucket(vbucket, vbucket_state_active);
2069 ++stats.numNotMyVBuckets;
2070 GetValue rv(NULL, ENGINE_NOT_MY_VBUCKET);
2076 LockHolder lh = vb->ht.getLockedBucket(key, &bucket_num);
2077 StoredValue *v = fetchValidValue(vb, key, bucket_num, true);
2080 if (v->isDeleted() || v->isTempNonExistentItem() ||
2081 v->isTempDeletedItem()) {
2087 // if v is locked return error
2088 if (v->isLocked(currentTime)) {
2094 // If the value is not resident, wait for it...
2095 if (!v->isResident()) {
2097 bgFetch(key, vbucket, v->getBySeqno(), cookie);
2099 GetValue rv(NULL, ENGINE_EWOULDBLOCK, -1, true);
2104 // acquire lock and increment cas value
2105 v->lock(currentTime + lockTimeout);
2107 Item *it = v->toItem(false, vbucket);
// The stored value adopts the cas carried by the returned item.
2109 v->setCas(it->getCas());
// Key not in the hash table.
2115 if (eviction_policy == VALUE_ONLY) {
2120 ENGINE_ERROR_CODE ec = addTempItemForBgFetch(lh, bucket_num, key,
2122 GetValue rv(NULL, ec, -1, true);
// Releases the lock on `key` in an active vbucket when `cas` matches the
// item's cas. Visible return codes: ENGINE_NOT_MY_VBUCKET, ENGINE_KEY_ENOENT
// (deleted/non-existent item, or key absent under VALUE_ONLY eviction),
// ENGINE_SUCCESS (cas matched on a locked item) and ENGINE_TMPFAIL.
2130 EventuallyPersistentStore::unlockKey(const std::string &key,
2133 rel_time_t currentTime)
2136 RCPtr<VBucket> vb = getVBucket(vbucket, vbucket_state_active);
2138 ++stats.numNotMyVBuckets;
2139 return ENGINE_NOT_MY_VBUCKET;
2143 LockHolder lh = vb->ht.getLockedBucket(key, &bucket_num);
2144 StoredValue *v = fetchValidValue(vb, key, bucket_num, true);
2147 if (v->isDeleted() || v->isTempNonExistentItem() ||
2148 v->isTempDeletedItem()) {
2149 return ENGINE_KEY_ENOENT;
// The lock is only released for the holder of the matching cas.
2151 if (v->isLocked(currentTime)) {
2152 if (v->getCas() == cas) {
2154 return ENGINE_SUCCESS;
2157 return ENGINE_TMPFAIL;
// Key not in the hash table.
2159 if (eviction_policy == VALUE_ONLY) {
2160 return ENGINE_KEY_ENOENT;
2162 // With the full eviction, an item's lock is automatically
2163 // released when the item is evicted from memory. Therefore,
2164 // we simply return ENGINE_TMPFAIL when we receive unlockKey
2165 // for an item that is not in memory cache. Note that we don't
2166 // spawn any bg fetch job to figure out if an item actually
2167 // exists in disk or not.
2168 return ENGINE_TMPFAIL;
// Fills `kstats` (logically_deleted, dirty, exptime, flags, cas, vb_state)
// for `key`. Returns ENGINE_KEY_ENOENT for temp non-existent/temp deleted
// items and for deleted items unless `wantsDeleted` is set. Under
// FULL_EVICTION a temp initial item triggers a metadata background fetch when
// `bgfetch` is set (ENGINE_EWOULDBLOCK).
2174 ENGINE_ERROR_CODE EventuallyPersistentStore::getKeyStats(
2175 const std::string &key,
2178 struct key_stats &kstats,
2182 RCPtr<VBucket> vb = getVBucket(vbucket);
2184 return ENGINE_NOT_MY_VBUCKET;
2188 LockHolder lh = vb->ht.getLockedBucket(key, &bucket_num);
2189 StoredValue *v = fetchValidValue(vb, key, bucket_num, true);
2192 if ((v->isDeleted() && !wantsDeleted) ||
2193 v->isTempNonExistentItem() || v->isTempDeletedItem()) {
2194 return ENGINE_KEY_ENOENT;
// Metadata not loaded yet: fetch it from disk first.
2196 if (eviction_policy == FULL_EVICTION &&
2197 v->isTempInitialItem() && bgfetch) {
2199 bgFetch(key, vbucket, -1, cookie, true);
2200 return ENGINE_EWOULDBLOCK;
// Copy the in-memory attributes into the caller's struct.
2202 kstats.logically_deleted = v->isDeleted();
2203 kstats.dirty = v->isDirty();
2204 kstats.exptime = v->getExptime();
2205 kstats.flags = v->getFlags();
2206 kstats.cas = v->getCas();
2207 kstats.vb_state = vb->getState();
2208 return ENGINE_SUCCESS;
// Key not in the hash table.
2210 if (eviction_policy == VALUE_ONLY) {
2211 return ENGINE_KEY_ENOENT;
2214 return addTempItemForBgFetch(lh, bucket_num, key, vb,
2217 return ENGINE_KEY_ENOENT;
// Compares the in-memory item for `key` against `diskItem` and returns a
// status string. Visible results: "item_deleted" (deleted / temp item, and
// the final fallback), "flags_mismatch" (flags differ) and "data_mismatch"
// (resident value bytes differ from the disk item).
2223 std::string EventuallyPersistentStore::validateKey(const std::string &key,
// NOTE(review): vb is dereferenced on the very next statement without a null
// check, while other functions in this file handle a missing vbucket from
// getVBucket() -- confirm callers guarantee the vbucket exists.
2227 RCPtr<VBucket> vb = getVBucket(vbucket);
2228 LockHolder lh = vb->ht.getLockedBucket(key, &bucket_num);
2229 StoredValue *v = fetchValidValue(vb, key, bucket_num, true,
2233 if (v->isDeleted() || v->isTempNonExistentItem() ||
2234 v->isTempDeletedItem()) {
2235 return "item_deleted";
2238 if (diskItem.getFlags() != v->getFlags()) {
2239 return "flags_mismatch";
// NOTE(review): memcmp reads diskItem.getNBytes() bytes from the in-memory
// value; no length comparison between the two is visible -- confirm the
// in-memory value is at least that long.
2240 } else if (v->isResident() && memcmp(diskItem.getData(),
2241 v->getValue()->getData(),
2242 diskItem.getNBytes())) {
2243 return "data_mismatch";
2248 return "item_deleted";
// Soft-deletes `key` and queues the deletion for persistence.
// Missing vbuckets, and dead/replica vbuckets without `force`, yield
// ENGINE_NOT_MY_VBUCKET; a pending vbucket without `force` may park the
// request. When the key is absent, deleted or a temp item, the outcome
// depends on the eviction policy and on `force` (see the branches below).
// On return *cas holds the item's cas (0 when there is no item) and
// `itemMeta`, when non-NULL, receives the item's metadata.
2253 ENGINE_ERROR_CODE EventuallyPersistentStore::deleteItem(const std::string &key,
2258 ItemMetaData *itemMeta,
2261 RCPtr<VBucket> vb = getVBucket(vbucket);
2262 if (!vb || (vb->getState() == vbucket_state_dead && !force)) {
2263 ++stats.numNotMyVBuckets;
2264 return ENGINE_NOT_MY_VBUCKET;
2265 } else if(vb->getState() == vbucket_state_replica && !force) {
2266 ++stats.numNotMyVBuckets;
2267 return ENGINE_NOT_MY_VBUCKET;
2268 } else if(vb->getState() == vbucket_state_pending && !force) {
2269 if (vb->addPendingOp(cookie)) {
2270 return ENGINE_EWOULDBLOCK;
2275 LockHolder lh = vb->ht.getLockedBucket(key, &bucket_num);
2276 StoredValue *v = vb->ht.unlocked_find(key, bucket_num, true, false);
// Nothing live in memory for this key.
2277 if (!v || v->isDeleted() || v->isTempItem()) {
2278 if (eviction_policy == VALUE_ONLY) {
2279 return ENGINE_KEY_ENOENT;
2280 } else { // Full eviction.
2282 if (!v) { // Item might be evicted from cache.
2283 return addTempItemForBgFetch(lh, bucket_num, key, vb,
2285 } else if (v->isTempInitialItem()) {
2287 bgFetch(key, vbucket, -1, cookie, true);
2288 return ENGINE_EWOULDBLOCK;
2289 } else { // Non-existent or deleted key.
2290 return ENGINE_KEY_ENOENT;
2293 if (!v) { // Item might be evicted from cache.
2294 // Create a temp item and delete it below as it is a
2296 add_type_t rv = vb->ht.unlocked_addTempItem(bucket_num,
2299 if (rv == ADD_NOMEM) {
2300 return ENGINE_ENOMEM;
// Re-find the freshly added temp item and mark it as a deleted key.
2302 v = vb->ht.unlocked_find(key, bucket_num, true, false);
2303 v->setStoredValueState(StoredValue::state_deleted_key);
2304 } else if (v->isTempInitialItem()) {
2305 v->setStoredValueState(StoredValue::state_deleted_key);
2306 } else { // Non-existent or deleted key.
2307 return ENGINE_KEY_ENOENT;
// Special handling for a locked item on a replica or pending vbucket.
2313 if (v && v->isLocked(ep_current_time()) &&
2314 (vb->getState() == vbucket_state_replica ||
2315 vb->getState() == vbucket_state_pending)) {
2318 mutation_type_t delrv;
2319 delrv = vb->ht.unlocked_softDelete(v, *cas, eviction_policy);
// Report the item's metadata to the caller when requested.
2321 if (itemMeta && v) {
2322 itemMeta->revSeqno = v->getRevSeqno();
2323 itemMeta->cas = v->getCas();
2324 itemMeta->flags = v->getFlags();
2325 itemMeta->exptime = v->getExptime();
2327 *cas = v ? v->getCas() : 0;
// Translate the soft-delete result into an engine error code.
2329 ENGINE_ERROR_CODE ret = ENGINE_SUCCESS;
2332 ret = ENGINE_ENOMEM;
2334 case INVALID_VBUCKET:
2335 ret = ENGINE_NOT_MY_VBUCKET;
2338 ret = ENGINE_KEY_EEXISTS;
2341 ret = ENGINE_TMPFAIL;
2344 ret = ENGINE_KEY_ENOENT;
2346 queueDirty(vb, v, &lh, tapBackfill);
2351 queueDirty(vb, v, &lh, tapBackfill);
2354 // We already figured out if a bg fetch is required for a full-evicted
// Soft-deletes `key` using caller-supplied metadata (*itemMeta).
// vbucket-state handling is the same as in deleteItem. Without `force`,
// conflict resolution via conflictResolver decides whether the deletion is
// applied (ENGINE_KEY_EEXISTS when it loses) and missing metadata is fetched
// in the background first. With `force`, a temp item is created when needed
// and marked as a deleted key. On return *cas holds the item's cas (0 when
// there is no item).
2361 ENGINE_ERROR_CODE EventuallyPersistentStore::deleteWithMeta(
2362 const std::string &key,
2367 ItemMetaData *itemMeta,
2373 RCPtr<VBucket> vb = getVBucket(vbucket);
2374 if (!vb || (vb->getState() == vbucket_state_dead && !force)) {
2375 ++stats.numNotMyVBuckets;
2376 return ENGINE_NOT_MY_VBUCKET;
2377 } else if(vb->getState() == vbucket_state_replica && !force) {
2378 ++stats.numNotMyVBuckets;
2379 return ENGINE_NOT_MY_VBUCKET;
2380 } else if(vb->getState() == vbucket_state_pending && !force) {
2381 if (vb->addPendingOp(cookie)) {
2382 return ENGINE_EWOULDBLOCK;
2387 LockHolder lh = vb->ht.getLockedBucket(key, &bucket_num);
2388 StoredValue *v = vb->ht.unlocked_find(key, bucket_num, true, false);
2389 if (!force) { // Need conflict resolution.
// Metadata must be in memory before the conflict can be resolved.
2391 if (v->isTempInitialItem()) {
2392 bgFetch(key, vbucket, -1, cookie, true);
2393 return ENGINE_EWOULDBLOCK;
2395 if (!conflictResolver->resolve(v, *itemMeta, true)) {
2396 ++stats.numOpsDelMetaResolutionFailed;
2397 return ENGINE_KEY_EEXISTS;
2400 // Item is 1) deleted or not existent in the value eviction case OR
2401 // 2) deleted or evicted in the full eviction.
2402 return addTempItemForBgFetch(lh, bucket_num, key, vb,
2403 cookie, true, isReplication);
2407 // Create a temp item and delete it below as it is a force deletion
2408 add_type_t rv = vb->ht.unlocked_addTempItem(bucket_num, key,
2411 if (rv == ADD_NOMEM) {
2412 return ENGINE_ENOMEM;
// Re-find the freshly added temp item and mark it as a deleted key.
2414 v = vb->ht.unlocked_find(key, bucket_num, true, false);
2415 v->setStoredValueState(StoredValue::state_deleted_key);
2416 } else if (v->isTempInitialItem()) {
2417 v->setStoredValueState(StoredValue::state_deleted_key);
// Special handling for a locked item on a replica or pending vbucket.
2421 if (v && v->isLocked(ep_current_time()) &&
2422 (vb->getState() == vbucket_state_replica ||
2423 vb->getState() == vbucket_state_pending)) {
2426 mutation_type_t delrv;
2427 delrv = vb->ht.unlocked_softDelete(v, *cas, *itemMeta,
2428 eviction_policy, true);
2429 *cas = v ? v->getCas() : 0;
// Translate the soft-delete result into an engine error code.
2431 ENGINE_ERROR_CODE ret = ENGINE_SUCCESS;
2434 ret = ENGINE_ENOMEM;
2436 case INVALID_VBUCKET:
2437 ret = ENGINE_NOT_MY_VBUCKET;
2440 ret = ENGINE_KEY_EEXISTS;
2443 ret = ENGINE_TMPFAIL;
2446 ret = ENGINE_KEY_ENOENT;
// A caller-supplied sequence number may be applied before queueing.
2451 v->setBySeqno(bySeqno);
2453 queueDirty(vb, v, &lh, tapBackfill, true, genBySeqno);
2457 bgFetch(key, vbucket, -1, cookie, true);
2458 ret = ENGINE_EWOULDBLOCK;
// Resets every vbucket listed by vbMap.getBuckets(): under the vbucket's
// mutex its checkpoint manager is cleared and its current snapshot is set to
// (0, 0). Afterwards a disk-wide flush is requested once by flipping
// diskFlushAll from false to true and notifying the primary shard's flusher.
2464 void EventuallyPersistentStore::reset() {
2465 std::vector<int> buckets = vbMap.getBuckets();
2466 std::vector<int>::iterator it;
2467 for (it = buckets.begin(); it != buckets.end(); ++it) {
2468 RCPtr<VBucket> vb = getVBucket(*it);
2470 LockHolder lh(vb_mutexes[vb->getId()]);
2472 vb->checkpointManager.clear(vb->getState());
2474 vb->setCurrentSnapshot(0, 0);
// compare_exchange_strong succeeds only if diskFlushAll was still false, so
// the queue size is bumped and the flusher notified at most once per flush.
2478 bool inverse = false;
2479 if (diskFlushAll.compare_exchange_strong(inverse, true)) {
2480 ++stats.diskQueueSize;
2481 // wake up (notify) one flusher is good enough for diskFlushAll
2482 vbMap.shards[EP_PRIMARY_SHARD]->getFlusher()->notifyFlushEvent();
2487 * Callback invoked after persisting an item from memory to disk.
2489 * This class exists to create a closure around a few variables within
2490 * EventuallyPersistentStore::flushOne so that an object can be
2491 * requeued in case of failure to store in the underlying layer.
// Flusher-side callback object for one queued item. It implements
// Callback<mutation_result> (used for sets) and Callback<int> (used for
// deletions) and updates the vbucket's hash table counters and the flush
// statistics once the underlying store reports the outcome.
// NOTE(review): the embedded source line numbers have gaps, so some lines
// (closing braces, else branches, access specifiers, ...) are not visible
// here; the comments below describe only the statements that are visible.
2493 class PersistenceCallback : public Callback<mutation_result>,
2494 public Callback<int> {
// Captures the queued item, the vbucket handle, the owning store, the stats
// object and the CAS value that the set callback later compares against.
2497 PersistenceCallback(const queued_item &qi, RCPtr<VBucket> &vb,
2498 EventuallyPersistentStore *st, EPStats *s, uint64_t c)
2499 : queuedItem(qi), vbucket(vb), store(st), stats(s), cas(c) {
2504 // This callback is invoked for set only.
2505 void callback(mutation_result &value) {
// value.first == 1: fetch the stored value while holding its hash-bucket lock.
2506 if (value.first == 1) {
2508 LockHolder lh = vbucket->ht.getLockedBucket(queuedItem->getKey(),
2510 StoredValue *v = store->fetchValidValue(vbucket,
2511 queuedItem->getKey(),
2512 bucket_num, true, false);
// Compare the in-memory CAS with the CAS captured when the item was flushed.
2514 if (v->getCas() == cas) {
2515 // mark this item clean only if current and stored cas
// Items flagged as "new cache item" update the create/update counters and
// then have the flag cleared.
2519 if (v->isNewCacheItem()) {
2521 // Insert in value-only or full eviction mode.
2522 ++vbucket->opsCreate;
2523 vbucket->incrMetaDataDisk(*queuedItem);
2524 } else { // Update in full eviction mode.
2525 --vbucket->ht.numTotalItems;
2526 ++vbucket->opsUpdate;
2528 v->setNewCacheItem(false);
2529 } else { // Update in value-only or full eviction mode.
2530 ++vbucket->opsUpdate;
// Account for the flushed item: per-vbucket flush stats, the disk queue size
// and the total-persisted counter.
2534 vbucket->doStatsForFlushing(*queuedItem, queuedItem->size());
2535 stats->decrDiskQueueSize(1);
2536 stats->totalPersisted++;
2538 // If the return was 0 here, we're in a bad state because
2539 // we do not know the rowid of this object.
2540 if (value.first == 0) {
2542 LockHolder lh = vbucket->ht.getLockedBucket(
2543 queuedItem->getKey(), &bucket_num);
2544 StoredValue *v = store->fetchValidValue(vbucket,
2545 queuedItem->getKey(),
// Warning that includes the key, the vbucket id and the stored value's
// by-seqno.
2549 std::stringstream ss;
2550 ss << "Persisting ``" << queuedItem->getKey() << "'' on vb"
2551 << queuedItem->getVBucketId() << " (rowid="
2552 << v->getBySeqno() << ") returned 0 updates\n";
2553 LOG(EXTENSION_LOG_WARNING, "%s", ss.str().c_str());
// NOTE(review): presumably the branch taken when the stored value can no
// longer be found; the enclosing condition is not visible in this listing.
2555 LOG(EXTENSION_LOG_WARNING,
2556 "Error persisting now missing ``%s'' from vb%d",
2557 queuedItem->getKey().c_str(),
2558 queuedItem->getVBucketId());
2561 vbucket->doStatsForFlushing(*queuedItem, queuedItem->size());
2562 stats->decrDiskQueueSize(1);
// Builds and logs the "Fatal error in persisting SET" warning.
2564 std::stringstream ss;
2566 "Fatal error in persisting SET ``" <<
2567 queuedItem->getKey() << "'' on vb "
2568 << queuedItem->getVBucketId() << "!!! Requeue it...\n";
2569 LOG(EXTENSION_LOG_WARNING, "%s", ss.str().c_str());
2575 // This callback is invoked for deletions only.
2577 // The integer reports the outcome from the underlying storage
2578 // (see the value meanings listed below).
2579 void callback(int &value) {
2580 // > 1 would be bad. We were only trying to delete one row.
2581 cb_assert(value < 2);
2583 // 1 means we deleted one row
2584 // 0 means we did not delete a row, but did not fail (did not exist)
2586 // We have successfully removed an item from the disk, we
2587 // may now remove it from the hash table.
2589 LockHolder lh = vbucket->ht.getLockedBucket(queuedItem->getKey(),
2591 StoredValue *v = store->fetchValidValue(vbucket,
2592 queuedItem->getKey(),
2593 bucket_num, true, false);
// Only remove the entry if it is still present and still marked deleted;
// the "new cache item" flag is read before the entry is removed.
2594 if (v && v->isDeleted()) {
2595 bool newCacheItem = v->isNewCacheItem();
2596 bool deleted = vbucket->ht.unlocked_del(queuedItem->getKey(),
2599 if (newCacheItem && value > 0) {
2600 // Need to decrement the item counter again for an item that
2601 // exists on DB file, but not in memory (i.e., full eviction),
2602 // because we created the temp item in memory and incremented
2603 // the item counter when a deletion is pushed in the queue.
2604 --vbucket->ht.numTotalItems;
2609 ++stats->totalPersisted;
2610 ++vbucket->opsDelete;
2612 vbucket->doStatsForFlushing(*queuedItem, queuedItem->size());
2613 stats->decrDiskQueueSize(1);
2614 vbucket->decrMetaDataDisk(*queuedItem);
// Builds and logs the "Fatal error in persisting DELETE" warning.
2616 std::stringstream ss;
2617 ss << "Fatal error in persisting DELETE ``" <<
2618 queuedItem->getKey() << "'' on vb "
2619 << queuedItem->getVBucketId() << "!!! Requeue it...\n";
2620 LOG(EXTENSION_LOG_WARNING, "%s", ss.str().c_str());
// Failure handling (the enclosing member's signature is not visible here).
// If the vbucket is flagged for deletion, the item is only accounted for in
// the flush stats and the disk queue size.
2628 if (store->vbMap.isBucketDeletion(vbucket->getId())) {
2629 vbucket->doStatsForFlushing(*queuedItem, queuedItem->size());
2630 stats->decrDiskQueueSize(1);
// Count the failed flush, mark the stored value dirty again and push the
// item onto the vbucket's reject queue.
2633 ++stats->flushFailed;
2634 store->invokeOnLockedStoredValue(queuedItem->getKey(),
2635 queuedItem->getVBucketId(),
2636 &StoredValue::reDirty);
2637 vbucket->rejectQueue.push(queuedItem);
2640 const queued_item queuedItem;
// Held by reference: the RCPtr passed to the constructor must outlive this
// callback object.
2641 RCPtr<VBucket> &vbucket;
2642 EventuallyPersistentStore *store;
2645 DISALLOW_COPY_AND_ASSIGN(PersistenceCallback);
// Resets the underlying RW store for each vbucket index and then clears the
// pending disk-flush-all flag.
// NOTE(review): some source lines of this function are not visible in this
// listing (gaps in the embedded line numbers).
2648 void EventuallyPersistentStore::flushOneDeleteAll() {
2649 for (size_t i = 0; i < vbMap.getSize(); ++i) {
2650 RCPtr<VBucket> vb = getVBucket(i);
// The reset of vbucket i is performed while holding that vbucket's mutex.
2652 LockHolder lh(vb_mutexes[vb->getId()]);
2653 getRWUnderlying(vb->getId())->reset(i);
// Flip diskFlushAll from true back to false (no effect if it is not true)
// and decrement the disk queue size by one.
2657 bool inverse = true;
2658 diskFlushAll.compare_exchange_strong(inverse, false);
2659 stats.decrDiskQueueSize(1);
// Flushes the pending items of one vbucket to its RW underlying store inside
// a begin()/commit() transaction and updates the persistence bookkeeping.
// Returns RETRY_FLUSH_VBUCKET when the vbucket is still being created or its
// mutex is currently held; the final visible return yields items_flushed.
// NOTE(review): several source lines of this function are not visible in
// this listing (gaps in the embedded line numbers); comments describe only
// the visible statements.
2662 int EventuallyPersistentStore::flushVBucket(uint16_t vbid) {
2663 KVShard *shard = vbMap.getShard(vbid);
// The primary shard is the one that executes the delete-all work.
2665 if (shard->getId() == EP_PRIMARY_SHARD) {
2666 flushOneDeleteAll();
2668 // disk flush is pending just return
2673 if (vbMap.isBucketCreation(vbid)) {
2674 return RETRY_FLUSH_VBUCKET;
2677 int items_flushed = 0;
// flush_start comes from ep_current_time() (rel_time_t).
2678 rel_time_t flush_start = ep_current_time();
2680 RCPtr<VBucket> vb = vbMap.getBucket(vbid);
2682 LockHolder lh(vb_mutexes[vbid], true /*tryLock*/);
2683 if (!lh.islocked()) { // Try another bucket if this one is locked
2684 return RETRY_FLUSH_VBUCKET; // to avoid blocking flusher
2687 KVStatsCallback cb(this);
2688 std::vector<queued_item> items;
2689 KVStore *rwUnderlying = getRWUnderlying(vbid);
// Previously rejected items are drained into the batch first.
2691 while (!vb->rejectQueue.empty()) {
2692 items.push_back(vb->rejectQueue.front());
2693 vb->rejectQueue.pop();
// With the snapshot lock held: read the current snapshot range and append
// the backfill items and the checkpoint items awaiting persistence.
2696 LockHolder slh = vb->getSnapshotLock();
2699 vb->getCurrentSnapshot_UNLOCKED(snapStart, snapEnd);
2701 vb->getBackfillItems(items);
2702 vb->checkpointManager.getAllItemsForPersistence(items);
2705 if (!items.empty()) {
// Keep retrying until the underlying store starts a transaction.
2706 while (!rwUnderlying->begin()) {
2707 ++stats.beginFailed;
2708 LOG(EXTENSION_LOG_WARNING, "Failed to start a transaction!!! "
2709 "Retry in 1 sec ...");
2712 rwUnderlying->optimizeWrites(items);
2715 uint64_t maxSeqno = 0;
// Callbacks created for the flushed items; they are deleted after commit.
2716 std::list<PersistenceCallback*> pcbs;
2717 std::vector<queued_item>::iterator it = items.begin();
2718 for(; it != items.end(); ++it) {
// Only set and delete operations are candidates for persistence.
2719 if ((*it)->getOperation() != queue_op_set &&
2720 (*it)->getOperation() != queue_op_del) {
// An item is flushed when there is no previous item or its key differs from
// the previous item's key.
2722 } else if (!prev || prev->getKey() != (*it)->getKey()) {
2725 PersistenceCallback *cb = flushOneDelOrSet(*it, vb);
2729 maxSeqno = std::max(maxSeqno, (uint64_t)(*it)->getBySeqno());
2730 ++stats.flusher_todo;
2732 stats.decrDiskQueueSize(1);
2733 vb->doStatsForFlushing(*(*it), (*it)->size());
2737 BlockTimer timer(&stats.diskCommitHisto, "disk_commit",
2739 hrtime_t start = gethrtime();
// For an active vbucket the snapshot start is set to the highest seqno seen
// in this batch.
2741 if (vb->getState() == vbucket_state_active) {
2742 snapStart = maxSeqno;
// Keep retrying until the commit succeeds.
2746 while (!rwUnderlying->commit(&cb, snapStart, snapEnd)) {
2747 ++stats.commitFailed;
2748 LOG(EXTENSION_LOG_WARNING, "Flusher commit failed!!! Retry in "
// The persisted seqno is only advanced when nothing was rejected.
2754 if (vb->rejectQueue.empty()) {
2755 uint64_t highSeqno = rwUnderlying->getLastPersistedSeqno(vbid);
2756 if (highSeqno > 0 &&
2757 highSeqno != vbMap.getPersistenceSeqno(vbid)) {
2758 vbMap.setPersistenceSeqno(vbid, highSeqno);
// Release the per-item callbacks.
2762 while (!pcbs.empty()) {
2763 delete pcbs.front();
2767 ++stats.flusherCommits;
2768 hrtime_t end = gethrtime();
// NOTE(review): presumably converts hrtime units to milliseconds — TODO
// confirm the resolution of gethrtime().
2769 uint64_t commit_time = (end - start) / 1000000;
// NOTE(review): `end` is an hrtime_t from gethrtime() while `flush_start` is
// a rel_time_t from ep_current_time(); these look like different time units
// — confirm that this subtraction is intended.
2770 uint64_t trans_time = (end - flush_start) / 1000000;
// Average transaction time per flushed item (0 when nothing was flushed).
2772 lastTransTimePerItem.store((items_flushed == 0) ? 0 :
2773 static_cast<double>(trans_time) /
2774 static_cast<double>(items_flushed));
2775 stats.commit_time.store(commit_time);
2776 stats.cumulativeCommitTime.fetch_add(commit_time);
2777 stats.cumulativeFlushTime.fetch_add(ep_current_time()
2779 stats.flusher_todo.store(0);
2782 rwUnderlying->pendingTasks();
// More than one checkpoint: wake the checkpoint remover.
2784 if (vb->checkpointManager.getNumCheckpoints() > 1) {
2785 wakeUpCheckpointRemover();
// With no rejected items: tell the checkpoint manager the items were
// persisted, notify persistence waiters for both the seqno and the
// checkpoint id, and record the new persisted checkpoint id if it changed.
2788 if (vb->rejectQueue.empty()) {
2789 vb->checkpointManager.itemsPersisted();
2790 uint64_t seqno = vbMap.getPersistenceSeqno(vbid);
2791 uint64_t chkid = vb->checkpointManager.getPersistenceCursorPreChkId();
2792 vb->notifyOnPersistence(engine, seqno, true);
2793 vb->notifyOnPersistence(engine, chkid, false);
2794 if (chkid > 0 && chkid != vbMap.getPersistenceCheckpointId(vbid)) {
2795 vbMap.setPersistenceCheckpointId(vbid, chkid);
2800 return items_flushed;
// Hands one queued set or delete to the RW underlying store, creating the
// PersistenceCallback that receives the result. Also records how long the
// item waited in the queue (dirty age).
// NOTE(review): several source lines (including the return statements) are
// not visible in this listing; comments describe only the visible code.
2803 PersistenceCallback*
2804 EventuallyPersistentStore::flushOneDelOrSet(const queued_item &qi,
2805 RCPtr<VBucket> &vb) {
2808 stats.decrDiskQueueSize(1);
2812 int64_t bySeqno = qi->getBySeqno();
2813 bool deleted = qi->isDeleted();
2814 rel_time_t queued(qi->getQueuedTime());
// Dirty age = current time minus the time the item was queued.
2816 int dirtyAge = ep_current_time() - queued;
// NOTE(review): dirtyAge is an int, so this multiplication is done in int
// and could overflow for large ages — confirm the expected range.
2817 stats.dirtyAgeHisto.add(dirtyAge * 1000000);
2818 stats.dirtyAge.store(dirtyAge);
// Keep the high-water mark at the maximum dirty age observed.
2819 stats.dirtyAgeHighWat.store(std::max(stats.dirtyAge.load(),
2820 stats.dirtyAgeHighWat.load()));
2822 // Wait until the vbucket database is created by the vbucket state
// While the vbucket is being created or deleted the item is pushed onto the
// reject queue instead of being written.
2824 if (vbMap.isBucketCreation(qi->getVBucketId()) ||
2825 vbMap.isBucketDeletion(qi->getVBucketId())) {
2826 vb->rejectQueue.push(qi);
2831 KVStore *rwUnderlying = getRWUnderlying(qi->getVBucketId());
2833 // TODO: Need to separate disk_insert from disk_update because
2834 // bySeqno doesn't give us that information.
// bySeqno == -1 selects the insert histogram/label, otherwise the update one.
2835 BlockTimer timer(bySeqno == -1 ?
2836 &stats.diskInsertHisto : &stats.diskUpdateHisto,
2837 bySeqno == -1 ? "disk_insert" : "disk_update",
// Set path: the callback is created with the item's CAS.
2839 PersistenceCallback *cb =
2840 new PersistenceCallback(qi, vb, this, &stats, qi->getCas());
2841 rwUnderlying->set(*qi, *cb);
2844 BlockTimer timer(&stats.diskDelHisto, "disk_delete",
// Delete path: the callback is created with a CAS of 0.
2846 PersistenceCallback *cb =
2847 new PersistenceCallback(qi, vb, this, &stats, 0);
2848 rwUnderlying->del(*qi, *cb);
// Queues a stored value for persistence (either as a backfill item or via
// the checkpoint manager), copies the assigned by-seqno back onto the stored
// value, and notifies the flusher and, optionally, the replication
// connection maps.
// NOTE(review): most of the parameter list and the condition selecting
// between the two queueing variants are not visible in this listing.
2853 void EventuallyPersistentStore::queueDirty(RCPtr<VBucket> &vb,
2857 bool notifyReplicator,
// Build the queued item from the stored value.
2860 queued_item qi(v->toItem(false, vb->getId()));
// Variant 1: queue while holding the snapshot lock and set the current
// snapshot range to the new item's seqno.
2863 LockHolder slh = vb->getSnapshotLock();
2864 rv = tapBackfill ? vb->queueBackfillItem(qi, genBySeqno) :
2865 vb->checkpointManager.queueDirty(vb, qi,
2867 v->setBySeqno(qi->getBySeqno());
2868 vb->setCurrentSnapshot_UNLOCKED(qi->getBySeqno(), qi->getBySeqno());
// Variant 2: queue without touching the snapshot range.
2870 rv = tapBackfill ? vb->queueBackfillItem(qi, genBySeqno) :
2871 vb->checkpointManager.queueDirty(vb, qi,
2873 v->setBySeqno(qi->getBySeqno());
// Wake the flusher of the shard that owns this vbucket.
2881 KVShard* shard = vbMap.getShard(vb->getId());
2882 shard->getFlusher()->notifyFlushEvent();
// Non-backfill items notify the TAP and DCP connections when requested.
2885 if (!tapBackfill && notifyReplicator) {
2886 engine.getTapConnMap().notifyVBConnections(vb->getId());
2887 engine.getDcpConnMap().notifyVBConnections(vb->getId(),
// Returns the persisted vbucket states as listed by the read-only underlying
// store obtained from getOneROUnderlying().
2893 std::vector<vbucket_state *> EventuallyPersistentStore::loadVBucketState()
2895 return getOneROUnderlying()->listPersistedVbuckets();
// Post-warmup setup: schedules a vbucket state snapshot, configures the
// access scanner when an access log path is set, registers configuration
// change listeners and schedules the stats snapshot task.
// NOTE(review): some source lines are not visible in this listing.
2898 void EventuallyPersistentStore::warmupCompleted() {
2899 // Run the vbucket state snapshot job once after the warmup
2900 scheduleVBSnapshot(Priority::VBucketPersistHighPriority);
// The access scanner is only considered when an access log path is
// configured.
2902 if (engine.getConfiguration().getAlogPath().length() > 0) {
2904 if (engine.getConfiguration().isAccessScannerEnabled()) {
// The enabled flag is changed under the access scanner mutex.
2905 LockHolder lh(accessScanner.mutex);
2906 accessScanner.enabled = true;
2908 LOG(EXTENSION_LOG_WARNING, "Access Scanner task enabled");
// Apply the configured sleep time via setAccessScannerSleeptime().
2909 size_t smin = engine.getConfiguration().getAlogSleepTime();
2910 setAccessScannerSleeptime(smin);
2912 LockHolder lh(accessScanner.mutex);
2913 accessScanner.enabled = false;
2914 LOG(EXTENSION_LOG_WARNING, "Access Scanner task disabled");
// Listen for runtime changes to the access scanner related settings.
2917 Configuration &config = engine.getConfiguration();
2918 config.addValueChangedListener("access_scanner_enabled",
2919 new EPStoreValueChangeListener(*this));
2920 config.addValueChangedListener("alog_sleep_time",
2921 new EPStoreValueChangeListener(*this));
2922 config.addValueChangedListener("alog_task_time",
2923 new EPStoreValueChangeListener(*this));
2926 // "0" sleep_time means that the first snapshot task will be executed
2927 // right after warmup. Subsequent snapshot tasks will be scheduled every
2928 // 60 sec by default.
2929 ExecutorPool *iom = ExecutorPool::get();
2930 ExTask task = new StatSnap(&engine, Priority::StatSnapPriority, 0, false);
// The task id is kept in statsSnapshotTaskId.
2931 statsSnapshotTaskId = iom->schedule(task, WRITER_TASK_IDX);
// Evaluates, in order, the memory and item-count thresholds used during
// warmup and logs which one was reached.
// NOTE(review): the return statements are not visible in this listing, so
// the returned value for each branch cannot be confirmed from here.
2934 bool EventuallyPersistentStore::maybeEnableTraffic()
2936 // @todo rename.. should be isTrafficDisabled or something similar
2937 double memoryUsed = static_cast<double>(stats.getTotalMemoryUsed());
2938 double maxSize = static_cast<double>(stats.getMaxDataSize());
// Check 1: total memory used has reached the low water mark.
2940 if (memoryUsed >= stats.mem_low_wat) {
2941 LOG(EXTENSION_LOG_WARNING,
2942 "Total memory use reached to the low water mark, stop warmup");
// Check 2: memory used exceeds max size scaled by warmupMemUsedCap.
2944 } else if (memoryUsed > (maxSize * stats.warmupMemUsedCap)) {
2945 LOG(EXTENSION_LOG_WARNING,
2946 "Enough MB of data loaded to enable traffic");
// Check 3 (value-only eviction): warmed-up values have reached the
// warmupNumReadCap share of the warmed-up keys.
2948 } else if (eviction_policy == VALUE_ONLY &&
2949 stats.warmedUpValues >=
2950 (stats.warmedUpKeys * stats.warmupNumReadCap)) {
2951 // Let ep-engine think we're done with the warmup phase
2952 // (we should refactor this into "enableTraffic")
2953 LOG(EXTENSION_LOG_WARNING,
2954 "Enough number of items loaded to enable traffic");
// Check 4 (full eviction): same test against the warmup task's estimated
// item count.
2956 } else if (eviction_policy == FULL_EVICTION &&
2957 stats.warmedUpValues >=
2958 (warmupTask->getEstimatedItemCount() *
2959 stats.warmupNumReadCap)) {
2960 // In case of FULL EVICTION, warmed up keys always matches the number
2961 // of warmed up values, therefore for honoring the min_item threshold
2962 // in this scenario, we can consider warmup's estimated item count.
2963 LOG(EXTENSION_LOG_WARNING,
2964 "Enough number of items loaded to enable traffic");
// Returns true while the warmup task has not reported completion.
2970 bool EventuallyPersistentStore::isWarmingUp() {
2971 return !warmupTask->isComplete();
// Stops an in-progress warmup. When warmup is still running, a warning is
// logged that includes whether the engine is shutting down.
// NOTE(review): the statement that actually stops the task is not visible in
// this listing.
2974 void EventuallyPersistentStore::stopWarmup(void)
2976 // forcefully stop current warmup task
2977 if (isWarmingUp()) {
2978 LOG(EXTENSION_LOG_WARNING, "Stopping warmup while engine is loading "
2979 "data from underlying storage, shutdown = %s\n",
2980 stats.isShutdown ? "yes" : "no");
// Returns true when the total memory used exceeds the maximum data size
// multiplied by backfillMemoryThreshold.
2985 bool EventuallyPersistentStore::isMemoryUsageTooHigh() {
2986 double memoryUsed = static_cast<double>(stats.getTotalMemoryUsed());
2987 double maxSize = static_cast<double>(stats.getMaxDataSize());
2988 return memoryUsed > (maxSize * backfillMemoryThreshold);
// Stores the new backfill memory threshold (the factor applied to the max
// data size in isMemoryUsageTooHigh()).
// NOTE(review): the parameter declaration line is not visible in this
// listing.
2991 void EventuallyPersistentStore::setBackfillMemoryThreshold(
2993 backfillMemoryThreshold = threshold;
// Replaces the expiry pager task with one that uses the new sleep time
// `val`. The whole operation runs under expiryPager.mutex.
2996 void EventuallyPersistentStore::setExpiryPagerSleeptime(size_t val) {
2997 LockHolder lh(expiryPager.mutex);
// A non-zero previous sleep time means a task was scheduled; cancel it.
2999 if (expiryPager.sleeptime != 0) {
3000 ExecutorPool::get()->cancel(expiryPager.task);
3003 expiryPager.sleeptime = val;
// Create and schedule the new pager task, remembering its task id.
3005 ExTask expTask = new ExpiredItemPager(&engine, stats,
3006 expiryPager.sleeptime);
3007 expiryPager.task = ExecutorPool::get()->schedule(expTask,
// Enables the access scanner (if not already enabled) and schedules its task
// using the configured alog_sleep_time. Runs under accessScanner.mutex.
// NOTE(review): the scheduling logic mirrors setAccessScannerSleeptime();
// some source lines are not visible in this listing.
3012 void EventuallyPersistentStore::enableAccessScannerTask() {
3013 LockHolder lh(accessScanner.mutex);
3014 if (!accessScanner.enabled) {
3015 accessScanner.enabled = true;
// Cancel a previously scheduled task, if any.
3017 if (accessScanner.sleeptime != 0) {
3018 ExecutorPool::get()->cancel(accessScanner.task);
// The configured value is multiplied by 60 before being stored.
3021 size_t alogSleepTime = engine.getConfiguration().getAlogSleepTime();
3022 accessScanner.sleeptime = alogSleepTime * 60;
3023 if (accessScanner.sleeptime != 0) {
3024 ExTask task = new AccessScanner(*this, stats,
3025 Priority::AccessScannerPriority,
3026 accessScanner.sleeptime);
3027 accessScanner.task = ExecutorPool::get()->schedule(task,
// Publish the next run time: now advanced by the sleep time.
3030 gettimeofday(&tv, NULL);
3031 advance_tv(tv, accessScanner.sleeptime);
3032 stats.alogTime.store(tv.tv_sec);
3034 LOG(EXTENSION_LOG_WARNING, "Did not enable access scanner task, "
3035 "as alog_sleep_time is set to zero!");
3038 LOG(EXTENSION_LOG_DEBUG, "Access scanner already enabled!");
// Disables the access scanner: cancels its task, zeroes the sleep time and
// clears the enabled flag, all under accessScanner.mutex. Logs a debug
// message when the scanner is already disabled.
3042 void EventuallyPersistentStore::disableAccessScannerTask() {
3043 LockHolder lh(accessScanner.mutex);
3044 if (accessScanner.enabled) {
3045 ExecutorPool::get()->cancel(accessScanner.task);
3046 accessScanner.sleeptime = 0;
3047 accessScanner.enabled = false;
3049 LOG(EXTENSION_LOG_DEBUG, "Access scanner already disabled!");
// Updates the access scanner sleep time and reschedules its task. Only acts
// when the scanner is enabled; runs under accessScanner.mutex.
// `val` is multiplied by 60 before being stored (the stored value is in
// seconds, per the comment below).
3053 void EventuallyPersistentStore::setAccessScannerSleeptime(size_t val) {
3054 LockHolder lh(accessScanner.mutex);
3056 if (accessScanner.enabled) {
// Cancel the previously scheduled task, if any.
3057 if (accessScanner.sleeptime != 0) {
3058 ExecutorPool::get()->cancel(accessScanner.task);
3061 // store sleeptime in seconds
3062 accessScanner.sleeptime = val * 60;
// A zero sleep time leaves the scanner unscheduled.
3063 if (accessScanner.sleeptime != 0) {
3064 ExTask task = new AccessScanner(*this, stats,
3065 Priority::AccessScannerPriority,
3066 accessScanner.sleeptime);
3067 accessScanner.task = ExecutorPool::get()->schedule(task,
// Publish the next run time: now advanced by the sleep time.
3071 gettimeofday(&tv, NULL);
3072 advance_tv(tv, accessScanner.sleeptime);
3073 stats.alogTime.store(tv.tv_sec);
// Cancels and re-creates the access scanner task with the current sleep time
// so that a changed start hour takes effect. Only acts when the scanner is
// enabled and has a non-zero sleep time; runs under accessScanner.mutex.
3078 void EventuallyPersistentStore::resetAccessScannerStartTime() {
3079 LockHolder lh(accessScanner.mutex);
3081 if (accessScanner.enabled) {
3082 if (accessScanner.sleeptime != 0) {
3083 ExecutorPool::get()->cancel(accessScanner.task);
3084 // re-schedule task according to the new task start hour
3085 ExTask task = new AccessScanner(*this, stats,
3086 Priority::AccessScannerPriority,
3087 accessScanner.sleeptime);
3088 accessScanner.task = ExecutorPool::get()->schedule(task,
// Publish the next run time: now advanced by the sleep time.
3092 gettimeofday(&tv, NULL);
3093 advance_tv(tv, accessScanner.sleeptime);
3094 stats.alogTime.store(tv.tv_sec);
// Walks every vbucket id in the map, offering each vbucket to the visitor
// and visiting its hash table.
// NOTE(review): the conditions guarding the visit calls are not visible in
// this listing.
3099 void EventuallyPersistentStore::visit(VBucketVisitor &visitor)
3101 size_t maxSize = vbMap.getSize();
// The ids are narrowed to uint16_t below, so the map size must fit.
3102 cb_assert(maxSize <= std::numeric_limits<uint16_t>::max());
3103 for (size_t i = 0; i < maxSize; ++i) {
3104 uint16_t vbid = static_cast<uint16_t>(i);
3105 RCPtr<VBucket> vb = vbMap.getBucket(vbid);
// visitBucket() reports whether the visitor wants the bucket's data.
3107 bool wantData = visitor.visitBucket(vb);
3108 // We could've lost this along the way.
3110 vb->ht.visit(visitor);
// Constructs the adaptor task and scans all vbucket ids, selecting those
// that exist and pass the visitor's vbucket filter.
// NOTE(review): the statement executed for a selected vbucket is not visible
// in this listing; presumably it appends vbid to vbList, which run()
// consumes — TODO confirm.
3117 VBCBAdaptor::VBCBAdaptor(EventuallyPersistentStore *s,
3118 shared_ptr<VBucketVisitor> v,
3119 const char *l, const Priority &p, double sleep) :
3120 GlobalTask(&s->getEPEngine(), p, 0, false), store(s),
3121 visitor(v), label(l), sleepTime(sleep), currentvb(0)
3123 const VBucketFilter &vbFilter = visitor->getVBucketFilter();
3124 size_t maxSize = store->vbMap.getSize();
// The ids are narrowed to uint16_t below, so the map size must fit.
3125 cb_assert(maxSize <= std::numeric_limits<uint16_t>::max());
3126 for (size_t i = 0; i < maxSize; ++i) {
3127 uint16_t vbid = static_cast<uint16_t>(i);
3128 RCPtr<VBucket> vb = store->vbMap.getBucket(vbid);
3129 if (vb && vbFilter(vbid)) {
// One scheduling step: takes the vbucket id at the front of vbList, lets the
// visitor visit that vbucket's hash table if it accepts the bucket, and
// calls visitor->complete() once the list is drained.
// NOTE(review): the pause handling, list pop and return statements are not
// visible in this listing.
3135 bool VBCBAdaptor::run(void) {
3136 if (!vbList.empty()) {
3137 currentvb = vbList.front();
3138 RCPtr<VBucket> vb = store->vbMap.getBucket(currentvb);
// The visitor can ask for the visit to be paused.
3140 if (visitor->pauseVisitor()) {
3144 if (visitor->visitBucket(vb)) {
3145 vb->ht.visit(*visitor);
3151 bool isdone = vbList.empty();
3153 visitor->complete();
// Constructs the per-shard visitor task and scans the vbuckets of shard
// `shardID`, selecting those that exist and pass the visitor's filter.
// NOTE(review): part of the initializer list and the statement executed for
// a selected vbucket are not visible in this listing; presumably the latter
// appends vbid to vbList, which run() consumes — TODO confirm.
3158 VBucketVisitorTask::VBucketVisitorTask(EventuallyPersistentStore *s,
3159 shared_ptr<VBucketVisitor> v,
3160 uint16_t sh, const char *l,
3161 double sleep, bool shutdown):
3162 GlobalTask(&(s->getEPEngine()), Priority::AccessScannerPriority,
3164 store(s), visitor(v), label(l), sleepTime(sleep), currentvb(0),
3167 const VBucketFilter &vbFilter = visitor->getVBucketFilter();
// Copy of the shard's vbucket id list.
3168 std::vector<int> vbs = store->vbMap.getShard(shardID)->getVBuckets();
3169 cb_assert(vbs.size() <= std::numeric_limits<uint16_t>::max());
3170 std::vector<int>::iterator it;
3171 for (it = vbs.begin(); it != vbs.end(); ++it) {
3172 uint16_t vbid = static_cast<uint16_t>(*it);
3173 RCPtr<VBucket> vb = store->vbMap.getBucket(vbid);
3174 if (vb && vbFilter(vbid)) {
// One scheduling step: takes the vbucket id at the front of vbList, lets the
// visitor visit that vbucket's hash table if it accepts the bucket, and
// calls visitor->complete() once the list is drained.
// NOTE(review): the pause handling, list pop and return statements are not
// visible in this listing.
3180 bool VBucketVisitorTask::run() {
3181 if (!vbList.empty()) {
3182 currentvb = vbList.front();
3183 RCPtr<VBucket> vb = store->vbMap.getBucket(currentvb);
// The visitor can ask for the visit to be paused.
3185 if (visitor->pauseVisitor()) {
3189 if (visitor->visitBucket(vb)) {
3190 vb->ht.visit(*visitor);
3196 bool isDone = vbList.empty();
3198 visitor->complete();
// Resets the statistics of every shard's RW and RO underlying store, then
// resets the scheduling and task-runtime histograms for every task type id.
3203 void EventuallyPersistentStore::resetUnderlyingStats(void)
3205 for (size_t i = 0; i < vbMap.numShards; i++) {
3206 KVShard *shard = vbMap.shards[i];
3207 shard->getRWUnderlying()->resetStats();
3208 shard->getROUnderlying()->resetStats();
// Histograms are indexed by task type id in [0, MAX_TYPE_ID).
3211 for (size_t i = 0; i < MAX_TYPE_ID; i++) {
3212 stats.schedulingHisto[i].reset();
3213 stats.taskRuntimeHisto[i].reset();
// Emits the stats of every shard's RW and RO underlying store through
// `add_stat`, using the prefixes "rw_<shard index>" and "ro_<shard index>".
3217 void EventuallyPersistentStore::addKVStoreStats(ADD_STAT add_stat,
3218 const void* cookie) {
3219 for (size_t i = 0; i < vbMap.numShards; i++) {
// Build the per-shard prefixes.
3220 std::stringstream rwPrefix;
3221 std::stringstream roPrefix;
3222 rwPrefix << "rw_" << i;
3223 roPrefix << "ro_" << i;
3224 vbMap.shards[i]->getRWUnderlying()->addStats(rwPrefix.str(), add_stat,
3226 vbMap.shards[i]->getROUnderlying()->addStats(roPrefix.str(), add_stat,
// Emits the timing stats of every shard's RW and RO underlying store, using
// the prefixes "rw_<shard index>" and "ro_<shard index>".
3231 void EventuallyPersistentStore::addKVStoreTimingStats(ADD_STAT add_stat,
3232 const void* cookie) {
3233 for (size_t i = 0; i < vbMap.numShards; i++) {
// Build the per-shard prefixes.
3234 std::stringstream rwPrefix;
3235 std::stringstream roPrefix;
3236 rwPrefix << "rw_" << i;
3237 roPrefix << "ro_" << i;
3238 vbMap.shards[i]->getRWUnderlying()->addTimingStats(rwPrefix.str(),
3241 vbMap.shards[i]->getROUnderlying()->addTimingStats(roPrefix.str(),
// Returns the read-only underlying store of the shard looked up with
// EP_PRIMARY_SHARD.
3247 KVStore *EventuallyPersistentStore::getOneROUnderlying(void) {
3248 return vbMap.getShard(EP_PRIMARY_SHARD)->getROUnderlying();
// Returns the read-write underlying store of the shard looked up with
// EP_PRIMARY_SHARD.
3251 KVStore *EventuallyPersistentStore::getOneRWUnderlying(void) {
3252 return vbMap.getShard(EP_PRIMARY_SHARD)->getRWUnderlying();
// Rolls a vbucket back to `rollbackSeqno` using the RW underlying store.
// Visible outcomes: ENGINE_TMPFAIL when the vbucket mutex cannot be taken,
// ENGINE_SUCCESS after a successful rollback or a successful resetVBucket(),
// and ENGINE_NOT_MY_VBUCKET otherwise.
// NOTE(review): the return-type line and some other source lines are not
// visible in this listing.
3256 EventuallyPersistentStore::rollback(uint16_t vbid,
3257 uint64_t rollbackSeqno) {
3258 LockHolder lh(vb_mutexes[vbid], true /*tryLock*/);
3259 if (!lh.islocked()) {
3260 return ENGINE_TMPFAIL; // Reschedule a vbucket rollback task.
// A non-zero target seqno is handed to the underlying store's rollback.
3263 if (rollbackSeqno != 0) {
3264 shared_ptr<RollbackCB> cb(new RollbackCB(engine));
3265 KVStore* rwUnderlying = vbMap.getShard(vbid)->getRWUnderlying();
3266 RollbackResult result = rwUnderlying->rollback(vbid, rollbackSeqno, cb);
// On success: prune the failover entries, clear the checkpoints, and align
// the seqno and snapshot range with the rollback result.
// NOTE(review): vb is dereferenced without a visible null check — confirm
// that the vbucket is guaranteed to exist here.
3268 if (result.success) {
3269 RCPtr<VBucket> vb = vbMap.getBucket(vbid);
3270 vb->failovers->pruneEntries(result.highSeqno);
3271 vb->checkpointManager.clear(vb->getState());
3272 vb->checkpointManager.setBySeqno(result.highSeqno);
3273 vb->setCurrentSnapshot(result.snapStartSeqno, result.snapEndSeqno);
3274 return ENGINE_SUCCESS;
// Otherwise the vbucket is reset via resetVBucket().
3278 if (resetVBucket(vbid)) {
3279 return ENGINE_SUCCESS;
3281 return ENGINE_NOT_MY_VBUCKET;