#include "ep_engine.h"
#include "mutation_log.h"
-class ItemAccessVisitor : public VBucketVisitor {
+class ItemAccessVisitor : public VBucketVisitor,
+ public HashTableVisitor {
public:
ItemAccessVisitor(EventuallyPersistentStore &_store, EPStats &_stats,
uint16_t sh, AtomicValue<bool> &sfin, AccessScanner &aS) :
}
}
- void visit(StoredValue *v) {
+ void visit(StoredValue *v) override {
if (log != NULL && v->isResident()) {
if (v->isExpired(startTime) || v->isDeleted()) {
LOG(EXTENSION_LOG_INFO,
accessed.clear();
}
- bool visitBucket(RCPtr<VBucket> &vb) {
+ void visitBucket(RCPtr<VBucket> &vb) override {
+ currentBucket = vb;
update();
if (log == NULL) {
- return false;
+ return;
}
- return VBucketVisitor::visitBucket(vb);
+ if (vBucketFilter(vb->getId())) {
+ vb->ht.visit(*this);
+ }
}
- virtual void complete() {
+ void complete() override {
update();
if (log == nullptr) {
MutationLog *log;
AtomicValue<bool> &stateFinalizer;
AccessScanner &as;
+ RCPtr<VBucket> currentBucket;
};
AccessScanner::AccessScanner(EventuallyPersistentStore &_store, EPStats &st,
return rv.str();
}
-bool BackFillVisitor::visitBucket(RCPtr<VBucket> &vb) {
- if (VBucketVisitor::visitBucket(vb)) {
+void BackFillVisitor::visitBucket(RCPtr<VBucket> &vb) {
+ if (vBucketFilter(vb->getId())) {
item_eviction_policy_t policy =
engine->getEpStore()->getItemEvictionPolicy();
double num_items = static_cast<double>(vb->getNumItems(policy));
if (num_items == 0) {
- return false;
+ return;
}
KVStore *underlying(engine->getEpStore()->
0, false);
ExecutorPool::get()->schedule(task, AUXIO_TASK_IDX);
}
- return false;
-}
-
-void BackFillVisitor::visit(StoredValue*) {
- abort();
}
bool BackFillVisitor::pauseVisitor() {
virtual ~BackFillVisitor() {}
- bool visitBucket(RCPtr<VBucket> &vb);
+ void visitBucket(RCPtr<VBucket> &vb) override;
- void visit(StoredValue *v);
-
- bool shouldContinue() {
- return checkValidity();
- }
-
- void complete(void);
+ void complete(void) override;
private:
- bool pauseVisitor();
+ bool pauseVisitor() override;
bool checkValidity();
: store(s), stats(st), removed(0), taskStart(gethrtime()),
wasHighMemoryUsage(s->isMemoryUsageTooHigh()), stateFinalizer(sfin) {}
- bool visitBucket(RCPtr<VBucket> &vb) {
- currentBucket = vb;
+ void visitBucket(RCPtr<VBucket> &vb) override {
bool newCheckpointCreated = false;
removed = vb->checkpointManager.removeClosedUnrefCheckpoints(vb,
newCheckpointCreated);
vb->getId(),
vb->checkpointManager.getHighSeqno());
}
- update();
- return false;
- }
- void update() {
stats.itemsRemovedFromCheckpoints.fetch_add(removed);
if (removed > 0) {
LOG(EXTENSION_LOG_INFO,
"Removed %ld closed unreferenced checkpoints from VBucket %d",
- removed, currentBucket->getId());
+ removed, vb->getId());
}
removed = 0;
}
- void complete() {
+ void complete() override {
bool inverse = false;
stateFinalizer.compare_exchange_strong(inverse, true);
for (VBucketMap::id_type vbid = 0; vbid < vbMap.getSize(); ++vbid) {
RCPtr<VBucket> vb = vbMap.getBucket(vbid);
if (vb) {
- bool wantData = visitor.visitBucket(vb);
- // We could've lost this along the way.
- if (wantData) {
- vb->ht.visit(visitor);
- }
+ visitor.visitBucket(vb);
}
}
visitor.complete();
snooze(sleepTime);
return true;
}
- if (visitor->visitBucket(vb)) {
- vb->ht.visit(*visitor);
- }
+ visitor->visitBucket(vb);
}
vbList.pop();
}
snooze(sleepTime);
return true;
}
- if (visitor->visitBucket(vb)) {
- vb->ht.visit(*visitor);
- }
+ visitor->visitBucket(vb);
}
vbList.pop();
}
/**
* vbucket-aware hashtable visitor.
+ *
+ * The caller (e.g. EventuallyPersistentStore::visit) will call visitBucket()
+ * for each valid VBucket, finally calling complete() after all vBuckets have
+ * been visited.
+ *
+ * Callers *may* call the pauseVisitor() method periodically (typically between
+ * vBuckets) which should return true if visiting VBuckets should be paused
+ * temporarily (typically to break up long-running visitation tasks to allow
+ * other Tasks to run).
*/
-class VBucketVisitor : public HashTableVisitor {
+class VBucketVisitor {
public:
- VBucketVisitor() : HashTableVisitor() { }
+ VBucketVisitor() { }
+
+ virtual ~VBucketVisitor() {}
- VBucketVisitor(const VBucketFilter &filter) :
- HashTableVisitor(), vBucketFilter(filter) { }
+ VBucketVisitor(const VBucketFilter &filter)
+ : vBucketFilter(filter) { }
/**
* Begin visiting a bucket.
*
* @param vb the vbucket we are beginning to visit
- *
- * @return true iff we want to walk the hashtable in this vbucket
*/
- virtual bool visitBucket(RCPtr<VBucket> &vb) {
- if (vBucketFilter(vb->getId())) {
- currentBucket = vb;
- return true;
- }
- return false;
- }
-
- // This is unused in all implementations so far.
- void visit(StoredValue* v) {
- (void)v;
- abort();
- }
+ virtual void visitBucket(RCPtr<VBucket> &vb) = 0;
const VBucketFilter &getVBucketFilter() {
return vBucketFilter;
protected:
VBucketFilter vBucketFilter;
- RCPtr<VBucket> currentBucket;
};
// Forward declaration
ExecutorPool::get()->schedule(backfillTask, NONIO_TASK_IDX);
}
-bool VBucketCountVisitor::visitBucket(RCPtr<VBucket> &vb) {
+void VBucketCountVisitor::visitBucket(RCPtr<VBucket> &vb) {
++numVbucket;
item_eviction_policy_t policy = engine.getEpStore()->
getItemEvictionPolicy();
totalHLCDriftExceptionCounters.ahead += driftExceptionCounters.ahead;
totalHLCDriftExceptionCounters.behind += driftExceptionCounters.behind;
}
-
- return false;
}
-bool VBucketCountAggregator::visitBucket(RCPtr<VBucket> &vb) {
+void VBucketCountAggregator::visitBucket(RCPtr<VBucket> &vb) {
std::map<vbucket_state_t, VBucketCountVisitor*>::iterator it;
it = visitorMap.find(vb->getState());
if ( it != visitorMap.end() ) {
it->second->visitBucket(vb);
}
-
- return false;
}
void VBucketCountAggregator::addVisitor(VBucketCountVisitor* visitor) {
isPrevState(isPrevStateRequested),
isDetailsRequested(detailsRequested) {}
- bool visitBucket(RCPtr<VBucket> &vb) {
+ void visitBucket(RCPtr<VBucket> &vb) override {
addVBStats(cookie, add_stat, vb, eps, isPrevState,
isDetailsRequested);
- return false;
}
static void addVBStats(const void *cookie, ADD_STAT add_stat,
StatVBucketVisitor(const void *c, ADD_STAT a) : cookie(c),
add_stat(a) {}
- bool visitBucket(RCPtr<VBucket> &vb) {
+ void visitBucket(RCPtr<VBucket> &vb) override {
uint16_t vbid = vb->getId();
char buf[32];
try {
"StatVBucketVisitor::visitBucket: Failed to build stat: %s",
error.what());
}
- return false;
}
const void *cookie;
StatCheckpointVisitor(EventuallyPersistentStore * eps, const void *c,
ADD_STAT a) : epstore(eps), cookie(c), add_stat(a) {}
- bool visitBucket(RCPtr<VBucket> &vb) {
+ void visitBucket(RCPtr<VBucket> &vb) override {
addCheckpointStat(cookie, add_stat, epstore, vb);
- return false;
}
static void addCheckpointStat(const void *cookie, ADD_STAT add_stat,
public:
StatVBucketVisitor(const void *c, ADD_STAT a) :
cookie(c), add_stat(a) {}
- bool visitBucket(RCPtr<VBucket> &vb) {
+
+ void visitBucket(RCPtr<VBucket> &vb) override {
vb->failovers->addStats(cookie, vb->getId(), add_stat);
- return false;
}
+
private:
const void *cookie;
ADD_STAT add_stat;
: cookie(c), add_stat(a), detailed(d), fileSpaceUsed(0),
fileSize(0) {}
- bool visitBucket(RCPtr<VBucket> &vb) {
+ void visitBucket(RCPtr<VBucket> &vb) override {
if (detailed) {
char buf[32];
uint16_t vbid = vb->getId();
fileSpaceUsed += vb->fileSpaceUsed;
fileSize += vb->fileSize;
- return false;
}
size_t getFileSize() {
totalHLCDriftExceptionCounters()
{ }
- bool visitBucket(RCPtr<VBucket> &vb);
-
- void visit(StoredValue* v) {
- throw std::logic_error("VBucketCountVisitor:visit: Should never "
- "be called");
- }
+ void visitBucket(RCPtr<VBucket> &vb) override;
vbucket_state_t getVBucketState() { return desired_state; }
*/
class VBucketCountAggregator : public VBucketVisitor {
public:
- bool visitBucket(RCPtr<VBucket> &vb);
+ void visitBucket(RCPtr<VBucket> &vb) override;
void addVisitor(VBucketCountVisitor* visitor);
private:
*/
class ResizingVisitor : public VBucketVisitor {
public:
-
ResizingVisitor() { }
- bool visitBucket(RCPtr<VBucket> &vb) {
+ void visitBucket(RCPtr<VBucket> &vb) override {
vb->ht.resize();
- return false;
}
-
};
bool HashtableResizerTask::run(void) {
* As part of the ItemPager, visit all of the objects in memory and
* eject some within a constrained probability
*/
-class PagingVisitor : public VBucketVisitor {
+class PagingVisitor : public VBucketVisitor,
+ public HashTableVisitor {
public:
/**
wasHighMemoryUsage(s.isMemoryUsageTooHigh()),
taskStart(gethrtime()), pager_phase(phase) {}
- void visit(StoredValue *v) {
+ void visit(StoredValue *v) override {
// Delete expired items for an active vbucket.
bool isExpired = (currentBucket->getState() == vbucket_state_active) &&
v->isExpired(startTime) && !v->isDeleted();
}
}
- bool visitBucket(RCPtr<VBucket> &vb) {
+ void visitBucket(RCPtr<VBucket> &vb) override {
update();
bool newCheckpointCreated = false;
// fast path for expiry item pager
if (percent <= 0 || !pager_phase) {
- return VBucketVisitor::visitBucket(vb);
+ if (vBucketFilter(vb->getId())) {
+ currentBucket = vb;
+ vb->ht.visit(*this);
+ }
+ return;
}
// skip active vbuckets if active resident ratio is lower than replica
double high = static_cast<double>(stats.mem_high_wat);
if (vb->getState() == vbucket_state_active && current < high &&
store.cachedResidentRatio.activeRatio <
- store.cachedResidentRatio.replicaRatio)
- {
- return false;
+ store.cachedResidentRatio.replicaRatio) {
+ return;
}
if (current > lower) {
double p = (current - static_cast<double>(lower)) / current;
adjustPercent(p, vb->getState());
- return VBucketVisitor::visitBucket(vb);
+ if (vBucketFilter(vb->getId())) {
+ currentBucket = vb;
+ vb->ht.visit(*this);
+ }
+
} else { // stop eviction whenever memory usage is below low watermark
completePhase = false;
- return false;
}
}
expired.clear();
}
- bool pauseVisitor() {
+ bool pauseVisitor() override {
size_t queueSize = stats.diskQueueSize.load();
return canPause && queueSize >= MAX_PERSISTENCE_QUEUE_SIZE;
}
- void complete() {
+ void complete() override {
update();
hrtime_t elapsed_time = (gethrtime() - taskStart) / 1000;
bool wasHighMemoryUsage;
hrtime_t taskStart;
std::atomic<item_pager_phase>* pager_phase;
+ RCPtr<VBucket> currentBucket;
};
ItemPager::ItemPager(EventuallyPersistentEngine *e, EPStats &st) :
}
void LoadStorageKVPairCallback::purge() {
- class EmergencyPurgeVisitor : public VBucketVisitor {
+ class EmergencyPurgeVisitor : public VBucketVisitor,
+ public HashTableVisitor {
public:
EmergencyPurgeVisitor(EventuallyPersistentStore& store) :
epstore(store) {}
- void visit(StoredValue *v) {
+ void visitBucket(RCPtr<VBucket> &vb) override {
+ if (vBucketFilter(vb->getId())) {
+ currentBucket = vb;
+ vb->ht.visit(*this);
+ }
+ }
+
+ void visit(StoredValue *v) override {
currentBucket->ht.unlocked_ejectItem(v,
epstore.getItemEvictionPolicy());
}
+
private:
EventuallyPersistentStore& epstore;
- };
+ RCPtr<VBucket> currentBucket;
+ };
auto vbucketIds(vbuckets.getBuckets());
EmergencyPurgeVisitor epv(epstore);
for (auto vbid : vbucketIds) {
RCPtr<VBucket> vb = vbuckets.getBucket(vbid);
- if (vb && epv.visitBucket(vb)) {
- vb->ht.visit(epv);
+ if (vb) {
+ epv.visitBucket(vb);
}
}
hasPurged = true;