MB-23795: Ephemeral Tombstone purging Task 85/76485/16
authorDave Rigby <daver@couchbase.com>
Fri, 7 Apr 2017 17:20:52 +0000 (18:20 +0100)
committerDave Rigby <daver@couchbase.com>
Thu, 13 Apr 2017 16:27:56 +0000 (16:27 +0000)
Expands on the previous patch to implement a Task which performs
Tombstone purging. This Task is scheduled periodically (see
ep_ephemeral_metadata_purge_interval), and when run it visits all
vBuckets and purges all applicable OSVs.

Task can be reconfigured dynamically via epctl parameters:

- ephemeral_metadata_purge_age: Age in seconds after which deleted
  items are purged.

- ephemeral_metadata_purge_interval: How often should Tombstone
  purging task run to check for items to be purged.

Example output:

    NOTICE (eph) Ephemeral Tombstone Purger starting with purge age:60
    NOTICE (eph) Ephemeral Tombstone Purger completed. Purged 39000 items. Took 104ms. Sleeping for 60 seconds.

Change-Id: I126c74f2e82c0a31a2843240303548a24af2e90f
Reviewed-on: http://review.couchbase.org/76485
Reviewed-by: Jim Walker <jim@couchbase.com>
Tested-by: Build Bot <build@couchbase.com>
configuration.json
src/ep_engine.cc
src/ephemeral_bucket.cc
src/ephemeral_bucket.h
src/ephemeral_tombstone_purger.cc
src/ephemeral_tombstone_purger.h
src/ephemeral_vb.h
src/tasks.def.h
tests/ep_testsuite.cc

index 39bf143..dd7b777 100644 (file)
             "descr": "Age in seconds after which Ephemeral metadata is purged entirely from memory. Purging disabled if set to -1.",
             "type": "ssize_t"
         },
+        "ephemeral_metadata_purge_interval": {
+            "default": "60",
+            "descr": "Time in seconds between automatic, periodic runs of the Ephemeral metadata purge task. Periodic purging disabled if set to 0.",
+            "type": "size_t",
+            "requires": {
+                "bucket_type": "ephemeral"
+            }
+        },
         "exp_pager_enabled": {
             "default": "true",
             "descr": "True if expiry pager task is enabled",
index a38d609..af448d6 100644 (file)
@@ -583,6 +583,10 @@ protocol_binary_response_status EventuallyPersistentEngine::setFlushParam(
         } else if (strcmp(keyz, "ephemeral_full_policy") == 0) {
             getConfiguration().requirementsMetOrThrow("ephemeral_full_policy");
             getConfiguration().setEphemeralFullPolicy(valz);
+        } else if (strcmp(keyz, "ephemeral_metadata_purge_interval") == 0) {
+            getConfiguration().requirementsMetOrThrow("ephemeral_metadata_purge_interval");
+            getConfiguration().setEphemeralMetadataPurgeInterval(
+                    std::stoull(valz));
         } else {
             msg = "Unknown config param";
             rv = PROTOCOL_BINARY_RESPONSE_KEY_ENOENT;
index e699bdc..fa0a6d6 100644 (file)
@@ -19,6 +19,7 @@
 
 #include "ep_engine.h"
 #include "ep_types.h"
+#include "ephemeral_tombstone_purger.h"
 #include "ephemeral_vb.h"
 #include "ephemeral_vb_count_visitor.h"
 #include "failover-table.h"
@@ -69,7 +70,18 @@ public:
                 "unknown key '%s'",
                 key.c_str());
         }
+    }
 
+    void sizeValueChanged(const std::string& key, size_t value) override {
+        if (key == "ephemeral_metadata_purge_interval") {
+            // Cancel and re-schedule the task to pick up the new interval.
+            bucket.enableTombstonePurgerTask();
+        } else {
+            LOG(EXTENSION_LOG_WARNING,
+                "EphemeralValueChangedListener: Failed to change value for "
+                "unknown key '%s'",
+                key.c_str());
+        }
     }
 
 private:
@@ -99,6 +111,18 @@ bool EphemeralBucket::initialize() {
     engine.getConfiguration().addValueChangedListener(
             "ephemeral_full_policy", new EphemeralValueChangedListener(*this));
 
+    // Tombstone purger - scheduled periodically as long as we have a
+    // non-zero interval. Can be dynamically adjusted, so add config listeners.
+    tombstonePurgerTask = new EphTombstonePurgerTask(&engine, stats);
+    auto interval = config.getEphemeralMetadataPurgeInterval();
+    if (interval > 0) {
+        enableTombstonePurgerTask();
+    }
+    config.addValueChangedListener("ephemeral_metadata_purge_age",
+                                   new EphemeralValueChangedListener(*this));
+    config.addValueChangedListener("ephemeral_metadata_purge_interval",
+                                   new EphemeralValueChangedListener(*this));
+
     return true;
 }
 
@@ -154,11 +178,12 @@ RollbackResult EphemeralBucket::doRollback(uint16_t vbid,
 }
 
 void EphemeralBucket::enableTombstonePurgerTask() {
-    // TODO
+    ExecutorPool::get()->cancel(tombstonePurgerTask->getId());
+    ExecutorPool::get()->schedule(tombstonePurgerTask);
 }
 
 void EphemeralBucket::disableTombstonePurgerTask() {
-    // TODO
+    ExecutorPool::get()->cancel(tombstonePurgerTask->getId());
 }
 
 void EphemeralBucket::reconfigureForEphemeral(Configuration& config) {
index 48fec35..221d41c 100644 (file)
@@ -125,4 +125,9 @@ protected:
                                       VBucketCountVisitor& dead,
                                       const void* cookie,
                                       ADD_STAT add_stat) override;
+
+    // Protected member variables /////////////////////////////////////////////
+
+    /// Task responsible for purging in-memory tombstones.
+    ExTask tombstonePurgerTask;
 };
index 7d9b75b..3c3bfd8 100644 (file)
@@ -49,3 +49,65 @@ void EphemeralVBucket::HTTombstonePurger::visit(
     vbucket.seqList->markItemStale(std::move(ownedSV));
     ++numPurgedItems;
 }
+
+EphemeralVBucket::VBTombstonePurger::VBTombstonePurger(rel_time_t purgeAge)
+    : purgeAge(purgeAge), numPurgedItems(0) {
+}
+
+void EphemeralVBucket::VBTombstonePurger::visitBucket(RCPtr<VBucket>& vb) {
+    auto vbucket = dynamic_cast<EphemeralVBucket*>(vb.get());
+    if (!vbucket) {
+        throw std::invalid_argument(
+                "VBTombstonePurger::visitBucket: Called with a non-Ephemeral "
+                "bucket");
+    }
+    numPurgedItems += vbucket->purgeTombstones(purgeAge);
+}
+
+EphTombstonePurgerTask::EphTombstonePurgerTask(EventuallyPersistentEngine* e,
+                                               EPStats& stats_)
+    : GlobalTask(e, TaskId::EphTombstonePurgerTask, 0, false) {
+}
+
+bool EphTombstonePurgerTask::run() {
+    if (engine->getEpStats().isShutdown) {
+        return false;
+    }
+
+    LOG(EXTENSION_LOG_NOTICE,
+        "%s starting with purge age:%" PRIu64,
+        to_string(getDescription()).c_str(),
+        uint64_t(getDeletedPurgeAge()));
+
+    // Create a VB purger, and run across all VBuckets.
+    auto start = ProcessClock::now();
+    EphemeralVBucket::VBTombstonePurger purger(getDeletedPurgeAge());
+    engine->getKVBucket()->visit(purger);
+    auto end = ProcessClock::now();
+
+    auto duration_ms =
+            std::chrono::duration_cast<std::chrono::milliseconds>(end - start);
+
+    LOG(EXTENSION_LOG_NOTICE,
+        "%s completed. Purged %" PRIu64 " items. Took %" PRIu64
+        "ms. Sleeping for %" PRIu64 " seconds.",
+        to_string(getDescription()).c_str(),
+        uint64_t(purger.getNumPurgedItems()),
+        uint64_t(duration_ms.count()),
+        uint64_t(getSleepTime()));
+
+    snooze(getSleepTime());
+    return true;
+}
+
+cb::const_char_buffer EphTombstonePurgerTask::getDescription() {
+    return "Ephemeral Tombstone Purger";
+}
+
+size_t EphTombstonePurgerTask::getSleepTime() const {
+    return engine->getConfiguration().getEphemeralMetadataPurgeInterval();
+}
+
+size_t EphTombstonePurgerTask::getDeletedPurgeAge() const {
+    return engine->getConfiguration().getEphemeralMetadataPurgeAge();
+}
index cee0aff..cb1e36b 100644 (file)
@@ -58,3 +58,46 @@ protected:
     /// Count of how many items have been purged.
     size_t numPurgedItems;
 };
+
+/**
+ * VBucket Tombstone Purger visitor
+ *
+ * Visitor which is responsible for removing deleted items from each VBucket.
+ * Mostly delegates to HTTombstonePurger for the 'real' work.
+ */
+class EphemeralVBucket::VBTombstonePurger : public VBucketVisitor {
+public:
+    VBTombstonePurger(rel_time_t purgeAge);
+
+    void visitBucket(RCPtr<VBucket>& vb) override;
+
+    size_t getNumPurgedItems() const {
+        return numPurgedItems;
+    }
+
+protected:
+    /// Items older than this age are purged.
+    const rel_time_t purgeAge;
+
+    /// Count of how many items have been purged for all visited vBuckets.
+    size_t numPurgedItems;
+};
+
+/**
+ * Task responsible for purging tombstones from an Ephemeral Bucket.
+ */
+class EphTombstonePurgerTask : public GlobalTask {
+public:
+    EphTombstonePurgerTask(EventuallyPersistentEngine* e, EPStats& stats_);
+
+    bool run() override;
+
+    cb::const_char_buffer getDescription() override;
+
+private:
+    /// Duration (in seconds) task should sleep for between runs.
+    size_t getSleepTime() const;
+
+    /// Age (in seconds) which deleted items will be purged after.
+    size_t getDeletedPurgeAge() const;
+};
index b0c65d3..103f669 100644 (file)
@@ -27,6 +27,7 @@ class EphemeralVBucket : public VBucket {
 public:
     class CountVisitor;
     class HTTombstonePurger;
+    class VBTombstonePurger;
 
     EphemeralVBucket(id_type i,
                      vbucket_state_t newState,
index 1a89768..0ec1e2f 100644 (file)
@@ -73,6 +73,7 @@ TASK(ExpiredItemPager, NONIO_TASK_IDX, 7)
 TASK(ItemPagerVisitor, NONIO_TASK_IDX, 7)
 TASK(ExpiredItemPagerVisitor, NONIO_TASK_IDX, 7)
 TASK(DefragmenterTask, NONIO_TASK_IDX, 7)
+TASK(EphTombstonePurgerTask, NONIO_TASK_IDX, 7)
 TASK(BackfillVisitorTask, NONIO_TASK_IDX, 8)
 TASK(ConnManager, NONIO_TASK_IDX, 8)
 TASK(WorkLoadMonitor, NONIO_TASK_IDX, 10)
index 1f2aef8..d0f8618 100644 (file)
@@ -6887,6 +6887,7 @@ static enum test_result test_mb19687_fixed(ENGINE_HANDLE* h,
         auto& eng_stats = statsKeys.at("");
         eng_stats.insert(eng_stats.end(),
                          {"ep_ephemeral_full_policy",
+                          "ep_ephemeral_metadata_purge_interval",
 
                           "vb_active_auto_delete_count",
                           "vb_active_seqlist_count",
@@ -6927,7 +6928,10 @@ static enum test_result test_mb19687_fixed(ENGINE_HANDLE* h,
                            "vb_0:seqlist_stale_value_bytes"});
 
         auto& config_stats = statsKeys.at("config");
-        config_stats.insert(config_stats.end(), {"ep_ephemeral_full_policy"});
+        config_stats.insert(config_stats.end(),
+                            std::initializer_list<std::string>{
+                                    "ep_ephemeral_full_policy",
+                                    "ep_ephemeral_metadata_purge_interval"});
     }
 
     if (isTapEnabled(h, h1)) {