MB-21540: Refactor VBucketVisitor to remove need for abort 71/69371/5
authorDave Rigby <daver@couchbase.com>
Tue, 1 Nov 2016 15:42:12 +0000 (15:42 +0000)
committerDave Rigby <daver@couchbase.com>
Fri, 4 Nov 2016 13:29:48 +0000 (13:29 +0000)
VBucketVisitor is used to visit all VBuckets in a Bucket, and is
currently implemented as a derived class of HashTableVisitor - as the
intent is that concrete classes will iterate at two levels - first
over each VBucket, and then for all StoredValues within the HashTable
of that VBucket.

However, not all derived classes actually visit the individual
StoredValues - many just operate at the VBucket level. As such, these
classes do not expect the visit(StoredValue*) method to be called, and
have abort()s on these code paths.

This is undesirable - it should be possible to statically (i.e. at
compile-time) ensure that such functions are never called.

To address this (and remove the abort() calls), modify VBucketVisitor
so it doesn't inherit from HashTableVisitor - essentially it just
deals with visiting at the VBucket level, which *is* true for all
derived classes. For those derived classes which /do/ wish to perform
recursive visiting of StoredValue, the derived class itself can
inherit from HashTableVisitor.

Additionally, as VBucketVisitor only deals with VBucket visiting,
simpify the visiting logic to a "classic" Visitor pattern, making
visitBucket return void (instead of bool), and let the visitBucket()
method itself handle what do with the VBucket is it passed. This means
that the caller (e.g. EventuallyPersistentStore::visit()) no longer
chooses if the HashTable::visit method is called - the derived class
can call that itself if it chooses.

Change-Id: I1dc8724db9e89daff261bccfe22dc6e2db885b22
Reviewed-on: http://review.couchbase.org/69371
Tested-by: buildbot <build@couchbase.com>
Reviewed-by: Daniel Owen <owend@couchbase.com>
src/access_scanner.cc
src/backfill.cc
src/backfill.h
src/checkpoint_remover.cc
src/ep.cc
src/ep.h
src/ep_engine.cc
src/ep_engine.h
src/htresizer.cc
src/item_pager.cc
src/warmup.cc

index 94a5de9..c6053ef 100644 (file)
@@ -23,7 +23,8 @@
 #include "ep_engine.h"
 #include "mutation_log.h"
 
-class ItemAccessVisitor : public VBucketVisitor {
+class ItemAccessVisitor : public VBucketVisitor,
+                          public HashTableVisitor {
 public:
     ItemAccessVisitor(EventuallyPersistentStore &_store, EPStats &_stats,
                       uint16_t sh, AtomicValue<bool> &sfin, AccessScanner &aS) :
@@ -51,7 +52,7 @@ public:
         }
     }
 
-    void visit(StoredValue *v) {
+    void visit(StoredValue *v) override {
         if (log != NULL && v->isResident()) {
             if (v->isExpired(startTime) || v->isDeleted()) {
                 LOG(EXTENSION_LOG_INFO,
@@ -72,17 +73,20 @@ public:
         accessed.clear();
     }
 
-    bool visitBucket(RCPtr<VBucket> &vb) {
+    void visitBucket(RCPtr<VBucket> &vb) override {
+        currentBucket = vb;
         update();
 
         if (log == NULL) {
-            return false;
+            return;
         }
 
-        return VBucketVisitor::visitBucket(vb);
+        if (vBucketFilter(vb->getId())) {
+            vb->ht.visit(*this);
+        }
     }
 
-    virtual void complete() {
+    void complete() override {
         update();
 
         if (log == nullptr) {
@@ -174,6 +178,7 @@ private:
     MutationLog *log;
     AtomicValue<bool> &stateFinalizer;
     AccessScanner &as;
+    RCPtr<VBucket> currentBucket;
 };
 
 AccessScanner::AccessScanner(EventuallyPersistentStore &_store, EPStats &st,
index 463bd59..42ab731 100644 (file)
@@ -168,14 +168,14 @@ std::string BackfillDiskLoad::getDescription() {
     return rv.str();
 }
 
-bool BackFillVisitor::visitBucket(RCPtr<VBucket> &vb) {
-    if (VBucketVisitor::visitBucket(vb)) {
+void BackFillVisitor::visitBucket(RCPtr<VBucket> &vb) {
+    if (vBucketFilter(vb->getId())) {
         item_eviction_policy_t policy =
             engine->getEpStore()->getItemEvictionPolicy();
         double num_items = static_cast<double>(vb->getNumItems(policy));
 
         if (num_items == 0) {
-            return false;
+            return;
         }
 
         KVStore *underlying(engine->getEpStore()->
@@ -187,11 +187,6 @@ bool BackFillVisitor::visitBucket(RCPtr<VBucket> &vb) {
                                           0, false);
         ExecutorPool::get()->schedule(task, AUXIO_TASK_IDX);
     }
-    return false;
-}
-
-void BackFillVisitor::visit(StoredValue*) {
-    abort();
 }
 
 bool BackFillVisitor::pauseVisitor() {
index 66d8a8a..950f7c5 100644 (file)
@@ -88,19 +88,13 @@ public:
 
     virtual ~BackFillVisitor() {}
 
-    bool visitBucket(RCPtr<VBucket> &vb);
+    void visitBucket(RCPtr<VBucket> &vb) override;
 
-    void visit(StoredValue *v);
-
-    bool shouldContinue() {
-        return checkValidity();
-    }
-
-    void complete(void);
+    void complete(void) override;
 
 private:
 
-    bool pauseVisitor();
+    bool pauseVisitor() override;
 
     bool checkValidity();
 
index e771db4..52cd4be 100644 (file)
@@ -37,8 +37,7 @@ public:
         : store(s), stats(st), removed(0), taskStart(gethrtime()),
           wasHighMemoryUsage(s->isMemoryUsageTooHigh()), stateFinalizer(sfin) {}
 
-    bool visitBucket(RCPtr<VBucket> &vb) {
-        currentBucket = vb;
+    void visitBucket(RCPtr<VBucket> &vb) override {
         bool newCheckpointCreated = false;
         removed = vb->checkpointManager.removeClosedUnrefCheckpoints(vb,
                                                          newCheckpointCreated);
@@ -51,21 +50,17 @@ public:
                                         vb->getId(),
                                         vb->checkpointManager.getHighSeqno());
         }
-        update();
-        return false;
-    }
 
-    void update() {
         stats.itemsRemovedFromCheckpoints.fetch_add(removed);
         if (removed > 0) {
             LOG(EXTENSION_LOG_INFO,
                 "Removed %ld closed unreferenced checkpoints from VBucket %d",
-                removed, currentBucket->getId());
+                removed, vb->getId());
         }
         removed = 0;
     }
 
-    void complete() {
+    void complete() override {
         bool inverse = false;
         stateFinalizer.compare_exchange_strong(inverse, true);
 
index bd71b57..360d22d 100644 (file)
--- a/src/ep.cc
+++ b/src/ep.cc
@@ -3715,11 +3715,7 @@ void EventuallyPersistentStore::visit(VBucketVisitor &visitor)
     for (VBucketMap::id_type vbid = 0; vbid < vbMap.getSize(); ++vbid) {
         RCPtr<VBucket> vb = vbMap.getBucket(vbid);
         if (vb) {
-            bool wantData = visitor.visitBucket(vb);
-            // We could've lost this along the way.
-            if (wantData) {
-                vb->ht.visit(visitor);
-            }
+            visitor.visitBucket(vb);
         }
     }
     visitor.complete();
@@ -3779,9 +3775,7 @@ bool VBCBAdaptor::run(void) {
                 snooze(sleepTime);
                 return true;
             }
-            if (visitor->visitBucket(vb)) {
-                vb->ht.visit(*visitor);
-            }
+            visitor->visitBucket(vb);
         }
         vbList.pop();
     }
@@ -3818,9 +3812,7 @@ bool VBucketVisitorTask::run() {
                 snooze(sleepTime);
                 return true;
             }
-            if (visitor->visitBucket(vb)) {
-                vb->ht.visit(*visitor);
-            }
+            visitor->visitBucket(vb);
         }
         vbList.pop();
     }
index c45faa6..493bddc 100644 (file)
--- a/src/ep.h
+++ b/src/ep.h
@@ -32,35 +32,32 @@ class ExtendedMetaData;
 
 /**
  * vbucket-aware hashtable visitor.
+ *
+ * The caller (e.g. EventuallyPersistentStore::visit) will call visitBucket()
+ * for each valid VBucket, finally calling complete() after all vBuckets have
+ * been visited.
+ *
+ * Callers *may* call the pauseVisitor() method periodically (typically between
+ * vBuckets) which should return true if visiting VBuckets should be paused
+ * temporarily (typically to break up long-running visitation tasks to allow
+ * other Tasks to run).
  */
-class VBucketVisitor : public HashTableVisitor {
+class VBucketVisitor {
 public:
 
-    VBucketVisitor() : HashTableVisitor() { }
+    VBucketVisitor() { }
+
+    virtual ~VBucketVisitor() {}
 
-    VBucketVisitor(const VBucketFilter &filter) :
-        HashTableVisitor(), vBucketFilter(filter) { }
+    VBucketVisitor(const VBucketFilter &filter)
+        : vBucketFilter(filter) { }
 
     /**
      * Begin visiting a bucket.
      *
      * @param vb the vbucket we are beginning to visit
-     *
-     * @return true iff we want to walk the hashtable in this vbucket
      */
-    virtual bool visitBucket(RCPtr<VBucket> &vb) {
-        if (vBucketFilter(vb->getId())) {
-            currentBucket = vb;
-            return true;
-        }
-        return false;
-    }
-
-    // This is unused in all implementations so far.
-    void visit(StoredValue* v) {
-        (void)v;
-        abort();
-    }
+    virtual void visitBucket(RCPtr<VBucket> &vb) = 0;
 
     const VBucketFilter &getVBucketFilter() {
         return vBucketFilter;
@@ -80,7 +77,6 @@ public:
 
 protected:
     VBucketFilter vBucketFilter;
-    RCPtr<VBucket> currentBucket;
 };
 
 // Forward declaration
index 88f4369..d694e12 100644 (file)
@@ -3080,7 +3080,7 @@ void EventuallyPersistentEngine::queueBackfill(const VBucketFilter
     ExecutorPool::get()->schedule(backfillTask, NONIO_TASK_IDX);
 }
 
-bool VBucketCountVisitor::visitBucket(RCPtr<VBucket> &vb) {
+void VBucketCountVisitor::visitBucket(RCPtr<VBucket> &vb) {
     ++numVbucket;
     item_eviction_policy_t policy = engine.getEpStore()->
                                                        getItemEvictionPolicy();
@@ -3131,18 +3131,14 @@ bool VBucketCountVisitor::visitBucket(RCPtr<VBucket> &vb) {
         totalHLCDriftExceptionCounters.ahead += driftExceptionCounters.ahead;
         totalHLCDriftExceptionCounters.behind += driftExceptionCounters.behind;
     }
-
-    return false;
 }
 
-bool VBucketCountAggregator::visitBucket(RCPtr<VBucket> &vb) {
+void VBucketCountAggregator::visitBucket(RCPtr<VBucket> &vb) {
     std::map<vbucket_state_t, VBucketCountVisitor*>::iterator it;
     it = visitorMap.find(vb->getState());
     if ( it != visitorMap.end() ) {
         it->second->visitBucket(vb);
     }
-
-    return false;
 }
 
 void VBucketCountAggregator::addVisitor(VBucketCountVisitor* visitor) {
@@ -3802,10 +3798,9 @@ ENGINE_ERROR_CODE EventuallyPersistentEngine::doVBucketStats(
             isPrevState(isPrevStateRequested),
             isDetailsRequested(detailsRequested) {}
 
-        bool visitBucket(RCPtr<VBucket> &vb) {
+        void visitBucket(RCPtr<VBucket> &vb) override {
             addVBStats(cookie, add_stat, vb, eps, isPrevState,
                        isDetailsRequested);
-            return false;
         }
 
         static void addVBStats(const void *cookie, ADD_STAT add_stat,
@@ -3872,7 +3867,7 @@ ENGINE_ERROR_CODE EventuallyPersistentEngine::doHashStats(const void *cookie,
         StatVBucketVisitor(const void *c, ADD_STAT a) : cookie(c),
                                                         add_stat(a) {}
 
-        bool visitBucket(RCPtr<VBucket> &vb) {
+        void visitBucket(RCPtr<VBucket> &vb) override {
             uint16_t vbid = vb->getId();
             char buf[32];
             try {
@@ -3918,7 +3913,6 @@ ENGINE_ERROR_CODE EventuallyPersistentEngine::doHashStats(const void *cookie,
                     "StatVBucketVisitor::visitBucket: Failed to build stat: %s",
                     error.what());
             }
-            return false;
         }
 
         const void *cookie;
@@ -3936,9 +3930,8 @@ public:
     StatCheckpointVisitor(EventuallyPersistentStore * eps, const void *c,
                           ADD_STAT a) : epstore(eps), cookie(c), add_stat(a) {}
 
-    bool visitBucket(RCPtr<VBucket> &vb) {
+    void visitBucket(RCPtr<VBucket> &vb) override {
         addCheckpointStat(cookie, add_stat, epstore, vb);
-        return false;
     }
 
     static void addCheckpointStat(const void *cookie, ADD_STAT add_stat,
@@ -4403,10 +4396,11 @@ ENGINE_ERROR_CODE EventuallyPersistentEngine::doAllFailoverLogStats(
     public:
         StatVBucketVisitor(const void *c, ADD_STAT a) :
             cookie(c), add_stat(a) {}
-        bool visitBucket(RCPtr<VBucket> &vb) {
+
+        void visitBucket(RCPtr<VBucket> &vb) override {
             vb->failovers->addStats(cookie, vb->getId(), add_stat);
-            return false;
         }
+
     private:
         const void *cookie;
         ADD_STAT add_stat;
@@ -4667,7 +4661,7 @@ ENGINE_ERROR_CODE EventuallyPersistentEngine::doDiskStats(const void *cookie,
             : cookie(c), add_stat(a), detailed(d), fileSpaceUsed(0),
               fileSize(0) {}
 
-        bool visitBucket(RCPtr<VBucket> &vb) {
+        void visitBucket(RCPtr<VBucket> &vb) override {
             if (detailed) {
                 char buf[32];
                 uint16_t vbid = vb->getId();
@@ -4686,7 +4680,6 @@ ENGINE_ERROR_CODE EventuallyPersistentEngine::doDiskStats(const void *cookie,
 
             fileSpaceUsed += vb->fileSpaceUsed;
             fileSize += vb->fileSize;
-            return false;
         }
 
         size_t getFileSize() {
index 629ef0b..6e9f6b3 100644 (file)
@@ -111,12 +111,7 @@ public:
         totalHLCDriftExceptionCounters()
     { }
 
-    bool visitBucket(RCPtr<VBucket> &vb);
-
-    void visit(StoredValue* v) {
-        throw std::logic_error("VBucketCountVisitor:visit: Should never "
-                "be called");
-    }
+    void visitBucket(RCPtr<VBucket> &vb) override;
 
     vbucket_state_t getVBucketState() { return desired_state; }
 
@@ -210,7 +205,7 @@ private:
  */
 class VBucketCountAggregator : public VBucketVisitor  {
 public:
-    bool visitBucket(RCPtr<VBucket> &vb);
+    void visitBucket(RCPtr<VBucket> &vb) override;
 
     void addVisitor(VBucketCountVisitor* visitor);
 private:
index 9711d29..408b92d 100644 (file)
@@ -28,14 +28,11 @@ static const double FREQUENCY(60.0);
  */
 class ResizingVisitor : public VBucketVisitor {
 public:
-
     ResizingVisitor() { }
 
-    bool visitBucket(RCPtr<VBucket> &vb) {
+    void visitBucket(RCPtr<VBucket> &vb) override {
         vb->ht.resize();
-        return false;
     }
-
 };
 
 bool HashtableResizerTask::run(void) {
index b5d31a3..5ec5f42 100644 (file)
@@ -40,7 +40,8 @@ enum pager_type_t {
  * As part of the ItemPager, visit all of the objects in memory and
  * eject some within a constrained probability
  */
-class PagingVisitor : public VBucketVisitor {
+class PagingVisitor : public VBucketVisitor,
+                      public HashTableVisitor {
 public:
 
     /**
@@ -67,7 +68,7 @@ public:
         wasHighMemoryUsage(s.isMemoryUsageTooHigh()),
         taskStart(gethrtime()), pager_phase(phase) {}
 
-    void visit(StoredValue *v) {
+    void visit(StoredValue *v) override {
         // Delete expired items for an active vbucket.
         bool isExpired = (currentBucket->getState() == vbucket_state_active) &&
             v->isExpired(startTime) && !v->isDeleted();
@@ -97,7 +98,7 @@ public:
         }
     }
 
-    bool visitBucket(RCPtr<VBucket> &vb) {
+    void visitBucket(RCPtr<VBucket> &vb) override {
         update();
 
         bool newCheckpointCreated = false;
@@ -116,7 +117,11 @@ public:
 
         // fast path for expiry item pager
         if (percent <= 0 || !pager_phase) {
-            return VBucketVisitor::visitBucket(vb);
+            if (vBucketFilter(vb->getId())) {
+                currentBucket = vb;
+                vb->ht.visit(*this);
+            }
+            return;
         }
 
         // skip active vbuckets if active resident ratio is lower than replica
@@ -125,18 +130,20 @@ public:
         double high = static_cast<double>(stats.mem_high_wat);
         if (vb->getState() == vbucket_state_active && current < high &&
             store.cachedResidentRatio.activeRatio <
-            store.cachedResidentRatio.replicaRatio)
-        {
-            return false;
+            store.cachedResidentRatio.replicaRatio) {
+            return;
         }
 
         if (current > lower) {
             double p = (current - static_cast<double>(lower)) / current;
             adjustPercent(p, vb->getState());
-            return VBucketVisitor::visitBucket(vb);
+            if (vBucketFilter(vb->getId())) {
+                currentBucket = vb;
+                vb->ht.visit(*this);
+            }
+
         } else { // stop eviction whenever memory usage is below low watermark
             completePhase = false;
-            return false;
         }
     }
 
@@ -156,12 +163,12 @@ public:
         expired.clear();
     }
 
-    bool pauseVisitor() {
+    bool pauseVisitor() override {
         size_t queueSize = stats.diskQueueSize.load();
         return canPause && queueSize >= MAX_PERSISTENCE_QUEUE_SIZE;
     }
 
-    void complete() {
+    void complete() override {
         update();
 
         hrtime_t elapsed_time = (gethrtime() - taskStart) / 1000;
@@ -240,6 +247,7 @@ private:
     bool wasHighMemoryUsage;
     hrtime_t taskStart;
     std::atomic<item_pager_phase>* pager_phase;
+    RCPtr<VBucket> currentBucket;
 };
 
 ItemPager::ItemPager(EventuallyPersistentEngine *e, EPStats &st) :
index 1a66a40..058babb 100644 (file)
@@ -323,25 +323,35 @@ void LoadStorageKVPairCallback::callback(GetValue &val) {
 }
 
 void LoadStorageKVPairCallback::purge() {
-    class EmergencyPurgeVisitor : public VBucketVisitor {
+    class EmergencyPurgeVisitor : public VBucketVisitor,
+                                  public HashTableVisitor {
     public:
         EmergencyPurgeVisitor(EventuallyPersistentStore& store) :
             epstore(store) {}
 
-        void visit(StoredValue *v) {
+        void visitBucket(RCPtr<VBucket> &vb) override {
+            if (vBucketFilter(vb->getId())) {
+                currentBucket = vb;
+                vb->ht.visit(*this);
+            }
+        }
+
+        void visit(StoredValue *v) override {
             currentBucket->ht.unlocked_ejectItem(v,
                                              epstore.getItemEvictionPolicy());
         }
+
     private:
         EventuallyPersistentStore& epstore;
-    };
+        RCPtr<VBucket> currentBucket;
+   };
 
     auto vbucketIds(vbuckets.getBuckets());
     EmergencyPurgeVisitor epv(epstore);
     for (auto vbid : vbucketIds) {
         RCPtr<VBucket> vb = vbuckets.getBucket(vbid);
-        if (vb && epv.visitBucket(vb)) {
-            vb->ht.visit(epv);
+        if (vb) {
+            epv.visitBucket(vb);
         }
     }
     hasPurged = true;