#include "access_scanner.h"
#include "checkpoint_remover.h"
#include "conflict_resolution.h"
+#include "defragmenter.h"
#include "ep.h"
#include "ep_engine.h"
#include "failover-table.h"
class StatsValueChangeListener : public ValueChangedListener {
public:
- StatsValueChangeListener(EPStats &st) : stats(st) {
+ StatsValueChangeListener(EPStats &st, EventuallyPersistentStore &str)
+ : stats(st), store(str) {
// EMPTY
}
virtual void sizeValueChanged(const std::string &key, size_t value) {
if (key.compare("max_size") == 0) {
stats.setMaxDataSize(value);
+ store.getEPEngine().getDcpConnMap(). \
+ updateMaxActiveSnoozingBackfills(value);
size_t low_wat = static_cast<size_t>
- (static_cast<double>(value) * 0.6);
+ (static_cast<double>(value) * 0.75);
size_t high_wat = static_cast<size_t>(
- static_cast<double>(value) * 0.75);
+ static_cast<double>(value) * 0.85);
stats.mem_low_wat.store(low_wat);
stats.mem_high_wat.store(high_wat);
} else if (key.compare("mem_low_wat") == 0) {
private:
EPStats &stats;
+ EventuallyPersistentStore &store;
};
/**
} else {
store.disableAccessScannerTask();
}
+ } else if (key.compare("bfilter_enabled") == 0) {
+ store.setAllBloomFilters(value);
+ }
+ }
+
+ // Listener hook for float-valued configuration keys. Only
+ // "bfilter_residency_threshold" is handled: its new value is forwarded
+ // to store.setBfiltersResidencyThreshold(). Any other key is ignored.
+ virtual void floatValueChanged(const std::string &key, float value) {
+ if (key.compare("bfilter_residency_threshold") == 0) {
+ store.setBfiltersResidencyThreshold(value);
}
}
+ // Builds the task on top of GlobalTask, identified by
+ // TaskId::VBucketMemoryDeletionTask and passing through the
+ // caller-supplied delay, then stores the engine and vbucket handles and
+ // caches the id returned by vb->getId().
+ // NOTE(review): the trailing `true` passed to GlobalTask is not explained
+ // by anything visible here -- confirm its meaning against GlobalTask.
VBucketMemoryDeletionTask(EventuallyPersistentEngine &eng,
RCPtr<VBucket> &vb, double delay) :
GlobalTask(&eng,
- Priority::VBMemoryDeletionPriority, delay, true),
+ TaskId::VBucketMemoryDeletionTask, delay, true),
e(eng), vbucket(vb), vbid(vb->getId()) { }
std::string getDescription() {
class PendingOpsNotification : public GlobalTask {
public:
+ // Builds the task on top of GlobalTask, identified by
+ // TaskId::PendingOpsNotification, and stores the engine and vbucket.
+ // NOTE(review): the `0` and `false` arguments presumably mean "no initial
+ // delay" and a shutdown-related flag -- confirm against GlobalTask.
PendingOpsNotification(EventuallyPersistentEngine &e, RCPtr<VBucket> &vb) :
- GlobalTask(&e, Priority::VBMemoryDeletionPriority, 0, false),
+ GlobalTask(&e, TaskId::PendingOpsNotification, 0, false),
engine(e), vbucket(vb) { }
std::string getDescription() {
EventuallyPersistentEngine &theEngine) :
engine(theEngine), stats(engine.getEpStats()),
vbMap(theEngine.getConfiguration(), *this),
+ defragmenterTask(NULL),
bgFetchQueue(0),
- diskFlushAll(false), bgFetchDelay(0), backfillMemoryThreshold(0.95),
+ diskFlushAll(false), bgFetchDelay(0),
+ backfillMemoryThreshold(0.95),
statsSnapshotTaskId(0), lastTransTimePerItem(0)
{
cachedResidentRatio.activeRatio.store(0);
storageProperties = new StorageProperties(true, true, true, true);
- stats.schedulingHisto = new Histogram<hrtime_t>[MAX_TYPE_ID];
- stats.taskRuntimeHisto = new Histogram<hrtime_t>[MAX_TYPE_ID];
+ stats.schedulingHisto = new Histogram<hrtime_t>[GlobalTask::allTaskIds.size()];
+ stats.taskRuntimeHisto = new Histogram<hrtime_t>[GlobalTask::allTaskIds.size()];
- for (size_t i = 0; i < MAX_TYPE_ID; i++) {
+ for (size_t i = 0; i < GlobalTask::allTaskIds.size(); i++) {
stats.schedulingHisto[i].reset();
stats.taskRuntimeHisto[i].reset();
}
stats.memOverhead = sizeof(EventuallyPersistentStore);
if (config.getConflictResolutionType().compare("seqno") == 0) {
- conflictResolver = new SeqBasedResolution();
+ conflictResolver = new ConflictResolution();
}
stats.setMaxDataSize(config.getMaxSize());
config.addValueChangedListener("max_size",
- new StatsValueChangeListener(stats));
+ new StatsValueChangeListener(stats, *this));
+ getEPEngine().getDcpConnMap().updateMaxActiveSnoozingBackfills(config.getMaxSize());
stats.mem_low_wat.store(config.getMemLowWat());
config.addValueChangedListener("mem_low_wat",
- new StatsValueChangeListener(stats));
+ new StatsValueChangeListener(stats, *this));
stats.mem_high_wat.store(config.getMemHighWat());
config.addValueChangedListener("mem_high_wat",
- new StatsValueChangeListener(stats));
+ new StatsValueChangeListener(stats, *this));
stats.tapThrottleThreshold.store(static_cast<double>
(config.getTapThrottleThreshold())
/ 100.0);
config.addValueChangedListener("tap_throttle_threshold",
- new StatsValueChangeListener(stats));
+ new StatsValueChangeListener(stats, *this));
stats.tapThrottleWriteQueueCap.store(config.getTapThrottleQueueCap());
config.addValueChangedListener("tap_throttle_queue_cap",
stats.warmupMemUsedCap.store(static_cast<double>
(config.getWarmupMinMemoryThreshold()) / 100.0);
config.addValueChangedListener("warmup_min_memory_threshold",
- new StatsValueChangeListener(stats));
+ new StatsValueChangeListener(stats, *this));
stats.warmupNumReadCap.store(static_cast<double>
(config.getWarmupMinItemsThreshold()) / 100.0);
config.addValueChangedListener("warmup_min_items_threshold",
- new StatsValueChangeListener(stats));
+ new StatsValueChangeListener(stats, *this));
double mem_threshold = static_cast<double>
(config.getMutationMemThreshold()) / 100;
config.addValueChangedListener("backfill_mem_threshold",
new EPStoreValueChangeListener(*this));
+ config.addValueChangedListener("bfilter_enabled",
+ new EPStoreValueChangeListener(*this));
+
+ bfilterResidencyThreshold = config.getBfilterResidencyThreshold();
+ config.addValueChangedListener("bfilter_residency_threshold",
+ new EPStoreValueChangeListener(*this));
+
compactionExpMemThreshold = config.getCompactionExpMemThreshold();
config.addValueChangedListener("compaction_exp_mem_threshold",
new EPStoreValueChangeListener(*this));
ExTask workloadMonitorTask = new WorkLoadMonitor(&engine, false);
ExecutorPool::get()->schedule(workloadMonitorTask, NONIO_TASK_IDX);
+#if HAVE_JEMALLOC
+ /* Only create the defragmenter task if we have an underlying memory
+ * allocator which can facilitate defragmenting memory.
+ */
+ defragmenterTask = new DefragmenterTask(&engine, stats);
+ ExecutorPool::get()->schedule(defragmenterTask, NONIO_TASK_IDX);
+#endif
+
return true;
}
EventuallyPersistentStore::~EventuallyPersistentStore() {
stopWarmup();
stopBgFetcher();
- ExecutorPool::get()->stopTaskGroup(&engine, NONIO_TASK_IDX);
+ ExecutorPool::get()->stopTaskGroup(&engine, NONIO_TASK_IDX,
+ stats.forceShutdown);
ExecutorPool::get()->cancel(statsSnapshotTaskId);
LockHolder lh(accessScanner.mutex);
lh.unlock();
stopFlusher();
- ExecutorPool::get()->unregisterBucket(ObjectRegistry::getCurrentEngine());
+ ExecutorPool::get()->unregisterBucket(ObjectRegistry::getCurrentEngine(),
+ stats.forceShutdown);
delete [] vb_mutexes;
delete [] schedule_vbstate_persist;
delete conflictResolver;
delete warmupTask;
delete storageProperties;
+ defragmenterTask.reset();
std::vector<MutationLog*>::iterator it;
for (it = accessLog.begin(); it != accessLog.end(); it++) {
void EventuallyPersistentStore::stopFlusher() {
for (uint16_t i = 0; i < vbMap.numShards; i++) {
Flusher *flusher = vbMap.shards[i]->getFlusher();
+ LOG(EXTENSION_LOG_WARNING, "Attempting to stop the flusher for "
+ "shard:%" PRIu16, i);
bool rv = flusher->stop(stats.forceShutdown);
if (rv && !stats.forceShutdown) {
flusher->wait();
BgFetcher *bgfetcher = vbMap.shards[i]->getBgFetcher();
if (bgfetcher == NULL) {
LOG(EXTENSION_LOG_WARNING,
- "Falied to start bg fetcher for shard %d", i);
+ "Failed to start bg fetcher for shard %d", i);
return false;
}
bgfetcher->start();
"Shutting down engine while there are still pending data "
"read for shard %d from database storage", i);
}
- LOG(EXTENSION_LOG_INFO, "Stopping bg fetcher for underlying storage");
+ LOG(EXTENSION_LOG_WARNING, "Stopping bg fetcher for shard:%" PRIu16, i);
bgfetcher->stop();
}
}
cb_assert(deleted);
} else if (v->isExpired(startTime) && !v->isDeleted()) {
vb->ht.unlocked_softDelete(v, 0, getItemEvictionPolicy());
- queueDirty(vb, v, &lh, false);
+ queueDirty(vb, v, &lh, NULL, false);
}
} else {
if (eviction_policy == FULL_EVICTION) {
// Create a temp item and delete and push it
- // into the checkpoint queue.
- add_type_t rv = vb->ht.unlocked_addTempItem(bucket_num, key,
- eviction_policy);
- if (rv == ADD_NOMEM) {
- return;
+ // into the checkpoint queue, only if the bloomfilter
+ // predicts that the item may exist on disk.
+ if (vb->maybeKeyExistsInFilter(key)) {
+ add_type_t rv = vb->ht.unlocked_addTempItem(bucket_num, key,
+ eviction_policy);
+ if (rv == ADD_NOMEM) {
+ return;
+ }
+ v = vb->ht.unlocked_find(key, bucket_num, true, false);
+ v->setStoredValueState(StoredValue::state_deleted_key);
+ v->setRevSeqno(revSeqno);
+ vb->ht.unlocked_softDelete(v, 0, eviction_policy);
+ queueDirty(vb, v, &lh, NULL, false);
}
- v = vb->ht.unlocked_find(key, bucket_num, true, false);
- v->setStoredValueState(StoredValue::state_deleted_key);
- v->setRevSeqno(revSeqno);
- vb->ht.unlocked_softDelete(v, 0, eviction_policy);
- queueDirty(vb, v, &lh, false);
}
}
}
if (queueExpired && vb->getState() == vbucket_state_active) {
incExpirationStat(vb, false);
vb->ht.unlocked_softDelete(v, 0, eviction_policy);
- queueDirty(vb, v, NULL, false, true);
+ queueDirty(vb, v, NULL, NULL, false, true);
}
return wantDeleted ? v : NULL;
}
return v;
}
+/**
+ * Reports whether the given vbucket's hash table holds an entry for the
+ * key that is not a temp item.
+ *
+ * @param vb  vbucket whose hash table is probed; asserted to be set
+ * @param key key to look up
+ * @return true when an entry is found and it is not a temp item,
+ *         false otherwise
+ */
+bool EventuallyPersistentStore::isMetaDataResident(RCPtr<VBucket> &vb,
+                                                   const std::string &key) {
+    cb_assert(vb);
+
+    // The guard returned by getLockedBucket() lives until the function
+    // returns, so the lookup below runs with the bucket locked.
+    int bucketIdx(0);
+    LockHolder guard = vb->ht.getLockedBucket(key, &bucketIdx);
+    StoredValue *entry = vb->ht.unlocked_find(key, bucketIdx, false, false);
+
+    return entry && !entry->isTempItem();
+}
+
protocol_binary_response_status EventuallyPersistentStore::evictKey(
const std::string &key,
uint16_t vbucket,
if (v->isResident()) {
if (vb->ht.unlocked_ejectItem(v, eviction_policy)) {
*msg = "Ejected.";
+
+ // Add key to bloom filter in case of full eviction mode
+ if (getItemEvictionPolicy() == FULL_EVICTION) {
+ vb->addToFilter(key);
+ }
} else {
*msg = "Can't eject: Dirty object.";
rv = PROTOCOL_BINARY_RESPONSE_KEY_EEXISTS;
abort();
case ADD_BG_FETCH:
lock.unlock();
- bgFetch(key, vb->getId(), -1, cookie, metadataOnly);
+ bgFetch(key, vb->getId(), cookie, metadataOnly);
}
return ENGINE_EWOULDBLOCK;
}
vb->getState() == vbucket_state_pending)) {
v->unlock();
}
- mutation_type_t mtype = vb->ht.unlocked_set(v, itm, itm.getCas(),
- true, false,
- eviction_policy, nru);
+ bool maybeKeyExists = true;
+ // Check Bloomfilter's prediction if in full eviction policy
+ // and for a CAS operation only.
+ if (eviction_policy == FULL_EVICTION && itm.getCas() != 0) {
+ // Check Bloomfilter's prediction
+ if (!vb->maybeKeyExistsInFilter(itm.getKey())) {
+ maybeKeyExists = false;
+ }
+ }
+
+ mutation_type_t mtype = vb->ht.unlocked_set(v, itm, itm.getCas(), true, false,
+ eviction_policy, nru,
+ maybeKeyExists);
+
+ Item& it = const_cast<Item&>(itm);
+ uint64_t seqno = 0;
ENGINE_ERROR_CODE ret = ENGINE_SUCCESS;
switch (mtype) {
case NOMEM:
// Even if the item was dirty, push it into the vbucket's open
// checkpoint.
case WAS_CLEAN:
- queueDirty(vb, v, &lh);
+ it.setCas(vb->nextHLCCas());
+ v->setCas(it.getCas());
+ queueDirty(vb, v, &lh, &seqno);
+ it.setBySeqno(seqno);
break;
case NEED_BG_FETCH:
- { // CAS operation with non-resident item + full eviction.
- if (v) {
- // temp item is already created. Simply schedule a bg fetch job
- lh.unlock();
- bgFetch(itm.getKey(), vb->getId(), -1, cookie, true);
- return ENGINE_EWOULDBLOCK;
- }
- ret = addTempItemForBgFetch(lh, bucket_num, itm.getKey(), vb,
- cookie, true);
- break;
+ { // CAS operation with non-resident item + full eviction.
+ if (v) {
+ // temp item is already created. Simply schedule a bg fetch job
+ lh.unlock();
+ bgFetch(itm.getKey(), vb->getId(), cookie, true);
+ return ENGINE_EWOULDBLOCK;
}
+ ret = addTempItemForBgFetch(lh, bucket_num, itm.getKey(), vb,
+ cookie, true);
+ break;
+ }
case INVALID_VBUCKET:
ret = ENGINE_NOT_MY_VBUCKET;
break;
LockHolder lh = vb->ht.getLockedBucket(itm.getKey(), &bucket_num);
StoredValue *v = vb->ht.unlocked_find(itm.getKey(), bucket_num, true,
false);
+
+ bool maybeKeyExists = true;
+ if (eviction_policy == FULL_EVICTION) {
+ // Check bloomfilter's prediction
+ if (!vb->maybeKeyExistsInFilter(itm.getKey())) {
+ maybeKeyExists = false;
+ }
+ }
+
add_type_t atype = vb->ht.unlocked_add(bucket_num, v, itm,
- eviction_policy);
+ eviction_policy,
+ true, true,
+ maybeKeyExists);
+ Item& it = const_cast<Item&>(itm);
+ uint64_t seqno = 0;
switch (atype) {
case ADD_NOMEM:
return ENGINE_ENOMEM;
case ADD_EXISTS:
return ENGINE_NOT_STORED;
case ADD_TMP_AND_BG_FETCH:
- return addTempItemForBgFetch(lh, bucket_num, itm.getKey(), vb,
+ return addTempItemForBgFetch(lh, bucket_num, it.getKey(), vb,
cookie, true);
case ADD_BG_FETCH:
lh.unlock();
- bgFetch(itm.getKey(), vb->getId(), -1, cookie, true);
+ bgFetch(it.getKey(), vb->getId(), cookie, true);
return ENGINE_EWOULDBLOCK;
case ADD_SUCCESS:
case ADD_UNDEL:
- queueDirty(vb, v, &lh);
+ it.setCas(vb->nextHLCCas());
+ v->setCas(it.getCas());
+ queueDirty(vb, v, &lh, &seqno);
+ it.setBySeqno(seqno);
break;
}
+
return ENGINE_SUCCESS;
}
0xff);
}
+ Item& it = const_cast<Item&>(itm);
+ uint64_t seqno = 0;
ENGINE_ERROR_CODE ret = ENGINE_SUCCESS;
switch (mtype) {
case NOMEM:
// Even if the item was dirty, push it into the vbucket's open
// checkpoint.
case WAS_CLEAN:
- queueDirty(vb, v, &lh);
+ it.setCas(vb->nextHLCCas());
+ v->setCas(it.getCas());
+ queueDirty(vb, v, &lh, &seqno);
+ it.setBySeqno(seqno);
break;
case NEED_BG_FETCH:
{
// temp item is already created. Simply schedule a bg fetch job
lh.unlock();
- bgFetch(itm.getKey(), vb->getId(), -1, cookie, true);
+ bgFetch(it.getKey(), vb->getId(), cookie, true);
ret = ENGINE_EWOULDBLOCK;
break;
}
return ENGINE_KEY_ENOENT;
}
- return addTempItemForBgFetch(lh, bucket_num, itm.getKey(), vb,
- cookie, false);
+ if (vb->maybeKeyExistsInFilter(itm.getKey())) {
+ return addTempItemForBgFetch(lh, bucket_num, itm.getKey(), vb,
+ cookie, false);
+ } else {
+ // As bloomfilter predicted that item surely doesn't exist
+ // on disk, return ENOENT for replace().
+ return ENGINE_KEY_ENOENT;
+ }
}
}
-ENGINE_ERROR_CODE EventuallyPersistentStore::addTAPBackfillItem(const Item &itm,
- uint8_t nru,
- bool genBySeqno) {
+ENGINE_ERROR_CODE EventuallyPersistentStore::addTAPBackfillItem(
+ const Item &itm,
+ uint8_t nru,
+ bool genBySeqno,
+ ExtendedMetaData *emd) {
RCPtr<VBucket> vb = getVBucket(itm.getVBucketId());
if (!vb) {
return ENGINE_NOT_MY_VBUCKET;
}
+ //check for the incoming item's CAS validity
+ if (!Item::isValidCas(itm.getCas())) {
+ return ENGINE_KEY_EEXISTS;
+ }
+
int bucket_num(0);
LockHolder lh = vb->ht.getLockedBucket(itm.getKey(), &bucket_num);
StoredValue *v = vb->ht.unlocked_find(itm.getKey(), bucket_num, true,
case NOT_FOUND:
// FALLTHROUGH
case WAS_CLEAN:
- queueDirty(vb, v, &lh, true, true, genBySeqno);
+ /* set the conflict resolution mode from the extended meta data *
+ * Given that the mode is already set, we don't need to set the *
+ * conflict resolution mode in queueDirty */
+ if (emd) {
+ v->setConflictResMode(
+ static_cast<enum conflict_resolution_mode>(
+ emd->getConflictResMode()));
+ }
+ vb->setMaxCas(v->getCas());
+ queueDirty(vb, v, &lh, NULL,true, true, genBySeqno, false);
break;
case INVALID_VBUCKET:
ret = ENGINE_NOT_MY_VBUCKET;
abort();
}
+ // Update drift counter for vbucket upon a success only
+ if (ret == ENGINE_SUCCESS && emd) {
+ vb->setDriftCounter(emd->getAdjustedTime());
+ }
+
return ret;
}
EventuallyPersistentStore *epstore;
};
-void EventuallyPersistentStore::snapshotVBuckets(const Priority &priority,
+void EventuallyPersistentStore::snapshotVBuckets(VBSnapshotTask::Priority prio,
uint16_t shardId) {
class VBucketStateVisitor : public VBucketVisitor {
: vbuckets(vb_map), shardId(sid) { }
bool visitBucket(RCPtr<VBucket> &vb) {
if (vbuckets.getShard(vb->getId())->getId() == shardId) {
- uint64_t snapStart = 0;
- uint64_t snapEnd = 0;
+ snapshot_range_t range;
+ vb->getPersistedSnapshot(range);
std::string failovers = vb->failovers->toJSON();
uint64_t chkId = vbuckets.getPersistenceCheckpointId(vb->getId());
- vb->getCurrentSnapshot(snapStart, snapEnd);
vbucket_state vb_state(vb->getState(), chkId, 0,
vb->getHighSeqno(), vb->getPurgeSeqno(),
- snapStart, snapEnd, failovers);
+ range.start, range.end, vb->getMaxCas(),
+ vb->getDriftCounter() ,failovers);
states.insert(std::pair<uint16_t, vbucket_state>(vb->getId(), vb_state));
}
return false;
};
KVShard *shard = vbMap.shards[shardId];
- if (priority == Priority::VBucketPersistLowPriority) {
+ if (prio == VBSnapshotTask::Priority::LOW) {
shard->setLowPriorityVbSnapshotFlag(false);
} else {
shard->setHighPriorityVbSnapshotFlag(false);
break;
}
- if (priority == Priority::VBucketPersistHighPriority) {
+ if (prio == VBSnapshotTask::Priority::HIGH) {
if (vbMap.setBucketCreation(iter->first, false)) {
LOG(EXTENSION_LOG_INFO, "VBucket %d created", iter->first);
}
}
if (!success) {
- scheduleVBSnapshot(priority, shard->getId());
+ scheduleVBSnapshot(prio, shard->getId());
} else {
stats.snapshotVbucketHisto.add((gethrtime() - start) / 1000);
}
}
-bool EventuallyPersistentStore::persistVBState(const Priority &priority,
- uint16_t vbid) {
+bool EventuallyPersistentStore::persistVBState(uint16_t vbid) {
schedule_vbstate_persist[vbid] = false;
RCPtr<VBucket> vb = getVBucket(vbid);
}
}
+ const hrtime_t start = gethrtime();
+
KVStatsCallback kvcb(this);
uint64_t chkId = vbMap.getPersistenceCheckpointId(vbid);
std::string failovers = vb->failovers->toJSON();
- uint64_t snapStart = 0;
- uint64_t snapEnd = 0;
- vb->getCurrentSnapshot(snapStart, snapEnd);
+ snapshot_range_t range;
+ vb->getPersistedSnapshot(range);
vbucket_state vb_state(vb->getState(), chkId, 0, vb->getHighSeqno(),
- vb->getPurgeSeqno(), snapStart, snapEnd, failovers);
+ vb->getPurgeSeqno(), range.start, range.end,
+ vb->getMaxCas(), vb->getDriftCounter(),
+ failovers);
KVStore *rwUnderlying = getRWUnderlying(vbid);
if (rwUnderlying->snapshotVBucket(vbid, vb_state, &kvcb)) {
+ stats.persistVBStateHisto.add((gethrtime() - start) / 1000);
if (vbMap.setBucketCreation(vbid, false)) {
LOG(EXTENSION_LOG_INFO, "VBucket %d created", vbid);
}
}
vb->setState(to, engine.getServerApi());
+
+ if (to == vbucket_state_active && oldstate == vbucket_state_replica) {
+ /**
+ * Update snapshot range when vbucket goes from being a replica
+ * to active, to maintain the correct snapshot sequence numbers
+ * even in a failover scenario.
+ */
+ vb->checkpointManager.resetSnapshotRange();
+ }
+
if (to == vbucket_state_active && !transfer) {
- uint64_t snapStart = 0;
- uint64_t snapEnd = 0;
- vb->getCurrentSnapshot(snapStart, snapEnd);
- if (snapEnd == vbMap.getPersistenceSeqno(vbid)) {
- vb->failovers->createEntry(snapEnd);
+ snapshot_range_t range;
+ vb->getPersistedSnapshot(range);
+ if (range.end == vbMap.getPersistenceSeqno(vbid)) {
+ vb->failovers->createEntry(range.end);
} else {
- vb->failovers->createEntry(snapStart);
+ vb->failovers->createEntry(range.start);
}
}
ExTask notifyTask = new PendingOpsNotification(engine, vb);
ExecutorPool::get()->schedule(notifyTask, NONIO_TASK_IDX);
}
- scheduleVBStatePersist(Priority::VBucketPersistLowPriority, vbid);
+ scheduleVBStatePersist(VBStatePersistTask::Priority::LOW, vbid);
} else {
FailoverTable* ft = new FailoverTable(engine.getMaxFailoverEntries());
+ KVShard* shard = vbMap.getShard(vbid);
+ shared_ptr<Callback<uint16_t> > cb(new NotifyFlusherCB(shard));
RCPtr<VBucket> newvb(new VBucket(vbid, to, stats,
engine.getCheckpointConfig(),
- vbMap.getShard(vbid), 0, 0, 0, ft));
+ shard, 0, 0, 0, ft, cb));
// The first checkpoint for active vbucket should start with id 2.
uint64_t start_chk_id = (to == vbucket_state_active) ? 2 : 0;
newvb->checkpointManager.setOpenCheckpointId(start_chk_id);
vbMap.setPersistenceSeqno(vbid, 0);
vbMap.setBucketCreation(vbid, true);
lh.unlock();
- scheduleVBStatePersist(Priority::VBucketPersistHighPriority, vbid);
+ scheduleVBStatePersist(VBStatePersistTask::Priority::HIGH, vbid);
}
return ENGINE_SUCCESS;
}
-bool EventuallyPersistentStore::scheduleVBSnapshot(const Priority &p) {
+bool EventuallyPersistentStore::scheduleVBSnapshot(VBSnapshotTask::Priority prio) {
KVShard *shard = NULL;
- if (p == Priority::VBucketPersistHighPriority) {
+ if (prio == VBSnapshotTask::Priority::HIGH) {
for (size_t i = 0; i < vbMap.numShards; ++i) {
shard = vbMap.shards[i];
if (shard->setHighPriorityVbSnapshotFlag(true)) {
- ExTask task = new VBSnapshotTask(&engine, p, i, false);
+ ExTask task = new VBSnapshotTaskHigh(&engine, i, true);
ExecutorPool::get()->schedule(task, WRITER_TASK_IDX);
}
}
for (size_t i = 0; i < vbMap.numShards; ++i) {
shard = vbMap.shards[i];
if (shard->setLowPriorityVbSnapshotFlag(true)) {
- ExTask task = new VBSnapshotTask(&engine, p, i, false);
+ ExTask task = new VBSnapshotTaskLow(&engine, i, true);
ExecutorPool::get()->schedule(task, WRITER_TASK_IDX);
}
}
return true;
}
-void EventuallyPersistentStore::scheduleVBSnapshot(const Priority &p,
+// Schedules a vbucket snapshot task for a single shard, submitted to the
+// executor pool with WRITER_TASK_IDX. A HIGH priority request creates a
+// VBSnapshotTaskHigh; any other priority creates a VBSnapshotTaskLow.
+// A task is scheduled when `force` is true or when the shard's matching
+// set{High,Low}PriorityVbSnapshotFlag(true) call returns true.
+// NOTE(review): that return value presumably means no snapshot of this
+// priority was already pending for the shard -- confirm in KVShard.
+void EventuallyPersistentStore::scheduleVBSnapshot(VBSnapshotTask::Priority prio,
uint16_t shardId,
bool force) {
KVShard *shard = vbMap.shards[shardId];
- if (p == Priority::VBucketPersistHighPriority) {
+ if (prio == VBSnapshotTask::Priority::HIGH) {
if (force || shard->setHighPriorityVbSnapshotFlag(true)) {
- ExTask task = new VBSnapshotTask(&engine, p, shardId, false);
+ ExTask task = new VBSnapshotTaskHigh(&engine, shardId, true);
ExecutorPool::get()->schedule(task, WRITER_TASK_IDX);
}
} else {
if (force || shard->setLowPriorityVbSnapshotFlag(true)) {
- ExTask task = new VBSnapshotTask(&engine, p, shardId, false);
+ ExTask task = new VBSnapshotTaskLow(&engine, shardId, true);
ExecutorPool::get()->schedule(task, WRITER_TASK_IDX);
}
}
}
-void EventuallyPersistentStore::scheduleVBStatePersist(const Priority &priority,
+// Schedules a task that persists the state of one vbucket, submitted to
+// the executor pool with WRITER_TASK_IDX. HIGH priority creates a
+// VBStatePersistTaskHigh; any other priority creates a
+// VBStatePersistTaskLow.
+void EventuallyPersistentStore::scheduleVBStatePersist(VBStatePersistTask::Priority priority,
uint16_t vbid,
bool force) {
bool inverse = false;
+ // Unless forced, a task is scheduled only by the caller that flips the
+ // per-vbucket flag from false to true. When `force` is true the
+ // compare-exchange is short-circuited and the flag is left untouched.
if (force ||
schedule_vbstate_persist[vbid].compare_exchange_strong(inverse, true)) {
- ExTask task = new VBStatePersistTask(&engine, priority, vbid, false);
- ExecutorPool::get()->schedule(task, WRITER_TASK_IDX);
+ if (priority == VBStatePersistTask::Priority::HIGH) {
+ ExecutorPool::get()->schedule(new VBStatePersistTaskHigh(&engine, vbid, true), WRITER_TASK_IDX);
+ } else {
+ ExecutorPool::get()->schedule(new VBStatePersistTaskLow(&engine, vbid, true), WRITER_TASK_IDX);
+ }
}
}
ExecutorPool::get()->schedule(delTask, NONIO_TASK_IDX);
if (vbMap.setBucketDeletion(vb->getId(), true)) {
- ExTask task = new VBDeleteTask(&engine, vb->getId(), cookie,
- Priority::VBucketDeletionPriority);
+ ExTask task = new VBDeleteTask(&engine, vb->getId(), cookie);
ExecutorPool::get()->schedule(task, WRITER_TASK_IDX);
}
}
}
LockHolder lh(compactionLock);
- ExTask task = new CompactVBucketTask(&engine, Priority::CompactorPriority,
- vbid, c, cookie);
+ ExTask task = new CompactVBucketTask(&engine, vbid, c, cookie);
compactionTasks.push_back(std::make_pair(vbid, task));
if (compactionTasks.size() > 1) {
if ((stats.diskQueueSize > compactionWriteQueueCap &&
return true; // Schedule a compaction task again.
}
+ Configuration &config = getEPEngine().getConfiguration();
+ if (config.isBfilterEnabled()) {
+ size_t initial_estimation = config.getBfilterKeyCount();
+ size_t estimated_count;
+ size_t num_deletes =
+ getROUnderlying(vbid)->getNumPersistedDeletes(vbid);
+ if (eviction_policy == VALUE_ONLY) {
+ /**
+ * VALUE-ONLY EVICTION POLICY
+ * Obtain number of persisted deletes from underlying kvstore.
+ * Bloomfilter's estimated_key_count = 1.25 * deletes
+ */
+
+ estimated_count = round(1.25 * num_deletes);
+ ctx->bfcb = new BfilterCB(this, vbid, false);
+ } else {
+ /**
+ * FULL EVICTION POLICY
+ * First determine if the resident ratio of vbucket is less than
+ * the threshold from configuration.
+ */
+
+ bool residentRatioAlert = vb->isResidentRatioUnderThreshold(
+ getBfiltersResidencyThreshold(),
+ eviction_policy);
+ ctx->bfcb = new BfilterCB(this, vbid, residentRatioAlert);
+
+ /**
+ * Based on resident ratio against threshold, estimate count.
+ *
+ * 1. If resident ratio is greater than the threshold:
+ * Obtain number of persisted deletes from underlying kvstore.
+ * Obtain number of non-resident-items for vbucket.
+ * Bloomfilter's estimated_key_count =
+ * 1.25 * (deletes + non-resident)
+ *
+ * 2. Otherwise:
+ * Obtain number of items for vbucket.
+ * Bloomfilter's estimated_key_count =
+ * 1.25 * (num_items)
+ */
+
+ if (residentRatioAlert) {
+ estimated_count = round(1.25 *
+ vb->getNumItems(eviction_policy));
+ } else {
+ estimated_count = round(1.25 * (num_deletes +
+ vb->getNumNonResidentItems(eviction_policy)));
+ }
+ }
+ if (estimated_count < initial_estimation) {
+ estimated_count = initial_estimation;
+ }
+ vb->initTempFilter(estimated_count, config.getBfilterFpProb());
+ }
+
if (vb->getState() == vbucket_state_active) {
// Set the current time ONLY for active vbuckets.
ctx->curr_time = ep_real_time();
}
ExpiredItemsCallback cb(this, vbid);
KVStatsCallback kvcb(this);
- getRWUnderlying(vbid)->compactVBucket(vbid, ctx, cb, kvcb);
+ if (getRWUnderlying(vbid)->compactVBucket(vbid, ctx, cb, kvcb)) {
+ if (config.isBfilterEnabled()) {
+ vb->swapFilter();
+ } else {
+ vb->clearFilter();
+ }
+ } else {
+ LOG(EXTENSION_LOG_WARNING, "Compaction: Not successful for vb %u, "
+ "clearing bloom filter, if any.", vb->getId());
+ vb->clearFilter();
+ }
vb->setPurgeSeqno(ctx->max_purged_seq);
} else {
err = ENGINE_NOT_MY_VBUCKET;
lh.unlock();
std::list<std::string> tap_cursors = vb->checkpointManager.
- getTAPCursorNames();
+ getCursorNames();
// Delete and recreate the vbucket database file
scheduleVBDeletion(vb, NULL, 0);
setVBucketState(vbid, vbstate, false);
// Copy the all cursors from the old vbucket into the new vbucket
RCPtr<VBucket> newvb = vbMap.getBucket(vbid);
- newvb->checkpointManager.resetTAPCursors(tap_cursors);
+ newvb->checkpointManager.resetCursors(tap_cursors);
rv = true;
}
void EventuallyPersistentStore::completeBGFetch(const std::string &key,
uint16_t vbucket,
- uint64_t rowid,
const void *cookie,
hrtime_t init,
bool isMeta) {
} else {
++stats.bg_fetched;
}
- getROUnderlying(vbucket)->get(key, rowid, vbucket, gcb);
+ getROUnderlying(vbucket)->get(key, vbucket, gcb);
gcb.waitForValue();
cb_assert(gcb.fired);
ENGINE_ERROR_CODE status = gcb.val.getStatus();
// returns, the item may have been updated and queued
// Hence test the CAS value to be the same first.
// exptime mutated, schedule it into new checkpoint
- queueDirty(vb, v, &hlh);
+ queueDirty(vb, v, &hlh, NULL);
}
} else if (gcb.val.getStatus() == ENGINE_KEY_ENOENT) {
v->setStoredValueState(
// underlying kvstore couldn't fetch requested data
// log returned error and notify TMPFAIL to client
LOG(EXTENSION_LOG_WARNING,
- "Warning: failed background fetch for vb=%d seq=%d "
- "key=%s", vbucket, v->getBySeqno(), key.c_str());
+ "Warning: failed background fetch for vb=%d "
+ "seq=%" PRId64 " key=%s", vbucket, v->getBySeqno(),
+ key.c_str());
status = ENGINE_TMPFAIL;
}
}
if (status == ENGINE_SUCCESS) {
v->unlocked_restoreValue(fetchedValue, vb->ht);
cb_assert(v->isResident());
+ // The guard must be a named local. An unnamed ReaderLockHolder is a
+ // temporary that is destroyed at the end of its own statement, which
+ // would release the vbucket state lock before the vb->getState() check
+ // that follows. Naming it keeps the lock held to the end of the
+ // enclosing scope.
+ ReaderLockHolder rlh(vb->getStateLock());
if (vb->getState() == vbucket_state_active &&
v->getExptime() != fetchedValue->getExptime() &&
v->getCas() == fetchedValue->getCas()) {
// updated and queued
// Hence test the CAS value to be the same first.
// exptime mutated, schedule it into new checkpoint
- queueDirty(vb, v, &blh);
+ queueDirty(vb, v, &blh, NULL);
}
} else if (status == ENGINE_KEY_ENOENT) {
v->setStoredValueState(StoredValue::state_non_existent_key);
void EventuallyPersistentStore::bgFetch(const std::string &key,
uint16_t vbucket,
- uint64_t rowid,
const void *cookie,
bool isMeta) {
std::stringstream ss;
stats.maxRemainingBgJobs = std::max(stats.maxRemainingBgJobs,
bgFetchQueue.load());
ExecutorPool* iom = ExecutorPool::get();
- ExTask task = new BGFetchTask(&engine, key, vbucket, rowid, cookie,
- isMeta,
- Priority::BgFetcherGetMetaPriority,
- bgFetchDelay, false);
+ ExTask task = new SingleBGFetcherTask(&engine, key, vbucket, cookie,
+ isMeta, bgFetchDelay, false);
iom->schedule(task, READER_TASK_IDX);
ss << "Queued a background fetch, now at " << bgFetchQueue.load()
<< std::endl;
bool queueBG,
bool honorStates,
vbucket_state_t allowedState,
- bool trackReference) {
+ bool trackReference,
+ bool deleteTempItem,
+ bool hideLockedCAS) {
vbucket_state_t disallowedState = (allowedState == vbucket_state_active) ?
vbucket_state_replica : vbucket_state_active;
StoredValue *v = fetchValidValue(vb, key, bucket_num, true,
trackReference);
if (v) {
- if (v->isDeleted() || v->isTempDeletedItem() ||
- v->isTempNonExistentItem()) {
+ if (v->isDeleted()) {
GetValue rv;
return rv;
}
+ if (v->isTempDeletedItem() || v->isTempNonExistentItem()) {
+ // Delete a temp non-existent item to ensure that
+ // if the get were issued over an item that doesn't
+ // exist, then we dont preserve a temp item.
+ if (deleteTempItem) {
+ vb->ht.unlocked_del(key, bucket_num);
+ }
+ GetValue rv;
+ return rv;
+ }
+
// If the value is not resident, wait for it...
if (!v->isResident()) {
if (queueBG) {
- bgFetch(key, vbucket, v->getBySeqno(), cookie);
+ bgFetch(key, vbucket, cookie);
}
return GetValue(NULL, ENGINE_EWOULDBLOCK, v->getBySeqno(),
true, v->getNRUValue());
}
- GetValue rv(v->toItem(v->isLocked(ep_current_time()), vbucket),
- ENGINE_SUCCESS, v->getBySeqno(), false, v->getNRUValue());
+ // Should we hide (return -1) for the items' CAS?
+ const bool hide_cas = hideLockedCAS &&
+ v->isLocked(ep_current_time());
+ GetValue rv(v->toItem(hide_cas, vbucket), ENGINE_SUCCESS,
+ v->getBySeqno(), false, v->getNRUValue());
return rv;
} else {
if (eviction_policy == VALUE_ONLY || diskFlushAll) {
GetValue rv;
return rv;
}
- ENGINE_ERROR_CODE ec = ENGINE_EWOULDBLOCK;
- if (queueBG) { // Full eviction and need a bg fetch.
- ec = addTempItemForBgFetch(lh, bucket_num, key, vb,
- cookie, false);
+
+ if (vb->maybeKeyExistsInFilter(key)) {
+ ENGINE_ERROR_CODE ec = ENGINE_EWOULDBLOCK;
+ if (queueBG) { // Full eviction and need a bg fetch.
+ ec = addTempItemForBgFetch(lh, bucket_num, key, vb,
+ cookie, false);
+ }
+ return GetValue(NULL, ec, -1, true);
+ } else {
+ // As bloomfilter predicted that item surely doesn't exist
+ // on disk, return ENOENT, for getInternal().
+ GetValue rv;
+ return rv;
}
- return GetValue(NULL, ec, -1, true);
}
}
const void *cookie,
ItemMetaData &metadata,
uint32_t &deleted,
+ uint8_t &confResMode,
bool trackReferenced)
{
(void) cookie;
stats.numOpsGetMeta++;
if (v->isTempInitialItem()) { // Need bg meta fetch.
- bgFetch(key, vbucket, -1, cookie, true);
+ bgFetch(key, vbucket, cookie, true);
return ENGINE_EWOULDBLOCK;
} else if (v->isTempNonExistentItem()) {
metadata.cas = v->getCas();
metadata.flags = v->getFlags();
metadata.exptime = v->getExptime();
metadata.revSeqno = v->getRevSeqno();
+ confResMode = v->getConflictResMode();
return ENGINE_SUCCESS;
}
} else {
// So, add a temporary item corresponding to the key to the hash table
// and schedule a background fetch for its metadata from the persistent
// store. The item's state will be updated after the fetch completes.
- return addTempItemForBgFetch(lh, bucket_num, key, vb, cookie, true);
+ //
+ // Schedule this bgFetch only if the key is predicted to be may-be
+ // existent on disk by the bloomfilter.
+
+ if (vb->maybeKeyExistsInFilter(key)) {
+ return addTempItemForBgFetch(lh, bucket_num, key, vb, cookie, true);
+ } else {
+ return ENGINE_KEY_ENOENT;
+ }
}
}
-ENGINE_ERROR_CODE EventuallyPersistentStore::setWithMeta(const Item &itm,
- uint64_t cas,
- const void *cookie,
- bool force,
- bool allowExisting,
- uint8_t nru,
- bool genBySeqno,
- bool isReplication)
+ENGINE_ERROR_CODE EventuallyPersistentStore::setWithMeta(
+ const Item &itm,
+ uint64_t cas,
+ uint64_t *seqno,
+ const void *cookie,
+ bool force,
+ bool allowExisting,
+ uint8_t nru,
+ bool genBySeqno,
+ ExtendedMetaData *emd,
+ bool isReplication)
{
RCPtr<VBucket> vb = getVBucket(itm.getVBucketId());
if (!vb) {
}
}
+ //check for the incoming item's CAS validity
+ if (!Item::isValidCas(itm.getCas())) {
+ return ENGINE_KEY_EEXISTS;
+ }
+
int bucket_num(0);
LockHolder lh = vb->ht.getLockedBucket(itm.getKey(), &bucket_num);
StoredValue *v = vb->ht.unlocked_find(itm.getKey(), bucket_num, true,
false);
+ bool maybeKeyExists = true;
if (!force) {
if (v) {
if (v->isTempInitialItem()) {
- bgFetch(itm.getKey(), itm.getVBucketId(), -1, cookie, true);
+ bgFetch(itm.getKey(), itm.getVBucketId(), cookie, true);
return ENGINE_EWOULDBLOCK;
}
- if (!conflictResolver->resolve(v, itm.getMetaData(), false)) {
+
+ enum conflict_resolution_mode confResMode = revision_seqno;
+ if (emd) {
+ confResMode = static_cast<enum conflict_resolution_mode>(
+ emd->getConflictResMode());
+ }
+
+ if (!conflictResolver->resolve(vb, v, itm.getMetaData(), false,
+ confResMode)) {
++stats.numOpsSetMetaResolutionFailed;
return ENGINE_KEY_EEXISTS;
}
} else {
- return addTempItemForBgFetch(lh, bucket_num, itm.getKey(), vb,
- cookie, true, isReplication);
+ if (vb->maybeKeyExistsInFilter(itm.getKey())) {
+ return addTempItemForBgFetch(lh, bucket_num, itm.getKey(), vb,
+ cookie, true, isReplication);
+ } else {
+ maybeKeyExists = false;
+ }
+ }
+ } else {
+ if (eviction_policy == FULL_EVICTION) {
+ // Check Bloomfilter's prediction
+ if (!vb->maybeKeyExistsInFilter(itm.getKey())) {
+ maybeKeyExists = false;
+ }
}
}
vb->getState() == vbucket_state_pending)) {
v->unlock();
}
+
mutation_type_t mtype = vb->ht.unlocked_set(v, itm, cas, allowExisting,
true, eviction_policy, nru,
- isReplication);
+ maybeKeyExists, isReplication);
ENGINE_ERROR_CODE ret = ENGINE_SUCCESS;
switch (mtype) {
break;
case WAS_DIRTY:
case WAS_CLEAN:
- queueDirty(vb, v, &lh, false, true, genBySeqno);
+ /* set the conflict resolution mode from the extended meta data *
+ * Given that the mode is already set, we don't need to set the *
+ * conflict resolution mode in queueDirty */
+ if (emd) {
+ v->setConflictResMode(
+ static_cast<enum conflict_resolution_mode>(
+ emd->getConflictResMode()));
+ }
+ vb->setMaxCas(v->getCas());
+ queueDirty(vb, v, &lh, seqno, false, true, genBySeqno, false);
break;
case NOT_FOUND:
ret = ENGINE_KEY_ENOENT;
{ // CAS operation with non-resident item + full eviction.
if (v) { // temp item is already created. Simply schedule a
lh.unlock(); // bg fetch job.
- bgFetch(itm.getKey(), vb->getId(), -1, cookie, true);
+ bgFetch(itm.getKey(), vb->getId(), cookie, true);
return ENGINE_EWOULDBLOCK;
}
+
ret = addTempItemForBgFetch(lh, bucket_num, itm.getKey(), vb,
cookie, true, isReplication);
}
}
+ // Update drift counter for vbucket upon a success only
+ if (ret == ENGINE_SUCCESS && emd) {
+ vb->setDriftCounter(emd->getAdjustedTime());
+ }
+
return ret;
}
}
if (!v->isResident()) {
- bgFetch(key, vbucket, v->getBySeqno(), cookie);
+ bgFetch(key, vbucket, cookie);
return GetValue(NULL, ENGINE_EWOULDBLOCK, v->getBySeqno());
}
if (v->isLocked(ep_current_time())) {
if (vb->getState() == vbucket_state_active) {
// persist the item in the underlying storage for
// mutated exptime but only if VB is active.
- queueDirty(vb, v, &lh);
+ queueDirty(vb, v, &lh, NULL);
}
}
return rv;
GetValue rv;
return rv;
} else {
- ENGINE_ERROR_CODE ec = addTempItemForBgFetch(lh, bucket_num, key,
- vb, cookie, false);
- return GetValue(NULL, ec, -1, true);
+ if (vb->maybeKeyExistsInFilter(key)) {
+ ENGINE_ERROR_CODE ec = addTempItemForBgFetch(lh, bucket_num,
+ key, vb, cookie,
+ false);
+ return GetValue(NULL, ec, -1, true);
+ } else {
+ // As bloomfilter predicted that item surely doesn't exist
+ // on disk, return ENOENT for getAndUpdateTtl().
+ GetValue rv;
+ return rv;
+ }
}
}
}
ExecutorPool* iom = ExecutorPool::get();
ExTask task = new VKeyStatBGFetchTask(&engine, key, vbucket,
v->getBySeqno(), cookie,
- Priority::VKeyStatBgFetcherPriority,
bgFetchDelay, false);
iom->schedule(task, READER_TASK_IDX);
return ENGINE_EWOULDBLOCK;
ExecutorPool* iom = ExecutorPool::get();
ExTask task = new VKeyStatBGFetchTask(&engine, key,
vbucket, -1, cookie,
- Priority::VKeyStatBgFetcherPriority,
bgFetchDelay, false);
iom->schedule(task, READER_TASK_IDX);
}
uint64_t bySeqNum) {
RememberingCallback<GetValue> gcb;
- getROUnderlying(vbid)->get(key, bySeqNum, vbid, gcb);
+ getROUnderlying(vbid)->get(key, vbid, gcb);
gcb.waitForValue();
cb_assert(gcb.fired);
// underlying kvstore couldn't fetch requested data
// log returned error and notify TMPFAIL to client
LOG(EXTENSION_LOG_WARNING,
- "Warning: failed background fetch for vb=%d seq=%d "
- "key=%s", vbid, v->getBySeqno(), key.c_str());
+ "Warning: failed background fetch for vb=%d "
+ "seq=%" PRId64 " key=%s", vbid, v->getBySeqno(),
+ key.c_str());
}
}
}
// If the value is not resident, wait for it...
if (!v->isResident()) {
if (cookie) {
- bgFetch(key, vbucket, v->getBySeqno(), cookie);
+ bgFetch(key, vbucket, cookie);
}
GetValue rv(NULL, ENGINE_EWOULDBLOCK, -1, true);
cb.callback(rv);
v->lock(currentTime + lockTimeout);
Item *it = v->toItem(false, vbucket);
- it->setCas();
+ it->setCas(vb->nextHLCCas());
v->setCas(it->getCas());
GetValue rv(it);
cb.callback(rv);
return true;
} else {
- ENGINE_ERROR_CODE ec = addTempItemForBgFetch(lh, bucket_num, key,
- vb, cookie, false);
- GetValue rv(NULL, ec, -1, true);
- cb.callback(rv);
- return false;
+ if (vb->maybeKeyExistsInFilter(key)) {
+ ENGINE_ERROR_CODE ec = addTempItemForBgFetch(lh, bucket_num,
+ key, vb, cookie,
+ false);
+ GetValue rv(NULL, ec, -1, true);
+ cb.callback(rv);
+ return false;
+ } else {
+ // As bloomfilter predicted that item surely doesn't exist
+ // on disk, return ENOENT for getLocked().
+ GetValue rv;
+ cb.callback(rv);
+ return true;
+ }
}
}
}
if (eviction_policy == FULL_EVICTION &&
v->isTempInitialItem() && bgfetch) {
lh.unlock();
- bgFetch(key, vbucket, -1, cookie, true);
+ bgFetch(key, vbucket, cookie, true);
return ENGINE_EWOULDBLOCK;
}
kstats.logically_deleted = v->isDeleted();
if (eviction_policy == VALUE_ONLY) {
return ENGINE_KEY_ENOENT;
} else {
- if (bgfetch) {
+ if (bgfetch && vb->maybeKeyExistsInFilter(key)) {
return addTempItemForBgFetch(lh, bucket_num, key, vb,
cookie, true);
} else {
+ // If bgFetch were false, or bloomfilter predicted that
+ // item surely doesn't exist on disk, return ENOENT for
+ // getKeyStats().
return ENGINE_KEY_ENOENT;
}
}
}
ENGINE_ERROR_CODE EventuallyPersistentStore::deleteItem(const std::string &key,
- uint64_t* cas,
+ uint64_t *cas,
uint16_t vbucket,
const void *cookie,
bool force,
ItemMetaData *itemMeta,
+ mutation_descr_t *mutInfo,
bool tapBackfill)
{
RCPtr<VBucket> vb = getVBucket(vbucket);
} else { // Full eviction.
if (!force) {
if (!v) { // Item might be evicted from cache.
- return addTempItemForBgFetch(lh, bucket_num, key, vb,
- cookie, true);
+ if (vb->maybeKeyExistsInFilter(key)) {
+ return addTempItemForBgFetch(lh, bucket_num, key, vb,
+ cookie, true);
+ } else {
+ // As bloomfilter predicted that item surely doesn't
+ // exist on disk, return ENOENT for deleteItem().
+ return ENGINE_KEY_ENOENT;
+ }
} else if (v->isTempInitialItem()) {
lh.unlock();
- bgFetch(key, vbucket, -1, cookie, true);
+ bgFetch(key, vbucket, cookie, true);
return ENGINE_EWOULDBLOCK;
} else { // Non-existent or deleted key.
+ if (v->isTempNonExistentItem() || v->isTempDeletedItem()) {
+ // Delete a temp non-existent item to ensure that
+ // if a delete were issued over an item that doesn't
+ // exist, then we don't preserve a temp item.
+ vb->ht.unlocked_del(key, bucket_num);
+ }
return ENGINE_KEY_ENOENT;
}
} else {
if (!v) { // Item might be evicted from cache.
// Create a temp item and delete it below as it is a
- // force deletion.
- add_type_t rv = vb->ht.unlocked_addTempItem(bucket_num,
- key,
- eviction_policy);
- if (rv == ADD_NOMEM) {
- return ENGINE_ENOMEM;
+ // force deletion, only if bloomfilter predicts that
+ // item may exist on disk.
+ if (vb->maybeKeyExistsInFilter(key)) {
+ add_type_t rv = vb->ht.unlocked_addTempItem(
+ bucket_num,
+ key,
+ eviction_policy);
+ if (rv == ADD_NOMEM) {
+ return ENGINE_ENOMEM;
+ }
+ v = vb->ht.unlocked_find(key, bucket_num, true, false);
+ v->setStoredValueState(StoredValue::state_deleted_key);
+ } else {
+ return ENGINE_KEY_ENOENT;
}
- v = vb->ht.unlocked_find(key, bucket_num, true, false);
- v->setStoredValueState(StoredValue::state_deleted_key);
} else if (v->isTempInitialItem()) {
v->setStoredValueState(StoredValue::state_deleted_key);
} else { // Non-existent or deleted key.
+ if (v->isTempNonExistentItem() || v->isTempDeletedItem()) {
+ // Delete a temp non-existent item to ensure that
+ // if a delete were issued over an item that doesn't
+ // exist, then we don't preserve a temp item.
+ vb->ht.unlocked_del(key, bucket_num);
+ }
return ENGINE_KEY_ENOENT;
}
}
}
*cas = v ? v->getCas() : 0;
+ uint64_t seqno = 0;
ENGINE_ERROR_CODE ret = ENGINE_SUCCESS;
switch (delrv) {
case NOMEM:
case NOT_FOUND:
ret = ENGINE_KEY_ENOENT;
if (v) {
- queueDirty(vb, v, &lh, tapBackfill);
+ queueDirty(vb, v, &lh, NULL, tapBackfill);
}
break;
case WAS_DIRTY:
case WAS_CLEAN:
- queueDirty(vb, v, &lh, tapBackfill);
+ queueDirty(vb, v, &lh, &seqno, tapBackfill);
+ mutInfo->seqno = seqno;
+ mutInfo->vbucket_uuid = vb->failovers->getLatestUUID();
break;
case NEED_BG_FETCH:
// We already figured out if a bg fetch is requred for a full-evicted
}
ENGINE_ERROR_CODE EventuallyPersistentStore::deleteWithMeta(
- const std::string &key,
- uint64_t* cas,
- uint16_t vbucket,
- const void *cookie,
- bool force,
- ItemMetaData *itemMeta,
- bool tapBackfill,
- bool genBySeqno,
- uint64_t bySeqno,
- bool isReplication)
+ const std::string &key,
+ uint64_t *cas,
+ uint64_t *seqno,
+ uint16_t vbucket,
+ const void *cookie,
+ bool force,
+ ItemMetaData *itemMeta,
+ bool tapBackfill,
+ bool genBySeqno,
+ uint64_t bySeqno,
+ ExtendedMetaData *emd,
+ bool isReplication)
{
RCPtr<VBucket> vb = getVBucket(vbucket);
if (!vb || (vb->getState() == vbucket_state_dead && !force)) {
}
}
+ //check for the incoming item's CAS validity
+ if (!Item::isValidCas(itemMeta->cas)) {
+ return ENGINE_KEY_EEXISTS;
+ }
+
int bucket_num(0);
LockHolder lh = vb->ht.getLockedBucket(key, &bucket_num);
StoredValue *v = vb->ht.unlocked_find(key, bucket_num, true, false);
if (!force) { // Need conflict resolution.
if (v) {
if (v->isTempInitialItem()) {
- bgFetch(key, vbucket, -1, cookie, true);
+ bgFetch(key, vbucket, cookie, true);
return ENGINE_EWOULDBLOCK;
}
- if (!conflictResolver->resolve(v, *itemMeta, true)) {
+
+ enum conflict_resolution_mode confResMode = revision_seqno;
+ if (emd) {
+ confResMode = static_cast<enum conflict_resolution_mode>(
+ emd->getConflictResMode());
+ }
+
+ if (!conflictResolver->resolve(vb, v, *itemMeta, true, confResMode)) {
++stats.numOpsDelMetaResolutionFailed;
return ENGINE_KEY_EEXISTS;
}
} else {
// Item is 1) deleted or not existent in the value eviction case OR
// 2) deleted or evicted in the full eviction.
- return addTempItemForBgFetch(lh, bucket_num, key, vb,
- cookie, true, isReplication);
+ if (vb->maybeKeyExistsInFilter(key)) {
+ return addTempItemForBgFetch(lh, bucket_num, key, vb,
+ cookie, true, isReplication);
+ } else {
+ // Even though bloomfilter predicted that item doesn't exist
+ // on disk, we must put this delete on disk if the cas is valid.
+ add_type_t rv = vb->ht.unlocked_addTempItem(bucket_num, key,
+ eviction_policy,
+ isReplication);
+ if (rv == ADD_NOMEM) {
+ return ENGINE_ENOMEM;
+ }
+ v = vb->ht.unlocked_find(key, bucket_num, true, false);
+ v->setStoredValueState(StoredValue::state_deleted_key);
+ }
}
} else {
if (!v) {
- // Create a temp item and delete it below as it is a force deletion
+ // We should always try to persist a delete here.
add_type_t rv = vb->ht.unlocked_addTempItem(bucket_num, key,
eviction_policy,
isReplication);
}
v = vb->ht.unlocked_find(key, bucket_num, true, false);
v->setStoredValueState(StoredValue::state_deleted_key);
+ v->setCas(*cas);
} else if (v->isTempInitialItem()) {
v->setStoredValueState(StoredValue::state_deleted_key);
+ v->setCas(*cas);
}
}
if (!genBySeqno) {
v->setBySeqno(bySeqno);
}
- queueDirty(vb, v, &lh, tapBackfill, true, genBySeqno);
+
+ /* set the conflict resolution mode from the extended meta data *
+ * Given that the mode is already set, we don't need to set the *
+ * conflict resolution mode in queueDirty */
+ if (emd) {
+ v->setConflictResMode(
+ static_cast<enum conflict_resolution_mode>(
+ emd->getConflictResMode()));
+ }
+ vb->setMaxCas(v->getCas());
+ queueDirty(vb, v, &lh, seqno, tapBackfill, true, genBySeqno, false);
break;
case NEED_BG_FETCH:
lh.unlock();
- bgFetch(key, vbucket, -1, cookie, true);
+ bgFetch(key, vbucket, cookie, true);
ret = ENGINE_EWOULDBLOCK;
}
+ // Update drift counter for vbucket upon a success only
+ if (ret == ENGINE_SUCCESS && emd) {
+ vb->setDriftCounter(emd->getAdjustedTime());
+ }
+
return ret;
}
vb->ht.clear();
vb->checkpointManager.clear(vb->getState());
vb->resetStats();
- vb->setCurrentSnapshot(0, 0);
+ vb->setPersistedSnapshot(0, 0);
}
}
- bool inverse = false;
- if (diskFlushAll.compare_exchange_strong(inverse, true)) {
- ++stats.diskQueueSize;
- // wake up (notify) one flusher is good enough for diskFlushAll
- vbMap.shards[EP_PRIMARY_SHARD]->getFlusher()->notifyFlushEvent();
- }
+ ++stats.diskQueueSize;
+ bool inverse = true;
+ flushAllTaskCtx.delayFlushAll.compare_exchange_strong(inverse, false);
+ // Waking up (notifying) one flusher is good enough for diskFlushAll
+ vbMap.shards[EP_PRIMARY_SHARD]->getFlusher()->notifyFlushEvent();
}
/**
++vbucket->opsCreate;
vbucket->incrMetaDataDisk(*queuedItem);
} else { // Update in full eviction mode.
- --vbucket->ht.numTotalItems;
+ vbucket->ht.decrNumTotalItems();
++vbucket->opsUpdate;
}
v->setNewCacheItem(false);
// exists on DB file, but not in memory (i.e., full eviction),
// because we created the temp item in memory and incremented
// the item counter when a deletion is pushed in the queue.
- --vbucket->ht.numTotalItems;
+ vbucket->ht.decrNumTotalItems();
}
}
+ /**
+ * Deleted items are to be added to the bloomfilter,
+ * in either eviction policy.
+ */
+ vbucket->addToFilter(queuedItem->getKey());
+
if (value > 0) {
++stats->totalPersisted;
++vbucket->opsDelete;
DISALLOW_COPY_AND_ASSIGN(PersistenceCallback);
};
+/**
+ * Try to claim the flush-all slot and, on success, schedule a FlushAllTask.
+ *
+ * diskFlushAll is flipped from false to true atomically. Only the caller
+ * that wins this exchange records its cookie, raises delayFlushAll and
+ * schedules the task with NONIO_TASK_IDX.
+ *
+ * @param cookie stored in flushAllTaskCtx.cookie when the slot is claimed
+ * @param when forwarded to FlushAllTask as a double (presumably the delay
+ *             before the task runs -- confirm against FlushAllTask)
+ * @return true if a task was scheduled, false if diskFlushAll was already
+ *         set
+ */
+bool EventuallyPersistentStore::scheduleFlushAllTask(const void* cookie,
+                                                     time_t when) {
+    bool notFlushing = false;
+    if (!diskFlushAll.compare_exchange_strong(notFlushing, true)) {
+        // diskFlushAll was already set: nothing new is scheduled.
+        return false;
+    }
+
+    flushAllTaskCtx.cookie = cookie;
+
+    // Raise delayFlushAll only if it is currently false.
+    bool notDelayed = false;
+    flushAllTaskCtx.delayFlushAll.compare_exchange_strong(notDelayed, true);
+
+    ExTask flushTask = new FlushAllTask(&engine, static_cast<double>(when));
+    ExecutorPool::get()->schedule(flushTask, NONIO_TASK_IDX);
+    return true;
+}
+
+/**
+ * Mark the current flush-all as finished.
+ *
+ * In order: the cookie stored in flushAllTaskCtx (if any) is notified with
+ * ENGINE_SUCCESS through engine.notifyIOComplete(), delayFlushAll is set
+ * back to true if it was false, and diskFlushAll is cleared if it was set.
+ */
+void EventuallyPersistentStore::setFlushAllComplete() {
+    // Tell memcached that the flushAll task has completed.
+    if (flushAllTaskCtx.cookie) {
+        engine.notifyIOComplete(flushAllTaskCtx.cookie, ENGINE_SUCCESS);
+    }
+
+    // Restore the delay flag (false -> true).
+    bool delayExpected = false;
+    flushAllTaskCtx.delayFlushAll.compare_exchange_strong(delayExpected, true);
+
+    // Finally drop the diskFlushAll flag (true -> false).
+    bool flushExpected = true;
+    diskFlushAll.compare_exchange_strong(flushExpected, false);
+}
+
void EventuallyPersistentStore::flushOneDeleteAll() {
for (size_t i = 0; i < vbMap.getSize(); ++i) {
RCPtr<VBucket> vb = getVBucket(i);
}
}
- bool inverse = true;
- diskFlushAll.compare_exchange_strong(inverse, false);
stats.decrDiskQueueSize(1);
+ setFlushAllComplete();
}
int EventuallyPersistentStore::flushVBucket(uint16_t vbid) {
KVShard *shard = vbMap.getShard(vbid);
- if (diskFlushAll) {
+ if (diskFlushAll && !flushAllTaskCtx.delayFlushAll) {
if (shard->getId() == EP_PRIMARY_SHARD) {
flushOneDeleteAll();
} else {
vb->rejectQueue.pop();
}
- LockHolder slh = vb->getSnapshotLock();
- uint64_t snapStart;
- uint64_t snapEnd;
- vb->getCurrentSnapshot_UNLOCKED(snapStart, snapEnd);
-
+ const std::string cursor(CheckpointManager::pCursorName);
vb->getBackfillItems(items);
- vb->checkpointManager.getAllItemsForPersistence(items);
- slh.unlock();
+
+ snapshot_range_t range;
+ range = vb->checkpointManager.getAllItemsForCursor(cursor, items);
if (!items.empty()) {
while (!rwUnderlying->begin()) {
Item *prev = NULL;
uint64_t maxSeqno = 0;
+ uint64_t maxCas = 0;
std::list<PersistenceCallback*> pcbs;
std::vector<queued_item>::iterator it = items.begin();
for(; it != items.end(); ++it) {
if (cb) {
pcbs.push_back(cb);
}
+
maxSeqno = std::max(maxSeqno, (uint64_t)(*it)->getBySeqno());
+ maxCas = std::max(maxCas, (uint64_t)(*it)->getCas());
++stats.flusher_todo;
} else {
stats.decrDiskQueueSize(1);
hrtime_t start = gethrtime();
if (vb->getState() == vbucket_state_active) {
- snapStart = maxSeqno;
- snapEnd = maxSeqno;
+ range.start = maxSeqno;
+ range.end = maxSeqno;
}
- while (!rwUnderlying->commit(&cb, snapStart, snapEnd)) {
+ while (!rwUnderlying->commit(&cb, range.start, range.end, maxCas,
+ vb->getDriftCounter())) {
++stats.commitFailed;
LOG(EXTENSION_LOG_WARNING, "Flusher commit failed!!! Retry in "
"1 sec...\n");
}
if (vb->rejectQueue.empty()) {
+ vb->setPersistedSnapshot(range.start, range.end);
uint64_t highSeqno = rwUnderlying->getLastPersistedSeqno(vbid);
if (highSeqno > 0 &&
highSeqno != vbMap.getPersistenceSeqno(vbid)) {
vbMap.setPersistenceSeqno(vbid, highSeqno);
+ vb->notifySeqnoPersisted(highSeqno);
}
}
if (chkid > 0 && chkid != vbMap.getPersistenceCheckpointId(vbid)) {
vbMap.setPersistenceCheckpointId(vbid, chkid);
}
+ } else {
+ return RETRY_FLUSH_VBUCKET;
}
}
void EventuallyPersistentStore::queueDirty(RCPtr<VBucket> &vb,
StoredValue* v,
LockHolder *plh,
+ uint64_t *seqno,
bool tapBackfill,
bool notifyReplicator,
- bool genBySeqno) {
+ bool genBySeqno,
+ bool setConflictMode) {
if (vb) {
+ if (setConflictMode && (v->getConflictResMode() != last_write_wins) &&
+ vb->isTimeSyncEnabled()) {
+ v->setConflictResMode(last_write_wins);
+ }
+
queued_item qi(v->toItem(false, vb->getId()));
- bool rv;
- if (genBySeqno) {
- LockHolder slh = vb->getSnapshotLock();
- rv = tapBackfill ? vb->queueBackfillItem(qi, genBySeqno) :
- vb->checkpointManager.queueDirty(vb, qi,
- genBySeqno);
- v->setBySeqno(qi->getBySeqno());
- vb->setCurrentSnapshot_UNLOCKED(qi->getBySeqno(), qi->getBySeqno());
- } else {
- rv = tapBackfill ? vb->queueBackfillItem(qi, genBySeqno) :
- vb->checkpointManager.queueDirty(vb, qi,
- genBySeqno);
- v->setBySeqno(qi->getBySeqno());
+
+ bool rv = tapBackfill ? vb->queueBackfillItem(qi, genBySeqno) :
+ vb->checkpointManager.queueDirty(vb, qi,
+ genBySeqno);
+ v->setBySeqno(qi->getBySeqno());
+
+ if (seqno) {
+ *seqno = v->getBySeqno();
}
if (plh) {
void EventuallyPersistentStore::warmupCompleted() {
// Run the vbucket state snapshot job once after the warmup
- scheduleVBSnapshot(Priority::VBucketPersistHighPriority);
+ scheduleVBSnapshot(VBSnapshotTask::Priority::HIGH);
if (engine.getConfiguration().getAlogPath().length() > 0) {
// right after warmup. Subsequent snapshot tasks will be scheduled every
// 60 sec by default.
ExecutorPool *iom = ExecutorPool::get();
- ExTask task = new StatSnap(&engine, Priority::StatSnapPriority, 0, false);
+ ExTask task = new StatSnap(&engine, 0, false);
statsSnapshotTaskId = iom->schedule(task, WRITER_TASK_IDX);
}
accessScanner.sleeptime = alogSleepTime * 60;
if (accessScanner.sleeptime != 0) {
ExTask task = new AccessScanner(*this, stats,
- Priority::AccessScannerPriority,
accessScanner.sleeptime);
accessScanner.task = ExecutorPool::get()->schedule(task,
AUXIO_TASK_IDX);
accessScanner.sleeptime = val * 60;
if (accessScanner.sleeptime != 0) {
ExTask task = new AccessScanner(*this, stats,
- Priority::AccessScannerPriority,
accessScanner.sleeptime);
accessScanner.task = ExecutorPool::get()->schedule(task,
AUXIO_TASK_IDX);
ExecutorPool::get()->cancel(accessScanner.task);
// re-schedule task according to the new task start hour
ExTask task = new AccessScanner(*this, stats,
- Priority::AccessScannerPriority,
accessScanner.sleeptime);
accessScanner.task = ExecutorPool::get()->schedule(task,
AUXIO_TASK_IDX);
}
}
+/**
+ * Apply one bloom filter status to every vbucket currently present in vbMap.
+ *
+ * @param to true selects BFILTER_ENABLED, false selects BFILTER_DISABLED
+ */
+void EventuallyPersistentStore::setAllBloomFilters(bool to) {
+    const size_t numVBuckets = vbMap.getSize();
+    // Ids are handed to getBucket() as uint16_t, so the map size must fit.
+    cb_assert(numVBuckets <= std::numeric_limits<uint16_t>::max());
+
+    for (size_t idx = 0; idx < numVBuckets; ++idx) {
+        RCPtr<VBucket> vb = vbMap.getBucket(static_cast<uint16_t>(idx));
+        if (!vb) {
+            continue; // getBucket() returned an empty pointer for this id
+        }
+        vb->setFilterStatus(to ? BFILTER_ENABLED : BFILTER_DISABLED);
+    }
+}
+
void EventuallyPersistentStore::visit(VBucketVisitor &visitor)
{
size_t maxSize = vbMap.getSize();
visitor.complete();
}
-VBCBAdaptor::VBCBAdaptor(EventuallyPersistentStore *s,
+/**
+ * Visit the hash table of each vbucket in ascending id order, starting at
+ * start_pos, until the visitor asks to pause or the ids are exhausted.
+ *
+ * @param visitor called as visit(vbid, vb->ht) for every vbucket present;
+ *                a false return pauses the walk
+ * @param start_pos position whose vbucket_id is the first id examined
+ * @return Position of the vbucket at which the visitor paused (resuming
+ *         from the returned Position starts with that same vbucket), or a
+ *         Position equal to vbMap.getSize() when the walk ran to the end
+ */
+EventuallyPersistentStore::Position
+EventuallyPersistentStore::pauseResumeVisit(PauseResumeEPStoreVisitor& visitor,
+                                            Position& start_pos)
+{
+    const size_t maxSize = vbMap.getSize();
+    // vbid is a uint16_t: if the map held more than 65535 entries, ++vbid
+    // would wrap to 0 and the loop could never reach maxSize. This is the
+    // same bound that setAllBloomFilters() asserts.
+    cb_assert(maxSize <= std::numeric_limits<uint16_t>::max());
+
+    uint16_t vbid = start_pos.vbucket_id;
+    for (; vbid < maxSize; ++vbid) {
+        RCPtr<VBucket> vb = vbMap.getBucket(vbid);
+        if (vb) {
+            bool paused = !visitor.visit(vbid, vb->ht);
+            if (paused) {
+                // Stop without advancing so this vbucket is seen again.
+                break;
+            }
+        }
+    }
+
+    return EventuallyPersistentStore::Position(vbid);
+}
+
+/**
+ * @return the Position constructed from vbucket id 0, i.e. the first id
+ *         pauseResumeVisit() can examine
+ */
+EventuallyPersistentStore::Position
+EventuallyPersistentStore::startPosition() const
+{
+    return Position(0);
+}
+
+/**
+ * @return the Position constructed from vbMap.getSize(), which is the value
+ *         pauseResumeVisit() returns once it has run past the last vbucket
+ */
+EventuallyPersistentStore::Position
+EventuallyPersistentStore::endPosition() const
+{
+    return Position(vbMap.getSize());
+}
+
+VBCBAdaptor::VBCBAdaptor(EventuallyPersistentStore *s, TaskId id,
shared_ptr<VBucketVisitor> v,
- const char *l, const Priority &p, double sleep) :
- GlobalTask(&s->getEPEngine(), p, 0, false), store(s),
+ const char *l, double sleep) :
+ GlobalTask(&s->getEPEngine(), id, 0, false), store(s),
visitor(v), label(l), sleepTime(sleep), currentvb(0)
{
const VBucketFilter &vbFilter = visitor->getVBucketFilter();
VBucketVisitorTask::VBucketVisitorTask(EventuallyPersistentStore *s,
shared_ptr<VBucketVisitor> v,
uint16_t sh, const char *l,
- double sleep, bool shutdown):
- GlobalTask(&(s->getEPEngine()), Priority::AccessScannerPriority,
- 0, shutdown),
- store(s), visitor(v), label(l), sleepTime(sleep), currentvb(0),
- shardID(sh)
-{
+ double sleep, bool shutdown)
+ : GlobalTask(&(s->getEPEngine()), TaskId::VBucketVisitorTask, 0, shutdown),
+ store(s), visitor(v), label(l), sleepTime(sleep), currentvb(0),
+ shardID(sh) {
const VBucketFilter &vbFilter = visitor->getVBucketFilter();
std::vector<int> vbs = store->vbMap.getShard(shardID)->getVBuckets();
cb_assert(vbs.size() <= std::numeric_limits<uint16_t>::max());
shard->getROUnderlying()->resetStats();
}
- for (size_t i = 0; i < MAX_TYPE_ID; i++) {
+ for (size_t i = 0; i < GlobalTask::allTaskIds.size(); i++) {
stats.schedulingHisto[i].reset();
stats.taskRuntimeHisto[i].reset();
}
if (result.success) {
RCPtr<VBucket> vb = vbMap.getBucket(vbid);
vb->failovers->pruneEntries(result.highSeqno);
- vb->checkpointManager.clear(vb->getState());
- vb->checkpointManager.setBySeqno(result.highSeqno);
- vb->setCurrentSnapshot(result.snapStartSeqno, result.snapEndSeqno);
+ vb->checkpointManager.clear(vb, result.highSeqno);
+ vb->setPersistedSnapshot(result.snapStartSeqno, result.snapEndSeqno);
return ENGINE_SUCCESS;
}
}
}
return ENGINE_NOT_MY_VBUCKET;
}
+
+void EventuallyPersistentStore::runDefragmenterTask() { // direct, synchronous trigger of the defragmenter
+ defragmenterTask->run(); // calls run() on defragmenterTask from the caller; any return value is discarded
+}
+
+/**
+ * Stream a Position as the text "vbucket:" followed by its vbucket_id.
+ *
+ * @param os destination stream
+ * @param pos position to print
+ * @return os, to allow chaining
+ */
+std::ostream& operator<<(std::ostream& os,
+                         const EventuallyPersistentStore::Position& pos) {
+    return os << "vbucket:" << pos.vbucket_id;
+}