"descr": "Age in seconds after which Ephemeral metadata is purged entirely from memory. Purging disabled if set to -1.",
"type": "ssize_t"
},
+ "ephemeral_metadata_purge_interval": {
+ "default": "60",
+ "descr": "Time in seconds between automatic, periodic runs of the Ephemeral metadata purge task. Periodic purging disabled if set to 0.",
+ "type": "size_t",
+ "requires": {
+ "bucket_type": "ephemeral"
+ }
+ },
"exp_pager_enabled": {
"default": "true",
"descr": "True if expiry pager task is enabled",
} else if (strcmp(keyz, "ephemeral_full_policy") == 0) {
getConfiguration().requirementsMetOrThrow("ephemeral_full_policy");
getConfiguration().setEphemeralFullPolicy(valz);
+ } else if (strcmp(keyz, "ephemeral_metadata_purge_interval") == 0) {
+ getConfiguration().requirementsMetOrThrow("ephemeral_metadata_purge_interval");
+ getConfiguration().setEphemeralMetadataPurgeInterval(
+ std::stoull(valz));
} else {
msg = "Unknown config param";
rv = PROTOCOL_BINARY_RESPONSE_KEY_ENOENT;
#include "ep_engine.h"
#include "ep_types.h"
+#include "ephemeral_tombstone_purger.h"
#include "ephemeral_vb.h"
#include "ephemeral_vb_count_visitor.h"
#include "failover-table.h"
"unknown key '%s'",
key.c_str());
}
+ }
+ void sizeValueChanged(const std::string& key, size_t value) override {
+ if (key == "ephemeral_metadata_purge_interval") {
+ // Cancel and re-schedule the task to pick up the new interval.
+ bucket.enableTombstonePurgerTask();
+ } else {
+ LOG(EXTENSION_LOG_WARNING,
+ "EphemeralValueChangedListener: Failed to change value for "
+ "unknown key '%s'",
+ key.c_str());
+ }
}
private:
engine.getConfiguration().addValueChangedListener(
"ephemeral_full_policy", new EphemeralValueChangedListener(*this));
+ // Tombstone purger - scheduled periodically as long as we have a
+ // non-zero interval. Can be dynamically adjusted, so add config listeners.
+ tombstonePurgerTask = new EphTombstonePurgerTask(&engine, stats);
+ auto interval = config.getEphemeralMetadataPurgeInterval();
+ if (interval > 0) {
+ enableTombstonePurgerTask();
+ }
+ config.addValueChangedListener("ephemeral_metadata_purge_age",
+ new EphemeralValueChangedListener(*this));
+ config.addValueChangedListener("ephemeral_metadata_purge_interval",
+ new EphemeralValueChangedListener(*this));
+
return true;
}
}
void EphemeralBucket::enableTombstonePurgerTask() {
- // TODO
+ ExecutorPool::get()->cancel(tombstonePurgerTask->getId());
+ ExecutorPool::get()->schedule(tombstonePurgerTask);
}
void EphemeralBucket::disableTombstonePurgerTask() {
- // TODO
+ ExecutorPool::get()->cancel(tombstonePurgerTask->getId());
}
void EphemeralBucket::reconfigureForEphemeral(Configuration& config) {
VBucketCountVisitor& dead,
const void* cookie,
ADD_STAT add_stat) override;
+
+ // Protected member variables /////////////////////////////////////////////
+
+ /// Task responsible for purging in-memory tombstones.
+ ExTask tombstonePurgerTask;
};
vbucket.seqList->markItemStale(std::move(ownedSV));
++numPurgedItems;
}
+
+EphemeralVBucket::VBTombstonePurger::VBTombstonePurger(rel_time_t purgeAge)
+ : purgeAge(purgeAge), numPurgedItems(0) {
+}
+
+void EphemeralVBucket::VBTombstonePurger::visitBucket(RCPtr<VBucket>& vb) {
+ auto vbucket = dynamic_cast<EphemeralVBucket*>(vb.get());
+ if (!vbucket) {
+ throw std::invalid_argument(
+ "VBTombstonePurger::visitBucket: Called with a non-Ephemeral "
+ "bucket");
+ }
+ numPurgedItems += vbucket->purgeTombstones(purgeAge);
+}
+
+EphTombstonePurgerTask::EphTombstonePurgerTask(EventuallyPersistentEngine* e,
+ EPStats& stats_)
+ : GlobalTask(e, TaskId::EphTombstonePurgerTask, 0, false) {
+}
+
+bool EphTombstonePurgerTask::run() {
+ if (engine->getEpStats().isShutdown) {
+ return false;
+ }
+
+ LOG(EXTENSION_LOG_NOTICE,
+ "%s starting with purge age:%" PRIu64,
+ to_string(getDescription()).c_str(),
+ uint64_t(getDeletedPurgeAge()));
+
+ // Create a VB purger, and run across all VBuckets.
+ auto start = ProcessClock::now();
+ EphemeralVBucket::VBTombstonePurger purger(getDeletedPurgeAge());
+ engine->getKVBucket()->visit(purger);
+ auto end = ProcessClock::now();
+
+ auto duration_ms =
+ std::chrono::duration_cast<std::chrono::milliseconds>(end - start);
+
+ LOG(EXTENSION_LOG_NOTICE,
+ "%s completed. Purged %" PRIu64 " items. Took %" PRIu64
+ "ms. Sleeping for %" PRIu64 " seconds.",
+ to_string(getDescription()).c_str(),
+ uint64_t(purger.getNumPurgedItems()),
+ uint64_t(duration_ms.count()),
+ uint64_t(getSleepTime()));
+
+ snooze(getSleepTime());
+ return true;
+}
+
+cb::const_char_buffer EphTombstonePurgerTask::getDescription() {
+ return "Ephemeral Tombstone Purger";
+}
+
+size_t EphTombstonePurgerTask::getSleepTime() const {
+ return engine->getConfiguration().getEphemeralMetadataPurgeInterval();
+}
+
+size_t EphTombstonePurgerTask::getDeletedPurgeAge() const {
+ return engine->getConfiguration().getEphemeralMetadataPurgeAge();
+}
/// Count of how many items have been purged.
size_t numPurgedItems;
};
+
+/**
+ * VBucket Tombstone Purger visitor
+ *
+ * Visitor which is responsible for removing deleted items from each VBucket.
+ * Mostly delegates to HTTombstonePurger for the 'real' work.
+ */
+class EphemeralVBucket::VBTombstonePurger : public VBucketVisitor {
+public:
+ VBTombstonePurger(rel_time_t purgeAge);
+
+ void visitBucket(RCPtr<VBucket>& vb) override;
+
+ size_t getNumPurgedItems() const {
+ return numPurgedItems;
+ }
+
+protected:
+ /// Items older than this age are purged.
+ const rel_time_t purgeAge;
+
+ /// Count of how many items have been purged for all visited vBuckets.
+ size_t numPurgedItems;
+};
+
+/**
+ * Task responsible for purging tombstones from an Ephemeral Bucket.
+ */
+class EphTombstonePurgerTask : public GlobalTask {
+public:
+ EphTombstonePurgerTask(EventuallyPersistentEngine* e, EPStats& stats_);
+
+ bool run() override;
+
+ cb::const_char_buffer getDescription() override;
+
+private:
+ /// Duration (in seconds) task should sleep for between runs.
+ size_t getSleepTime() const;
+
+ /// Age (in seconds) which deleted items will be purged after.
+ size_t getDeletedPurgeAge() const;
+};
public:
class CountVisitor;
class HTTombstonePurger;
+ class VBTombstonePurger;
EphemeralVBucket(id_type i,
vbucket_state_t newState,
TASK(ItemPagerVisitor, NONIO_TASK_IDX, 7)
TASK(ExpiredItemPagerVisitor, NONIO_TASK_IDX, 7)
TASK(DefragmenterTask, NONIO_TASK_IDX, 7)
+TASK(EphTombstonePurgerTask, NONIO_TASK_IDX, 7)
TASK(BackfillVisitorTask, NONIO_TASK_IDX, 8)
TASK(ConnManager, NONIO_TASK_IDX, 8)
TASK(WorkLoadMonitor, NONIO_TASK_IDX, 10)
auto& eng_stats = statsKeys.at("");
eng_stats.insert(eng_stats.end(),
{"ep_ephemeral_full_policy",
+ "ep_ephemeral_metadata_purge_interval",
"vb_active_auto_delete_count",
"vb_active_seqlist_count",
"vb_0:seqlist_stale_value_bytes"});
auto& config_stats = statsKeys.at("config");
- config_stats.insert(config_stats.end(), {"ep_ephemeral_full_policy"});
+ config_stats.insert(config_stats.end(),
+ std::initializer_list<std::string>{
+ "ep_ephemeral_full_policy",
+ "ep_ephemeral_metadata_purge_interval"});
}
if (isTapEnabled(h, h1)) {