Add a number of new stats to the bucket stats and vbucket-details.
Add drift +/- threshold config parameters.
engine stats:
1. The maximum absolute cummulative-drift of the active vbuckets. See
vbucket-details total_abs_drift
ep_active_hlc_drift
2. The number of updates applied to ep_active_hlc_drift
ep_active_hlc_drift_count
3. The maximum absolute drift of the replica vbuckets
ep_replica_hlc_drift
4. The number of counts applied to ep_replica_hlc_drift
ep_replica_hlc_drift_count
5. The total number of times a setMaxCas was from a peer who is
ahead of the ahead threshold (hlc_ahead_threshold_us).
ep_active_ahead_exceptions
6. The total number of times a setMaxCas was from a peer who is
behind of the behind threshold (hlc_behind_threshold_us).
ep_active_behind_exceptions
7. The total number of times a setMaxCas was from a peer who is
ahead of the ahead threshold (hlc_ahead_threshold_us) for
replica VBs.
ep_replica_ahead_exceptions
8. The total number of times a setMaxCas was from a peer who is
behind of the behind threshold (hlc_behind_threshold_us) for
replica VBs.
ep_replica_behind_exceptions
vbucket-details stats:
1. The current max_hlc (reported as max_cas for compatibility with
other modules)
vb_n:max_cas
2. The vbucket's absolute cummulative drift
vb_n:total_abs_drift
3. How many updates have been applied to total_abs_drift
vb_n:total_abs_drift_count
4. How many times the ahead threshold has been exceeded.
vb_n:drift_ahead_threshold_exceeded
5. How many times the behind threshold has been exceeded.
vb_n:drift_behind_threshold_exceeded
6. How many logical clock ticks this vbucket's HLC has returned
vb_n:logical_clock_tick
Change-Id: I063782d4451b97f58a3c89208506bd8bd08b705e
Reviewed-on: http://review.couchbase.org/68158
Reviewed-by: Dave Rigby <daver@couchbase.com>
Tested-by: buildbot <build@couchbase.com>
"descr": "The maximum timeout for a getl lock in (s)",
"type": "size_t"
},
+ "hlc_ahead_threshold_us": {
+ "default": "5000000",
+ "descr": "The μs threshold at which we will increment a vbucket's ahead counter.",
+ "type": "size_t"
+ },
+ "hlc_behind_threshold_us": {
+ "default": "5000000",
+ "descr": "The μs threshold at which we will increment a vbucket's behind counter.",
+ "type": "size_t"
+ },
"ht_locks": {
"default": "47",
"type": "size_t"
FailoverTable* ft = new FailoverTable(engine.getMaxFailoverEntries());
KVShard* shard = vbMap.getShardByVbId(vbid);
std::shared_ptr<Callback<uint16_t> > cb(new NotifyFlusherCB(shard));
+ Configuration& config = engine.getConfiguration();
RCPtr<VBucket> newvb(new VBucket(vbid, to, stats,
engine.getCheckpointConfig(),
- shard, 0, 0, 0, ft, cb));
- Configuration& config = engine.getConfiguration();
+ shard, 0, 0, 0, ft, cb,
+ config));
+
if (config.isBfilterEnabled()) {
// Initialize bloom filters upon vbucket creation during
// bucket creation and rebalance
break;
case WAS_DIRTY:
case WAS_CLEAN:
- vb->setMaxCas(v->getCas());
+ vb->setMaxCasAndTrackDrift(v->getCas());
queueDirty(vb, v, &lh, seqno,
genBySeqno ? GenerateBySeqno::Yes : GenerateBySeqno::No,
GenerateCas::No);
v->setBySeqno(bySeqno);
}
- vb->setMaxCas(v->getCas());
+ vb->setMaxCasAndTrackDrift(v->getCas());
if (tapBackfill) {
tapQueueDirty(vb, v, lh, seqno,
pendingWrites += vb->dirtyQueuePendingWrites;
rollbackItemCount += vb->getRollbackItemCount();
+
+ /*
+ * The bucket stat only reports the largest drift of the vbuckets.
+ */
+ auto driftStats = vb->getHLCDriftStats();
+ // If this vbucket's max is bigger than ours
+ if (driftStats.total > maxAbsHLCDrift.total) {
+ maxAbsHLCDrift = driftStats;
+ }
+
+ /*
+ * Total up the exceptions
+ */
+ auto driftExceptionCounters = vb->getHLCDriftExceptionCounters();
+ totalHLCDriftExceptionCounters.ahead += driftExceptionCounters.ahead;
+ totalHLCDriftExceptionCounters.behind += driftExceptionCounters.behind;
}
return false;
add_casted_stat("ep_io_compaction_write_bytes", value, add_stat, cookie);
}
+ // Add stats for tracking HLC drift
+ add_casted_stat("ep_active_hlc_drift",
+ activeCountVisitor.getMaxAbsHLCDrift().total, add_stat, cookie);
+ add_casted_stat("ep_active_hlc_drift_count",
+ activeCountVisitor.getMaxAbsHLCDrift().updates, add_stat, cookie);
+ add_casted_stat("ep_replica_hlc_drift",
+ replicaCountVisitor.getMaxAbsHLCDrift().total, add_stat, cookie);
+ add_casted_stat("ep_replica_hlc_drift_count",
+ replicaCountVisitor.getMaxAbsHLCDrift().updates, add_stat, cookie);
+
+ add_casted_stat("ep_active_ahead_exceptions",
+ activeCountVisitor.getTotalHLCDriftExceptionCounters().ahead,
+ add_stat, cookie);
+ add_casted_stat("ep_active_behind_exceptions",
+ activeCountVisitor.getTotalHLCDriftExceptionCounters().behind,
+ add_stat, cookie);
+ add_casted_stat("ep_replica_ahead_exceptions",
+ replicaCountVisitor.getTotalHLCDriftExceptionCounters().ahead,
+ add_stat, cookie);
+ add_casted_stat("ep_replica_behind_exceptions",
+ replicaCountVisitor.getTotalHLCDriftExceptionCounters().behind,
+ add_stat, cookie);
+
return ENGINE_SUCCESS;
}
queueFill(0), queueDrain(0),
pendingWrites(0), chkPersistRemaining(0),
fileSpaceUsed(0), fileSize(0),
- rollbackItemCount(0)
+ rollbackItemCount(0), maxAbsHLCDrift(),
+ totalHLCDriftExceptionCounters()
{ }
bool visitBucket(RCPtr<VBucket> &vb);
uint64_t getRollbackItemCount() { return rollbackItemCount; }
+ HLC::DriftStats getMaxAbsHLCDrift() {return maxAbsHLCDrift;}
+ HLC::DriftExceptions getTotalHLCDriftExceptionCounters() {return totalHLCDriftExceptionCounters;}
+
private:
EventuallyPersistentEngine &engine;
vbucket_state_t desired_state;
size_t fileSize;
uint64_t rollbackItemCount;
+ HLC::DriftStats maxAbsHLCDrift;
+ HLC::DriftExceptions totalHLCDriftExceptionCounters;
};
/**
--- /dev/null
+/* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
+/*
+ * Copyright 2016 Couchbase, Inc
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <chrono>
+
+#include "atomic.h"
+#include "statwriter.h"
+
+/*
+ * HLC manages a hybrid logical clock for 'time' stamping events.
+ * The class only implements the send logic of the HLC algorithm.
+ * - http://www.cse.buffalo.edu/tech-reports/2014-04.pdf
+ *
+ * The class allows multiple threads to concurrently call
+ * - nextHLC
+ * - setMaxCas
+ * - setMaxHLCAndTrackDrift
+ *
+ * The paired "drift" counter, which is used to monitor drift over time
+ * isn't atomic in that the total and update parts are not a consistent
+ * snapshot.
+ */
+class HLC {
+public:
+ struct DriftStats {
+ uint64_t total;
+ uint64_t updates;
+ };
+
+ struct DriftExceptions {
+ uint32_t ahead;
+ uint32_t behind;
+ };
+
+ /*
+ * @param initHLC a HLC value to start from
+ * @param aheadThresholdAhead threshold a peer can be ahead before we
+ * increment driftAheadExceeded
+ * @param behindThresholdhreshold a peer can be ahead before we
+ * increment driftBehindExceeded
+ */
+ HLC(uint64_t initHLC, uint64_t aheadThreshold, uint64_t behindThreshold)
+ : maxHLC(initHLC),
+ cummulativeDrift(0),
+ cummulativeDriftIncrements(0),
+ logicalClockTicks(0),
+ driftAheadExceeded(0),
+ driftBehindExceeded(0),
+ driftAheadThreshold(aheadThreshold),
+ driftBehindThreshold(behindThreshold) {}
+
+ uint64_t nextHLC() {
+ // Create a monotonic timestamp using part of the HLC algorithm by.
+ // a) Reading system time
+ // b) dropping 16-bits (done by nowHLC)
+ // c) comparing it with the last known time (max_cas)
+ // d) returning either now or max_cas + 1
+ uint64_t timeNow = getMasked48(getTime());
+ uint64_t l = maxHLC.load();
+
+ if (timeNow > l) {
+ atomic_setIfBigger(maxHLC, timeNow);
+ return timeNow;
+ }
+ logicalClockTicks++;
+ atomic_setIfBigger(maxHLC, l + 1);
+ return l + 1;
+ }
+
+ void setMaxHLCAndTrackDrift(uint64_t hlc) {
+ auto timeNow = getMasked48(getTime());
+
+ // Track the +/- difference between our time and their time
+ int64_t difference = getMasked48(hlc) - timeNow;
+
+ // Accumulate the absolute drift
+ cummulativeDrift += std::abs(difference);
+ cummulativeDriftIncrements++;
+
+ // If the difference is greater, count peer ahead exeception
+ // If the difference is less than our -ve threshold.. count that
+ if (difference > int64_t(driftAheadThreshold)) {
+ driftAheadExceeded++;
+ } else if(difference < (0 - int64_t(driftBehindThreshold))) {
+ driftBehindExceeded++;
+ }
+
+ setMaxHLC(hlc);
+ }
+
+ void setMaxHLC(uint64_t hlc) {
+ atomic_setIfBigger(maxHLC, hlc);
+ }
+
+ uint64_t getMaxHLC() const {
+ return maxHLC;
+ }
+
+ DriftStats getDriftStats() const {
+ // Deliberately not locking to read this pair
+ return {cummulativeDrift, cummulativeDriftIncrements};
+ }
+
+ DriftExceptions getDriftExceptionCounters() const {
+ // Deliberately not locking to read this pair
+ return {driftAheadExceeded, driftBehindExceeded};
+ }
+
+ void addStats(const std::string& prefix, ADD_STAT add_stat, const void *c) const {
+ add_prefixed_stat(prefix.data(), "max_cas", getMaxHLC(), add_stat, c);
+ add_prefixed_stat(prefix.data(), "total_abs_drift", cummulativeDrift.load(), add_stat, c);
+ add_prefixed_stat(prefix.data(), "total_abs_drift_count", cummulativeDriftIncrements.load(), add_stat, c);
+ add_prefixed_stat(prefix.data(), "drift_ahead_threshold_exceeded", driftAheadExceeded.load(), add_stat, c);
+ add_prefixed_stat(prefix.data(), "drift_ahead_threshold", driftAheadThreshold.load(), add_stat, c);
+ add_prefixed_stat(prefix.data(), "drift_behind_threshold_exceeded", driftBehindExceeded.load(), add_stat, c);
+ add_prefixed_stat(prefix.data(), "drift_behind_threshold", driftBehindThreshold.load(), add_stat, c);
+ add_prefixed_stat(prefix.data(), "logical_clock_ticks", logicalClockTicks.load(), add_stat, c);
+ }
+
+private:
+ /*
+ * Returns 48-bit of t (bottom 16-bit zero)
+ */
+ static int64_t getMasked48(int64_t t) {
+ return t & ~((1<<16)-1);
+ }
+
+ static int64_t getTime() {
+ auto now = std::chrono::system_clock::now();
+ return std::chrono::duration_cast<std::chrono::microseconds>(now.time_since_epoch()).count();
+ }
+
+ /*
+ * maxHLC tracks the current highest time, either our own or a peer who
+ * has a larger clock value. nextHLC and setMax* methods change this and can
+ * be called from different threads
+ */
+ std::atomic<uint64_t> maxHLC;
+
+ /*
+ * The following are used for stats/drift tracking.
+ * many threads could be setting cas so they need to be atomically
+ * updated for consisent totals.
+ */
+ std::atomic<uint64_t> cummulativeDrift;
+ std::atomic<uint64_t> cummulativeDriftIncrements;
+ std::atomic<uint64_t> logicalClockTicks;
+ std::atomic<uint32_t> driftAheadExceeded;
+ std::atomic<uint32_t> driftBehindExceeded;
+ std::atomic<uint64_t> driftAheadThreshold;
+ std::atomic<uint64_t> driftBehindThreshold;
+};
\ No newline at end of file
template <typename T>
void VBucket::addStat(const char *nm, const T &val, ADD_STAT add_stat,
const void *c) {
- std::stringstream name;
- name << "vb_" << id;
+ std::string stat = statPrefix;
if (nm != NULL) {
- name << ":" << nm;
+ add_prefixed_stat(statPrefix, nm, val, add_stat, c);
+ } else {
+ add_casted_stat(statPrefix.data(), val, add_stat, c);
}
- std::stringstream value;
- value << val;
- std::string n = name.str();
- add_casted_stat(n.data(), value.str().data(), add_stat, c);
}
size_t VBucket::queueBGFetchItem(const std::string &key,
}
}
-uint64_t VBucket::nextHLCCas() {
- // Create a monotonic timestamp using part of the HLC algorithm by.
- // a) Reading system time (now)
- // b) dropping 16-bits (masking 65355 µs)
- // c) comparing it with the last known time (max_cas)
- // d) returning either now or max_cas + 1
- auto now = std::chrono::duration_cast<std::chrono::microseconds>
- (std::chrono::system_clock::now().time_since_epoch()).count();
-
- uint64_t now48bits = ((uint64_t)now) & ~((1 << 16) - 1);
- uint64_t local_max_cas = max_cas.load();
-
- if (now48bits > local_max_cas) {
- atomic_setIfBigger(max_cas, now48bits);
- return now48bits;
- }
-
- atomic_setIfBigger(max_cas, local_max_cas + 1);
- return local_max_cas + 1;
-}
-
void VBucket::addStats(bool details, ADD_STAT add_stat, const void *c,
item_eviction_policy_t policy) {
addStat(NULL, toString(state), add_stat, c);
add_stat, c);
addStat("ht_memory", ht.memorySize(), add_stat, c);
addStat("ht_item_memory", ht.getItemMemory(), add_stat, c);
- addStat("ht_cache_size", ht.cacheSize, add_stat, c);
+ addStat("ht_cache_size", ht.cacheSize.load(), add_stat, c);
addStat("num_ejects", ht.getNumEjects(), add_stat, c);
- addStat("ops_create", opsCreate, add_stat, c);
- addStat("ops_update", opsUpdate, add_stat, c);
- addStat("ops_delete", opsDelete, add_stat, c);
- addStat("ops_reject", opsReject, add_stat, c);
- addStat("queue_size", dirtyQueueSize, add_stat, c);
- addStat("queue_memory", dirtyQueueMem, add_stat, c);
- addStat("queue_fill", dirtyQueueFill, add_stat, c);
- addStat("queue_drain", dirtyQueueDrain, add_stat, c);
+ addStat("ops_create", opsCreate.load(), add_stat, c);
+ addStat("ops_update", opsUpdate.load(), add_stat, c);
+ addStat("ops_delete", opsDelete.load(), add_stat, c);
+ addStat("ops_reject", opsReject.load(), add_stat, c);
+ addStat("queue_size", dirtyQueueSize.load(), add_stat, c);
+ addStat("queue_memory", dirtyQueueMem.load(), add_stat, c);
+ addStat("queue_fill", dirtyQueueFill.load(), add_stat, c);
+ addStat("queue_drain", dirtyQueueDrain.load(), add_stat, c);
addStat("queue_age", getQueueAge(), add_stat, c);
- addStat("pending_writes", dirtyQueuePendingWrites, add_stat, c);
- addStat("db_data_size", fileSpaceUsed, add_stat, c);
- addStat("db_file_size", fileSize, add_stat, c);
+ addStat("pending_writes", dirtyQueuePendingWrites.load(), add_stat, c);
+ addStat("db_data_size", fileSpaceUsed.load(), add_stat, c);
+ addStat("db_file_size", fileSize.load(), add_stat, c);
addStat("high_seqno", getHighSeqno(), add_stat, c);
addStat("uuid", failovers->getLatestUUID(), add_stat, c);
addStat("purge_seqno", getPurgeSeqno(), add_stat, c);
add_stat, c);
addStat("bloom_filter_size", getFilterSize(), add_stat, c);
addStat("bloom_filter_key_count", getNumOfKeysInFilter(), add_stat, c);
- addStat("max_cas", getMaxCas(), add_stat, c);
addStat("rollback_item_count", getRollbackItemCount(), add_stat, c);
+ hlc.addStats(statPrefix, add_stat, c);
}
}
#include "checkpoint.h"
#include "ep_types.h"
#include "failover-table.h"
+#include "hlc.h"
#include "kvstore.h"
#include "stored-value.h"
#include "utility.h"
int64_t lastSeqno, uint64_t lastSnapStart,
uint64_t lastSnapEnd, FailoverTable *table,
std::shared_ptr<Callback<id_type> > cb,
+ Configuration& config,
vbucket_state_t initState = vbucket_state_dead,
uint64_t chkId = 1, uint64_t purgeSeqno = 0,
uint64_t maxCas = 0):
initialState(initState),
stats(st),
purge_seqno(purgeSeqno),
- max_cas(maxCas),
takeover_backed_up(false),
persisted_snapshot_start(lastSnapStart),
persisted_snapshot_end(lastSnapEnd),
shard(kvshard),
bFilter(NULL),
tempFilter(NULL),
- rollbackItemCount(0)
+ rollbackItemCount(0),
+ hlc(maxCas,
+ config.getHlcAheadThresholdUs(),
+ config.getHlcBehindThresholdUs()),
+ statPrefix("vb_" + std::to_string(i))
{
backfill.isBackfillPhase = false;
pendingOpsStart = 0;
id, VBucket::toString(state), VBucket::toString(initialState),
lastSeqno, lastSnapStart, lastSnapEnd,
persisted_snapshot_start, persisted_snapshot_end,
- max_cas.load());
+ getMaxCas());
}
~VBucket();
}
uint64_t getMaxCas() {
- return max_cas;
+ return hlc.getMaxHLC();
}
void setMaxCas(uint64_t cas) {
- atomic_setIfBigger(max_cas, cas);
+ hlc.setMaxHLC(cas);
+ }
+
+ void setMaxCasAndTrackDrift(uint64_t cas) {
+ hlc.setMaxHLCAndTrackDrift(cas);
+ }
+
+ HLC::DriftStats getHLCDriftStats() const {
+ return hlc.getDriftStats();
+ }
+
+ HLC::DriftExceptions getHLCDriftExceptionCounters() const {
+ return hlc.getDriftExceptionCounters();
}
bool isTakeoverBackedUp() {
size_t getFilterSize();
size_t getNumOfKeysInFilter();
- uint64_t nextHLCCas();
+ uint64_t nextHLCCas() {
+ return hlc.nextHLC();
+ }
// Applicable only for FULL EVICTION POLICY
bool isResidentRatioUnderThreshold(float threshold,
EPStats &stats;
uint64_t purge_seqno;
- AtomicValue<uint64_t> max_cas;
AtomicValue<bool> takeover_backed_up;
Mutex pendingBGFetchesLock;
AtomicValue<uint64_t> rollbackItemCount;
+ HLC hlc;
+ std::string statPrefix;
+
static size_t chkFlushTimeout;
DISALLOW_COPY_AND_ASSIGN(VBucket);
store.getEPEngine().getEpStats(),
store.getEPEngine().getCheckpointConfig(),
shard, vbs.highSeqno, vbs.lastSnapStart,
- vbs.lastSnapEnd, table, cb, vbs.state, 1,
- vbs.purgeSeqno, vbs.maxCas));
+ vbs.lastSnapEnd, table, cb, config,
+ vbs.state, 1, vbs.purgeSeqno, vbs.maxCas));
if(vbs.state == vbucket_state_active && !cleanShutdown) {
if (static_cast<uint64_t>(vbs.highSeqno) == vbs.lastSnapEnd) {
#include <string>
#include <thread>
#include <unordered_map>
+#include <unordered_set>
#include <vector>
#include "atomic.h"
"vb_0:bloom_filter_size",
"vb_0:db_data_size",
"vb_0:db_file_size",
+ "vb_0:drift_ahead_threshold",
+ "vb_0:drift_ahead_threshold_exceeded",
+ "vb_0:drift_behind_threshold",
+ "vb_0:drift_behind_threshold_exceeded",
"vb_0:high_seqno",
"vb_0:ht_cache_size",
"vb_0:ht_item_memory",
"vb_0:ht_memory",
+ "vb_0:logical_clock_ticks",
"vb_0:max_cas",
"vb_0:num_ejects",
"vb_0:num_items",
"vb_0:queue_memory",
"vb_0:queue_size",
"vb_0:rollback_item_count",
+ "vb_0:total_abs_drift",
+ "vb_0:total_abs_drift_count",
"vb_0:uuid"
}
},
"ep_flushall_enabled",
"ep_getl_default_timeout",
"ep_getl_max_timeout",
+ "ep_hlc_ahead_threshold_us",
+ "ep_hlc_behind_threshold_us",
"ep_ht_locks",
"ep_ht_size",
"ep_initfile",
"vb_0:data_size",
"vb_0:file_size"
}
+ },
+ {"", // Note: we convert empty to a null to get engine stats
+ {
+ "bytes",
+ "curr_items",
+ "curr_items_tot",
+ "curr_temp_items",
+ "ep_access_scanner_enabled",
+ "ep_access_scanner_last_runtime",
+ "ep_access_scanner_num_items",
+ "ep_access_scanner_task_time",
+ "ep_active_ahead_exceptions",
+ "ep_active_behind_exceptions",
+ "ep_active_hlc_drift",
+ "ep_active_hlc_drift_count",
+ "ep_alog_block_size",
+ "ep_alog_path",
+ "ep_alog_resident_ratio_threshold",
+ "ep_alog_sleep_time",
+ "ep_alog_task_time",
+ "ep_backend",
+ "ep_backfill_mem_threshold",
+ "ep_bfilter_enabled",
+ "ep_bfilter_fp_prob",
+ "ep_bfilter_key_count",
+ "ep_bfilter_residency_threshold",
+ "ep_bg_fetch_delay",
+ "ep_bg_fetched",
+ "ep_bg_meta_fetched",
+ "ep_bg_remaining_jobs",
+ "ep_blob_num",
+ "ep_blob_overhead",
+ "ep_bucket_priority",
+ "ep_chk_max_items",
+ "ep_chk_period",
+ "ep_chk_persistence_remains",
+ "ep_chk_persistence_timeout",
+ "ep_chk_remover_stime",
+ "ep_commit_num",
+ "ep_commit_time",
+ "ep_commit_time_total",
+ "ep_compaction_exp_mem_threshold",
+ "ep_compaction_write_queue_cap",
+ "ep_config_file",
+ "ep_conflict_resolution_type",
+ "ep_couch_bucket",
+ "ep_cursor_dropping_lower_mark",
+ "ep_cursor_dropping_lower_threshold",
+ "ep_cursor_dropping_upper_mark",
+ "ep_cursor_dropping_upper_threshold",
+ "ep_cursors_dropped",
+ "ep_data_traffic_enabled",
+ "ep_db_data_size",
+ "ep_db_file_size",
+ "ep_dbname",
+ "ep_dcp_backfill_byte_limit",
+ "ep_dcp_conn_buffer_size",
+ "ep_dcp_conn_buffer_size_aggr_mem_threshold",
+ "ep_dcp_conn_buffer_size_aggressive_perc",
+ "ep_dcp_conn_buffer_size_max",
+ "ep_dcp_conn_buffer_size_perc",
+ "ep_dcp_consumer_process_buffered_messages_batch_size",
+ "ep_dcp_consumer_process_buffered_messages_yield_limit",
+ "ep_dcp_enable_noop",
+ "ep_dcp_flow_control_policy",
+ "ep_dcp_max_unacked_bytes",
+ "ep_dcp_min_compression_ratio",
+ "ep_dcp_noop_interval",
+ "ep_dcp_producer_snapshot_marker_yield_limit",
+ "ep_dcp_scan_byte_limit",
+ "ep_dcp_scan_item_limit",
+ "ep_dcp_takeover_max_time",
+ "ep_dcp_value_compression_enabled",
+ "ep_defragmenter_age_threshold",
+ "ep_defragmenter_chunk_duration",
+ "ep_defragmenter_enabled",
+ "ep_defragmenter_interval",
+ "ep_defragmenter_num_moved",
+ "ep_defragmenter_num_visited",
+ "ep_degraded_mode",
+ "ep_diskqueue_drain",
+ "ep_diskqueue_fill",
+ "ep_diskqueue_items",
+ "ep_diskqueue_memory",
+ "ep_diskqueue_pending",
+ "ep_enable_chk_merge",
+ "ep_exp_pager_enabled",
+ "ep_exp_pager_initial_run_time",
+ "ep_exp_pager_stime",
+ "ep_expired_access",
+ "ep_expired_compactor",
+ "ep_expired_pager",
+ "ep_expiry_pager_task_time",
+ "ep_failpartialwarmup",
+ "ep_flush_all",
+ "ep_flush_duration_total",
+ "ep_flushall_enabled",
+ "ep_flusher_state",
+ "ep_flusher_todo",
+ "ep_getl_default_timeout",
+ "ep_getl_max_timeout",
+ "ep_hlc_ahead_threshold_us",
+ "ep_hlc_behind_threshold_us",
+ "ep_ht_locks",
+ "ep_ht_size",
+ "ep_initfile",
+ "ep_io_compaction_read_bytes",
+ "ep_io_compaction_write_bytes",
+ "ep_io_total_read_bytes",
+ "ep_io_total_write_bytes",
+ "ep_item_begin_failed",
+ "ep_item_commit_failed",
+ "ep_item_eviction_policy",
+ "ep_item_flush_expired",
+ "ep_item_flush_failed",
+ "ep_item_num",
+ "ep_item_num_based_new_chk",
+ "ep_items_rm_from_checkpoints",
+ "ep_keep_closed_chks",
+ "ep_kv_size",
+ "ep_max_bg_remaining_jobs",
+ "ep_max_checkpoints",
+ "ep_max_failover_entries",
+ "ep_max_item_size",
+ "ep_max_num_auxio",
+ "ep_max_num_nonio",
+ "ep_max_num_readers",
+ "ep_max_num_shards",
+ "ep_max_num_workers",
+ "ep_max_num_writers",
+ "ep_max_size",
+ "ep_max_threads",
+ "ep_max_vbuckets",
+ "ep_mem_high_wat",
+ "ep_mem_high_wat_percent",
+ "ep_mem_low_wat",
+ "ep_mem_low_wat_percent",
+ "ep_mem_tracker_enabled",
+ "ep_meta_data_disk",
+ "ep_meta_data_memory",
+ "ep_mlog_compactor_runs",
+ "ep_mutation_mem_threshold",
+ "ep_num_access_scanner_runs",
+ "ep_num_access_scanner_skips",
+ "ep_num_eject_failures",
+ "ep_num_expiry_pager_runs",
+ "ep_num_non_resident",
+ "ep_num_not_my_vbuckets",
+ "ep_num_ops_del_meta",
+ "ep_num_ops_del_meta_res_fail",
+ "ep_num_ops_del_ret_meta",
+ "ep_num_ops_get_meta",
+ "ep_num_ops_get_meta_on_set_meta",
+ "ep_num_ops_set_meta",
+ "ep_num_ops_set_meta_res_fail",
+ "ep_num_ops_set_ret_meta",
+ "ep_num_pager_runs",
+ "ep_num_value_ejects",
+ "ep_num_workers",
+ "ep_oom_errors",
+ "ep_overhead",
+ "ep_pager_active_vb_pcnt",
+ "ep_pending_compactions",
+ "ep_pending_ops",
+ "ep_pending_ops_max",
+ "ep_pending_ops_max_duration",
+ "ep_pending_ops_total",
+ "ep_persist_vbstate_total",
+ "ep_postInitfile",
+ "ep_queue_size",
+ "ep_replica_ahead_exceptions",
+ "ep_replica_behind_exceptions",
+ "ep_replica_hlc_drift",
+ "ep_replica_hlc_drift_count",
+ "ep_replication_throttle_cap_pcnt",
+ "ep_replication_throttle_queue_cap",
+ "ep_replication_throttle_threshold",
+ "ep_rollback_count",
+ "ep_startup_time",
+ "ep_storage_age",
+ "ep_storage_age_highwat",
+ "ep_storedval_num",
+ "ep_storedval_overhead",
+ "ep_storedval_size",
+ "ep_tap_ack_grace_period",
+ "ep_tap_ack_initial_sequence_number",
+ "ep_tap_ack_interval",
+ "ep_tap_ack_window_size",
+ "ep_tap_backfill_resident",
+ "ep_tap_backlog_limit",
+ "ep_tap_backoff_period",
+ "ep_tap_bg_fetch_requeued",
+ "ep_tap_bg_fetched",
+ "ep_tap_bg_max_pending",
+ "ep_tap_keepalive",
+ "ep_tap_noop_interval",
+ "ep_tap_requeue_sleep_time",
+ "ep_time_synchronization",
+ "ep_tmp_oom_errors",
+ "ep_total_cache_size",
+ "ep_total_del_items",
+ "ep_total_enqueued",
+ "ep_total_new_items",
+ "ep_total_persisted",
+ "ep_uncommitted_items",
+ "ep_uuid",
+ "ep_value_size",
+ "ep_vb0",
+ "ep_vb_snapshot_total",
+ "ep_vb_total",
+ "ep_vbucket_del",
+ "ep_vbucket_del_fail",
+ "ep_version",
+ "ep_waitforwarmup",
+ "ep_warmup",
+ "ep_warmup_batch_size",
+ "ep_warmup_dups",
+ "ep_warmup_min_items_threshold",
+ "ep_warmup_min_memory_threshold",
+ "ep_warmup_oom",
+ "ep_warmup_thread",
+ "ep_warmup_time",
+ "ep_workload_pattern",
+ "mem_used",
+ "rollback_item_count",
+ "vb_active_curr_items",
+ "vb_active_eject",
+ "vb_active_expired",
+ "vb_active_ht_memory",
+ "vb_active_itm_memory",
+ "vb_active_meta_data_disk",
+ "vb_active_meta_data_memory",
+ "vb_active_num",
+ "vb_active_num_non_resident",
+ "vb_active_ops_create",
+ "vb_active_ops_delete",
+ "vb_active_ops_reject",
+ "vb_active_ops_update",
+ "vb_active_perc_mem_resident",
+ "vb_active_queue_age",
+ "vb_active_queue_drain",
+ "vb_active_queue_fill",
+ "vb_active_queue_memory",
+ "vb_active_queue_pending",
+ "vb_active_queue_size",
+ "vb_active_rollback_item_count",
+ "vb_dead_num",
+ "vb_pending_curr_items",
+ "vb_pending_eject",
+ "vb_pending_expired",
+ "vb_pending_ht_memory",
+ "vb_pending_itm_memory",
+ "vb_pending_meta_data_disk",
+ "vb_pending_meta_data_memory",
+ "vb_pending_num",
+ "vb_pending_num_non_resident",
+ "vb_pending_ops_create",
+ "vb_pending_ops_delete",
+ "vb_pending_ops_reject",
+ "vb_pending_ops_update",
+ "vb_pending_perc_mem_resident",
+ "vb_pending_queue_age",
+ "vb_pending_queue_drain",
+ "vb_pending_queue_fill",
+ "vb_pending_queue_memory",
+ "vb_pending_queue_pending",
+ "vb_pending_queue_size",
+ "vb_pending_rollback_item_count",
+ "vb_replica_curr_items",
+ "vb_replica_eject",
+ "vb_replica_expired",
+ "vb_replica_ht_memory",
+ "vb_replica_itm_memory",
+ "vb_replica_meta_data_disk",
+ "vb_replica_meta_data_memory",
+ "vb_replica_num",
+ "vb_replica_num_non_resident",
+ "vb_replica_ops_create",
+ "vb_replica_ops_delete",
+ "vb_replica_ops_reject",
+ "vb_replica_ops_update",
+ "vb_replica_perc_mem_resident",
+ "vb_replica_queue_age",
+ "vb_replica_queue_drain",
+ "vb_replica_queue_fill",
+ "vb_replica_queue_memory",
+ "vb_replica_queue_pending",
+ "vb_replica_queue_size",
+ "vb_replica_rollback_item_count"
+ }
}
};
bool error = false;
for (const auto& entry : statsKeys) {
+
vals.clear();
checkeq(ENGINE_SUCCESS,
- h1->get_stats(h, nullptr, entry.first.data(),
+ h1->get_stats(h, nullptr, entry.first.empty() ?
+ nullptr : entry.first.data(),
entry.first.size(), add_stats),
(std::string("Failed to get stats: ") + entry.first).c_str());
+ std::unordered_set<std::string> accountedFor;
for (const auto& key : entry.second) {
auto iter = vals.find(key);
if (iter == vals.end()) {
fprintf(stderr, "Missing stat: %s from stat group %s\n",
key.c_str(),
entry.first.c_str());
+ } else {
+ accountedFor.insert(key);
}
}
if (entry.second.size() != vals.size()) {
fprintf(stderr,
- "Incorrect number of stats returned for stat group %s: %lu != %lu\n",
+ "Incorrect number of stats returned for stat group %s: %lu != %lu\n"
+ " Unaccounted for stat keys:\n",
entry.first.c_str(), (unsigned long)entry.second.size(),
(unsigned long)vals.size());
error = true;
+ for (const auto& statPair : vals) {
+ if (accountedFor.count(statPair.first) == 0) {
+ fprintf(stderr, " \"%s\",\n", statPair.first.c_str());
+ }
+ }
}
}
#include "ep_test_apis.h"
#include "ep_testsuite_common.h"
+#include "hlc.h"
#include <platform/cb_malloc.h>
return SUCCESS;
}
+static enum test_result test_set_with_meta_and_check_drift_stats(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1) {
+ // Activate n vbuckets (vb 0 is already)
+ const int n_vbuckets = 10;
+ for (int ii = 1; ii < n_vbuckets; ii++) {
+ check(set_vbucket_state(h, h1, ii, vbucket_state_active),
+ "Failed to set vbucket state.");
+ }
+
+ // Let's make vbucket n/2 be the one who is ahead, n/3 is behind
+ const int aheadVb = n_vbuckets/2;
+ const int behindVb = n_vbuckets/3;
+ checkne(aheadVb, behindVb, "Cannot have the same VB as ahead/behind");
+
+ HLC hlc(0/*init HLC*/, 0/*ahead threshold*/, 0/*behind threshold*/);
+
+ // grab the drift behind threshold
+ uint64_t driftBehindThreshold = get_ull_stat(h, h1,
+ "ep_hlc_ahead_threshold_us",
+ nullptr);
+ // Create n keys
+ const int n_keys = 5;
+ for (int ii = 0 ; ii < n_vbuckets; ii++) {
+ for (int k = 0; k < n_keys; k++) {
+ std::string key = "key_" + std::to_string(k);
+ ItemMetaData itm_meta;
+ itm_meta.cas = hlc.nextHLC();
+ if (ii == aheadVb) {
+ // Push this guy *far* ahead (1 year)
+ itm_meta.cas += 3154E10;
+ } else if(ii == behindVb) {
+ // just be sure it was already greater then 1 + driftthreshold
+ checkge(itm_meta.cas, uint64_t(1) + driftBehindThreshold,
+ "HLC was already zero");
+ // set to be way way behind...
+ itm_meta.cas = 1;
+ }
+ set_with_meta(h, h1, key.data(), key.size(), NULL, 0, ii, &itm_meta,
+ 0, false, PROTOCOL_BINARY_RAW_BYTES, true, 0);
+ checkeq(PROTOCOL_BINARY_RESPONSE_SUCCESS, last_status.load(),
+ "Expected success");
+ }
+ }
+
+ // Bucket stats should report drift
+ checkge(get_ull_stat(h, h1, "ep_active_hlc_drift"), uint64_t(0),
+ "Expected drift above zero");
+ checkeq(uint64_t(n_keys), get_ull_stat(h, h1, "ep_active_hlc_drift_count"),
+ "Expected ahead counter to match mutations");
+
+ // Victim VBs should have exceptions
+ {
+ std::string vbAheadName = "vb_" + std::to_string(aheadVb);
+ std::string ahead_threshold_exceeded = vbAheadName + ":drift_ahead_threshold_exceeded";
+ std::string behind_threshold_exceeded = vbAheadName + ":drift_behind_threshold_exceeded";
+ std::string total_abs_drift = vbAheadName + ":total_abs_drift";
+ std::string details = "vbucket-details " + std::to_string(aheadVb);
+ checkeq(uint64_t(n_keys), get_ull_stat(h, h1, ahead_threshold_exceeded.data(), details.data()),
+ "Expected ahead threshold to match mutations");
+ checkeq(uint64_t(0), get_ull_stat(h, h1, behind_threshold_exceeded.data(), details.data()),
+ "Expected no behind exceptions");
+ checkge(get_ull_stat(h, h1, total_abs_drift.data(), details.data()), uint64_t(0),
+ "Expected some drift");
+ }
+
+ {
+ std::string vbBehindName = "vb_" + std::to_string(behindVb);
+ std::string ahead_threshold_exceeded = vbBehindName + ":drift_ahead_threshold_exceeded";
+ std::string behind_threshold_exceeded = vbBehindName + ":drift_behind_threshold_exceeded";
+ std::string total_abs_drift = vbBehindName + ":total_abs_drift";
+ std::string details = "vbucket-details " + std::to_string(behindVb);
+ checkeq(uint64_t(n_keys), get_ull_stat(h, h1, behind_threshold_exceeded.data(), details.data()),
+ "Expected behind threshold to match mutations");
+ checkeq(uint64_t(0), get_ull_stat(h, h1, ahead_threshold_exceeded.data(), details.data()),
+ "Expected no ahead exceptions");
+ checkge(get_ull_stat(h, h1, total_abs_drift.data(), details.data()), uint64_t(0),
+ "Expected some drift");
+ }
+
+
+ return SUCCESS;
+}
+
+static enum test_result test_del_with_meta_and_check_drift_stats(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1) {
+ // Activate n vbuckets (vb 0 is already)
+ const int n_vbuckets = 10;
+ for (int ii = 1; ii < n_vbuckets; ii++) {
+ check(set_vbucket_state(h, h1, ii, vbucket_state_active),
+ "Failed to set vbucket state.");
+ }
+
+ // Let's make vbucket n/2 be the one who is ahead, n/3 is behind
+ const int aheadVb = n_vbuckets/2;
+ const int behindVb = n_vbuckets/3;
+ checkne(aheadVb, behindVb, "Cannot have the same VB as ahead/behind");
+
+ HLC hlc(0/*init HLC*/, 0/*ahead threshold*/, 0/*behind threshold*/);
+
+ // grab the drift behind threshold
+ uint64_t driftBehindThreshold = get_ull_stat(h, h1,
+ "ep_hlc_ahead_threshold_us",
+ nullptr);
+ // Create n keys * n_vbuckets
+ const int n_keys = 5;
+ for (int ii = 0 ; ii < n_vbuckets; ii++) {
+ for (int k = 0; k < n_keys; k++) {
+ std::string key = "key_" + std::to_string(k);
+
+ // In the del_with_meta test we want to pretend a del_wm came from
+ // the past, so we want to ensure a delete doesn't get rejected
+ // by LWW conflict resolution, thus write all documents that are
+ // going to be deleted with set_with_meta, and write them way in the past.
+ // This will trigger threshold and increment drift stats... so we
+ // account for these later
+ ItemMetaData itm_meta;
+ itm_meta.cas = 1; // set to 1
+ set_with_meta(h, h1, key.data(), key.size(), NULL, 0, ii, &itm_meta,
+ 0, false, PROTOCOL_BINARY_RAW_BYTES, true, 0);
+ checkeq(PROTOCOL_BINARY_RESPONSE_SUCCESS, last_status.load(),
+ "Expected success");
+ }
+ }
+
+ checkeq(uint64_t(0), get_ull_stat(h, h1, "ep_active_ahead_exceptions"),
+ "Expected ahead counter to match mutations");
+ checkeq(uint64_t(n_keys*n_vbuckets), get_ull_stat(h, h1, "ep_active_behind_exceptions"),
+ "Expected behind counter to match mutations");
+
+ // Del_with_meta n_keys to n_vbuckets
+ for (int ii = 0 ; ii < n_vbuckets; ii++) {
+ for (int k = 0; k < n_keys; k++) {
+ std::string key = "key_" + std::to_string(k);
+ ItemMetaData itm_meta;
+ itm_meta.cas = hlc.nextHLC();
+ if (ii == aheadVb) {
+ // Push this guy *far* ahead (1 year)
+ itm_meta.cas += 3154E10;
+ } else if(ii == behindVb) {
+ // just be sure it was already greater than 1 + driftthreshold
+ checkge(itm_meta.cas, uint64_t(1) + driftBehindThreshold,
+ "HLC was already zero");
+ // set to be way way behind, but ahead of the documents we have set
+ itm_meta.cas = 2;
+ }
+ del_with_meta(h, h1, key.data(), key.size(), ii, &itm_meta,
+ 1, false, PROTOCOL_BINARY_RAW_BYTES);
+ checkeq(PROTOCOL_BINARY_RESPONSE_SUCCESS, last_status.load(),
+ "Expected success");
+ }
+ }
+
+ // Bucket stats should report drift
+ checkge(get_ull_stat(h, h1, "ep_active_hlc_drift"), uint64_t(0),
+ "Expected drift above zero");
+ checkeq(uint64_t(n_keys*2), get_ull_stat(h, h1, "ep_active_hlc_drift_count"),
+ "Expected ahead counter to match mutations");
+
+ // and should report total exception of all VBs
+ checkeq(uint64_t(n_keys), get_ull_stat(h, h1, "ep_active_ahead_exceptions"),
+ "Expected ahead counter to match mutations");
+ checkeq(uint64_t(n_keys + (n_keys*n_vbuckets)), get_ull_stat(h, h1, "ep_active_behind_exceptions"),
+ "Expected behind counter to match mutations");
+
+ // Victim VBs should have exceptions
+ {
+ std::string vbAheadName = "vb_" + std::to_string(aheadVb);
+ std::string ahead_threshold_exceeded = vbAheadName + ":drift_ahead_threshold_exceeded";
+ std::string behind_threshold_exceeded = vbAheadName + ":drift_behind_threshold_exceeded";
+ std::string total_abs_drift = vbAheadName + ":total_abs_drift";
+ std::string details = "vbucket-details " + std::to_string(aheadVb);
+
+ checkeq(uint64_t(n_keys),
+ get_ull_stat(h, h1, ahead_threshold_exceeded.data(), details.data()),
+ "Expected ahead threshold to match mutations");
+ checkge(get_ull_stat(h, h1, total_abs_drift.data(), details.data()), uint64_t(0),
+ "Expected some drift");
+ }
+
+ {
+ std::string vbBehindName = "vb_" + std::to_string(behindVb);
+ std::string ahead_threshold_exceeded = vbBehindName + ":drift_ahead_threshold_exceeded";
+ std::string behind_threshold_exceeded = vbBehindName + ":drift_behind_threshold_exceeded";
+ std::string total_abs_drift = vbBehindName + ":total_abs_drift";
+ std::string details = "vbucket-details " + std::to_string(behindVb);
+
+ // *2 behind due to the initial set_with_meta
+ checkeq(uint64_t(n_keys*2), get_ull_stat(h, h1, behind_threshold_exceeded.data(), details.data()),
+ "Expected behind threshold to match mutations");
+ checkeq(uint64_t(0), get_ull_stat(h, h1, ahead_threshold_exceeded.data(), details.data()),
+ "Expected no ahead exceptions");
+ checkge(get_ull_stat(h, h1, total_abs_drift.data(), details.data()), uint64_t(0),
+ "Expected some drift");
+ }
+
+
+ return SUCCESS;
+}
// Test manifest //////////////////////////////////////////////////////////////
test_getMeta_with_item_eviction, test_setup, teardown,
"item_eviction_policy=full_eviction", prepare, cleanup),
+ TestCase("test set_with_meta and drift stats",
+ test_set_with_meta_and_check_drift_stats, test_setup,
+ teardown, "hlc_ahead_threshold_us=5000000;hlc_behind_threshold_us=0;conflict_resolution_type=lww",
+ prepare, cleanup),
+ TestCase("test del_with_meta and drift stats",
+ test_del_with_meta_and_check_drift_stats, test_setup,
+ teardown, "hlc_ahead_threshold_us=0;hlc_behind_threshold_us=5000000;conflict_resolution_type=lww",
+ prepare, cleanup),
+
TestCase(NULL, NULL, NULL, NULL, NULL, prepare, cleanup)
};
checkpoint_config, /*kvshard*/NULL,
/*lastSeqno*/1000, /*lastSnapStart*/0,
/*lastSnapEnd*/0, /*table*/NULL,
- callback)) {
+ callback, config)) {
createManager();
}
EPStats global_stats;
CheckpointConfig checkpoint_config;
-
+ Configuration config;
std::shared_ptr<Callback<uint16_t> > callback;
RCPtr<VBucket> vbucket;
std::unique_ptr<CheckpointManager> manager;
std::shared_ptr<Callback<uint16_t> > cb(new DummyCB());
RCPtr<VBucket> vbucket(new VBucket(0, vbucket_state_active, global_stats,
checkpoint_config, NULL, 0, 0, 0, NULL,
- cb));
+ cb, config));
CheckpointManager *checkpoint_manager = new CheckpointManager(global_stats, 0,
checkpoint_config,
std::shared_ptr<Callback<uint16_t> > cb(new DummyCB());
RCPtr<VBucket> vbucket(new VBucket(0, vbucket_state_active, global_stats,
checkpoint_config, NULL, 0, 0, 0, NULL,
- cb));
+ cb, config));
CheckpointManager *manager =
new CheckpointManager(global_stats, 0, checkpoint_config, 1, 0, 0, cb);
/* Create the vbucket */
std::shared_ptr<Callback<uint16_t> > cb(new DummyCB());
- vbucket.reset(new VBucket(0, vbucket_state_active, stats, config,
- nullptr, 0, 0, 0, nullptr, cb));
+ vbucket.reset(new VBucket(0, vbucket_state_active, stats, chkConfig,
+ nullptr, 0, 0, 0, nullptr, cb, config));
}
static void TearDownTestCase() {
}
static EPStats stats;
- static CheckpointConfig config;
+ static CheckpointConfig chkConfig;
+ static Configuration config;
static std::unique_ptr<VBucket> vbucket;
};
EPStats DefragmenterBenchmarkTest::stats;
-CheckpointConfig DefragmenterBenchmarkTest::config;
+CheckpointConfig DefragmenterBenchmarkTest::chkConfig;
+Configuration DefragmenterBenchmarkTest::config;
std::unique_ptr<VBucket> DefragmenterBenchmarkTest::vbucket;
/* Create the vbucket */
std::shared_ptr<Callback<uint16_t> > cb(new DummyCB());
EPStats stats;
- CheckpointConfig config;
- VBucket vbucket(0, vbucket_state_active, stats, config, nullptr, 0, 0, 0,
- nullptr, cb);
+ CheckpointConfig chkConfig;
+ Configuration config;
+ VBucket vbucket(0, vbucket_state_active, stats, chkConfig, nullptr, 0, 0, 0,
+ nullptr, cb, config);
// 1. Create a number of small documents. Doesn't really matter that
// they are small, main thing is we create enough to span multiple