MB-20725: LWW monitoring 58/68158/13
authorJim Walker <jim@couchbase.com>
Thu, 29 Sep 2016 14:58:18 +0000 (15:58 +0100)
committerDave Rigby <daver@couchbase.com>
Wed, 12 Oct 2016 07:42:52 +0000 (07:42 +0000)
Add a number of new stats to the bucket stats and vbucket-details.

Add drift +/- threshold config parameters.

engine stats:

1. The maximum absolute cummulative-drift of the active vbuckets. See
    vbucket-details total_abs_drift
        ep_active_hlc_drift

2. The number of updates applied to ep_active_hlc_drift
        ep_active_hlc_drift_count

3. The maximum absolute drift of the replica vbuckets
        ep_replica_hlc_drift

4. The number of counts applied to ep_replica_hlc_drift
        ep_replica_hlc_drift_count

5. The total number of times a setMaxCas was from a peer who is
   ahead of the ahead threshold (hlc_ahead_threshold_us).
        ep_active_ahead_exceptions

6. The total number of times a setMaxCas was from a peer who is
   behind of the behind threshold (hlc_behind_threshold_us).
        ep_active_behind_exceptions

7. The total number of times a setMaxCas was from a peer who is
   ahead of the ahead threshold (hlc_ahead_threshold_us) for
   replica VBs.
        ep_replica_ahead_exceptions

8. The total number of times a setMaxCas was from a peer who is
   behind of the behind threshold (hlc_behind_threshold_us) for
   replica VBs.
        ep_replica_behind_exceptions

vbucket-details stats:

1. The current max_hlc (reported as max_cas for compatibility with
   other modules)
        vb_n:max_cas

2. The vbucket's absolute cummulative drift
        vb_n:total_abs_drift

3. How many updates have been applied to total_abs_drift
        vb_n:total_abs_drift_count

4. How many times the ahead threshold has been exceeded.
        vb_n:drift_ahead_threshold_exceeded

5. How many times the behind threshold has been exceeded.
        vb_n:drift_behind_threshold_exceeded

6. How many logical clock ticks this vbucket's HLC has returned
        vb_n:logical_clock_tick

Change-Id: I063782d4451b97f58a3c89208506bd8bd08b705e
Reviewed-on: http://review.couchbase.org/68158
Reviewed-by: Dave Rigby <daver@couchbase.com>
Tested-by: buildbot <build@couchbase.com>
12 files changed:
configuration.json
src/ep.cc
src/ep_engine.cc
src/ep_engine.h
src/hlc.h [new file with mode: 0644]
src/vbucket.cc
src/vbucket.h
src/warmup.cc
tests/ep_testsuite.cc
tests/ep_testsuite_xdcr.cc
tests/module_tests/checkpoint_test.cc
tests/module_tests/defragmenter_test.cc

index 9702e9e..c284010 100644 (file)
             "descr": "The maximum timeout for a getl lock in (s)",
             "type": "size_t"
         },
+        "hlc_ahead_threshold_us": {
+            "default": "5000000",
+            "descr": "The μs threshold at which we will increment a vbucket's ahead counter.",
+            "type": "size_t"
+        },
+        "hlc_behind_threshold_us": {
+            "default": "5000000",
+            "descr": "The μs threshold at which we will increment a vbucket's behind counter.",
+            "type": "size_t"
+        },
         "ht_locks": {
             "default": "47",
             "type": "size_t"
index a9697e9..922f1ce 100644 (file)
--- a/src/ep.cc
+++ b/src/ep.cc
@@ -1325,10 +1325,12 @@ ENGINE_ERROR_CODE EventuallyPersistentStore::setVBucketState(uint16_t vbid,
         FailoverTable* ft = new FailoverTable(engine.getMaxFailoverEntries());
         KVShard* shard = vbMap.getShardByVbId(vbid);
         std::shared_ptr<Callback<uint16_t> > cb(new NotifyFlusherCB(shard));
+        Configuration& config = engine.getConfiguration();
         RCPtr<VBucket> newvb(new VBucket(vbid, to, stats,
                                          engine.getCheckpointConfig(),
-                                         shard, 0, 0, 0, ft, cb));
-        Configuration& config = engine.getConfiguration();
+                                         shard, 0, 0, 0, ft, cb,
+                                         config));
+
         if (config.isBfilterEnabled()) {
             // Initialize bloom filters upon vbucket creation during
             // bucket creation and rebalance
@@ -2357,7 +2359,7 @@ ENGINE_ERROR_CODE EventuallyPersistentStore::setWithMeta(
         break;
     case WAS_DIRTY:
     case WAS_CLEAN:
-        vb->setMaxCas(v->getCas());
+        vb->setMaxCasAndTrackDrift(v->getCas());
         queueDirty(vb, v, &lh, seqno,
                    genBySeqno ? GenerateBySeqno::Yes : GenerateBySeqno::No,
                    GenerateCas::No);
@@ -3028,7 +3030,7 @@ ENGINE_ERROR_CODE EventuallyPersistentStore::deleteWithMeta(
             v->setBySeqno(bySeqno);
         }
 
-        vb->setMaxCas(v->getCas());
+        vb->setMaxCasAndTrackDrift(v->getCas());
 
         if (tapBackfill) {
             tapQueueDirty(vb, v, lh, seqno,
index 9b4a32f..fbb784f 100644 (file)
@@ -3076,6 +3076,22 @@ bool VBucketCountVisitor::visitBucket(RCPtr<VBucket> &vb) {
         pendingWrites += vb->dirtyQueuePendingWrites;
 
         rollbackItemCount += vb->getRollbackItemCount();
+
+        /*
+         * The bucket stat only reports the largest drift of the vbuckets.
+         */
+        auto driftStats = vb->getHLCDriftStats();
+        // If this vbucket's max is bigger than ours
+        if (driftStats.total > maxAbsHLCDrift.total) {
+            maxAbsHLCDrift = driftStats;
+        }
+
+        /*
+         * Total up the exceptions
+         */
+        auto driftExceptionCounters = vb->getHLCDriftExceptionCounters();
+        totalHLCDriftExceptionCounters.ahead += driftExceptionCounters.ahead;
+        totalHLCDriftExceptionCounters.behind += driftExceptionCounters.behind;
     }
 
     return false;
@@ -3658,6 +3674,29 @@ ENGINE_ERROR_CODE EventuallyPersistentEngine::doEngineStats(const void *cookie,
         add_casted_stat("ep_io_compaction_write_bytes",  value, add_stat, cookie);
     }
 
+    // Add stats for tracking HLC drift
+    add_casted_stat("ep_active_hlc_drift",
+        activeCountVisitor.getMaxAbsHLCDrift().total, add_stat, cookie);
+    add_casted_stat("ep_active_hlc_drift_count",
+        activeCountVisitor.getMaxAbsHLCDrift().updates, add_stat, cookie);
+    add_casted_stat("ep_replica_hlc_drift",
+        replicaCountVisitor.getMaxAbsHLCDrift().total, add_stat, cookie);
+    add_casted_stat("ep_replica_hlc_drift_count",
+        replicaCountVisitor.getMaxAbsHLCDrift().updates, add_stat, cookie);
+
+    add_casted_stat("ep_active_ahead_exceptions",
+        activeCountVisitor.getTotalHLCDriftExceptionCounters().ahead,
+        add_stat, cookie);
+    add_casted_stat("ep_active_behind_exceptions",
+        activeCountVisitor.getTotalHLCDriftExceptionCounters().behind,
+        add_stat, cookie);
+    add_casted_stat("ep_replica_ahead_exceptions",
+        replicaCountVisitor.getTotalHLCDriftExceptionCounters().ahead,
+        add_stat, cookie);
+    add_casted_stat("ep_replica_behind_exceptions",
+        replicaCountVisitor.getTotalHLCDriftExceptionCounters().behind,
+        add_stat, cookie);
+
     return ENGINE_SUCCESS;
 }
 
index 0c1f411..56d9a00 100644 (file)
@@ -107,7 +107,8 @@ public:
         queueFill(0), queueDrain(0),
         pendingWrites(0), chkPersistRemaining(0),
         fileSpaceUsed(0), fileSize(0),
-        rollbackItemCount(0)
+        rollbackItemCount(0), maxAbsHLCDrift(),
+        totalHLCDriftExceptionCounters()
     { }
 
     bool visitBucket(RCPtr<VBucket> &vb);
@@ -163,6 +164,9 @@ public:
 
     uint64_t getRollbackItemCount() { return rollbackItemCount; }
 
+    HLC::DriftStats getMaxAbsHLCDrift() {return maxAbsHLCDrift;}
+    HLC::DriftExceptions getTotalHLCDriftExceptionCounters() {return totalHLCDriftExceptionCounters;}
+
 private:
     EventuallyPersistentEngine &engine;
     vbucket_state_t desired_state;
@@ -196,6 +200,8 @@ private:
     size_t fileSize;
 
     uint64_t rollbackItemCount;
+    HLC::DriftStats maxAbsHLCDrift;
+    HLC::DriftExceptions totalHLCDriftExceptionCounters;
 };
 
 /**
diff --git a/src/hlc.h b/src/hlc.h
new file mode 100644 (file)
index 0000000..293617f
--- /dev/null
+++ b/src/hlc.h
@@ -0,0 +1,168 @@
+/* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
+/*
+ *     Copyright 2016 Couchbase, Inc
+ *
+ *   Licensed under the Apache License, Version 2.0 (the "License");
+ *   you may not use this file except in compliance with the License.
+ *   You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *   Unless required by applicable law or agreed to in writing, software
+ *   distributed under the License is distributed on an "AS IS" BASIS,
+ *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *   See the License for the specific language governing permissions and
+ *   limitations under the License.
+ */
+
+#pragma once
+
+#include <chrono>
+
+#include "atomic.h"
+#include "statwriter.h"
+
+/*
+ * HLC manages a hybrid logical clock for 'time' stamping events.
+ * The class only implements the send logic of the HLC algorithm.
+ *   - http://www.cse.buffalo.edu/tech-reports/2014-04.pdf
+ *
+ * The class allows multiple threads to concurrently call
+ *  - nextHLC
+ *  - setMaxCas
+ *  - setMaxHLCAndTrackDrift
+ *
+ * The paired "drift" counter, which is used to monitor drift over time
+ * isn't atomic in that the total and update parts are not a consistent
+ * snapshot.
+ */
+class HLC {
+public:
+    struct DriftStats {
+        uint64_t total;
+        uint64_t updates;
+    };
+
+    struct DriftExceptions {
+        uint32_t ahead;
+        uint32_t behind;
+    };
+
+    /*
+     * @param initHLC a HLC value to start from
+     * @param aheadThresholdAhead threshold a peer can be ahead before we
+     *        increment driftAheadExceeded
+     * @param behindThresholdhreshold a peer can be ahead before we
+     *        increment driftBehindExceeded
+     */
+    HLC(uint64_t initHLC, uint64_t aheadThreshold, uint64_t behindThreshold)
+        : maxHLC(initHLC),
+          cummulativeDrift(0),
+          cummulativeDriftIncrements(0),
+          logicalClockTicks(0),
+          driftAheadExceeded(0),
+          driftBehindExceeded(0),
+          driftAheadThreshold(aheadThreshold),
+          driftBehindThreshold(behindThreshold) {}
+
+    uint64_t nextHLC() {
+        // Create a monotonic timestamp using part of the HLC algorithm by.
+        // a) Reading system time
+        // b) dropping 16-bits (done by nowHLC)
+        // c) comparing it with the last known time (max_cas)
+        // d) returning either now or max_cas + 1
+        uint64_t timeNow = getMasked48(getTime());
+        uint64_t l = maxHLC.load();
+
+        if (timeNow > l) {
+            atomic_setIfBigger(maxHLC, timeNow);
+            return timeNow;
+        }
+        logicalClockTicks++;
+        atomic_setIfBigger(maxHLC, l + 1);
+        return l + 1;
+    }
+
+    void setMaxHLCAndTrackDrift(uint64_t hlc) {
+        auto timeNow = getMasked48(getTime());
+
+        // Track the +/- difference between our time and their time
+        int64_t difference = getMasked48(hlc) - timeNow;
+
+        // Accumulate the absolute drift
+        cummulativeDrift += std::abs(difference);
+        cummulativeDriftIncrements++;
+
+        // If the difference is greater, count peer ahead exeception
+        // If the difference is less than our -ve threshold.. count that
+        if (difference > int64_t(driftAheadThreshold)) {
+            driftAheadExceeded++;
+        } else if(difference < (0 - int64_t(driftBehindThreshold))) {
+            driftBehindExceeded++;
+        }
+
+        setMaxHLC(hlc);
+    }
+
+    void setMaxHLC(uint64_t hlc) {
+        atomic_setIfBigger(maxHLC, hlc);
+    }
+
+    uint64_t getMaxHLC() const {
+        return maxHLC;
+    }
+
+    DriftStats getDriftStats() const {
+        // Deliberately not locking to read this pair
+        return {cummulativeDrift, cummulativeDriftIncrements};
+    }
+
+    DriftExceptions getDriftExceptionCounters() const {
+        // Deliberately not locking to read this pair
+        return {driftAheadExceeded, driftBehindExceeded};
+    }
+
+    void addStats(const std::string& prefix, ADD_STAT add_stat, const void *c) const {
+        add_prefixed_stat(prefix.data(), "max_cas", getMaxHLC(), add_stat, c);
+        add_prefixed_stat(prefix.data(), "total_abs_drift", cummulativeDrift.load(), add_stat, c);
+        add_prefixed_stat(prefix.data(), "total_abs_drift_count", cummulativeDriftIncrements.load(), add_stat, c);
+        add_prefixed_stat(prefix.data(), "drift_ahead_threshold_exceeded", driftAheadExceeded.load(), add_stat, c);
+        add_prefixed_stat(prefix.data(), "drift_ahead_threshold", driftAheadThreshold.load(), add_stat, c);
+        add_prefixed_stat(prefix.data(), "drift_behind_threshold_exceeded", driftBehindExceeded.load(), add_stat, c);
+        add_prefixed_stat(prefix.data(), "drift_behind_threshold", driftBehindThreshold.load(), add_stat, c);
+        add_prefixed_stat(prefix.data(), "logical_clock_ticks", logicalClockTicks.load(), add_stat, c);
+    }
+
+private:
+    /*
+     * Returns 48-bit of t (bottom 16-bit zero)
+     */
+    static int64_t getMasked48(int64_t t) {
+        return t & ~((1<<16)-1);
+    }
+
+    static int64_t getTime() {
+        auto now = std::chrono::system_clock::now();
+        return std::chrono::duration_cast<std::chrono::microseconds>(now.time_since_epoch()).count();
+    }
+
+    /*
+     * maxHLC tracks the current highest time, either our own or a peer who
+     * has a larger clock value. nextHLC and setMax* methods change this and can
+     * be called from different threads
+     */
+    std::atomic<uint64_t> maxHLC;
+
+    /*
+     * The following are used for stats/drift tracking.
+     * many threads could be setting cas so they need to be atomically
+     * updated for consisent totals.
+     */
+    std::atomic<uint64_t> cummulativeDrift;
+    std::atomic<uint64_t> cummulativeDriftIncrements;
+    std::atomic<uint64_t> logicalClockTicks;
+    std::atomic<uint32_t> driftAheadExceeded;
+    std::atomic<uint32_t> driftBehindExceeded;
+    std::atomic<uint64_t> driftAheadThreshold;
+    std::atomic<uint64_t> driftBehindThreshold;
+};
\ No newline at end of file
index d66a8d5..8b298fc 100644 (file)
@@ -265,15 +265,12 @@ void VBucket::resetStats() {
 template <typename T>
 void VBucket::addStat(const char *nm, const T &val, ADD_STAT add_stat,
                       const void *c) {
-    std::stringstream name;
-    name << "vb_" << id;
+    std::string stat = statPrefix;
     if (nm != NULL) {
-        name << ":" << nm;
+        add_prefixed_stat(statPrefix, nm, val, add_stat, c);
+    } else {
+        add_casted_stat(statPrefix.data(), val, add_stat, c);
     }
-    std::stringstream value;
-    value << val;
-    std::string n = name.str();
-    add_casted_stat(n.data(), value.str().data(), add_stat, c);
 }
 
 size_t VBucket::queueBGFetchItem(const std::string &key,
@@ -603,27 +600,6 @@ size_t VBucket::getNumOfKeysInFilter() {
     }
 }
 
-uint64_t VBucket::nextHLCCas() {
-    // Create a monotonic timestamp using part of the HLC algorithm by.
-    // a) Reading system time (now)
-    // b) dropping 16-bits (masking 65355 µs)
-    // c) comparing it with the last known time (max_cas)
-    // d) returning either now or max_cas + 1
-    auto now = std::chrono::duration_cast<std::chrono::microseconds>
-            (std::chrono::system_clock::now().time_since_epoch()).count();
-
-    uint64_t now48bits = ((uint64_t)now) & ~((1 << 16) - 1);
-    uint64_t local_max_cas = max_cas.load();
-
-    if (now48bits > local_max_cas) {
-        atomic_setIfBigger(max_cas, now48bits);
-        return now48bits;
-    }
-
-    atomic_setIfBigger(max_cas, local_max_cas + 1);
-    return local_max_cas + 1;
-}
-
 void VBucket::addStats(bool details, ADD_STAT add_stat, const void *c,
                        item_eviction_policy_t policy) {
     addStat(NULL, toString(state), add_stat, c);
@@ -636,20 +612,20 @@ void VBucket::addStats(bool details, ADD_STAT add_stat, const void *c,
                 add_stat, c);
         addStat("ht_memory", ht.memorySize(), add_stat, c);
         addStat("ht_item_memory", ht.getItemMemory(), add_stat, c);
-        addStat("ht_cache_size", ht.cacheSize, add_stat, c);
+        addStat("ht_cache_size", ht.cacheSize.load(), add_stat, c);
         addStat("num_ejects", ht.getNumEjects(), add_stat, c);
-        addStat("ops_create", opsCreate, add_stat, c);
-        addStat("ops_update", opsUpdate, add_stat, c);
-        addStat("ops_delete", opsDelete, add_stat, c);
-        addStat("ops_reject", opsReject, add_stat, c);
-        addStat("queue_size", dirtyQueueSize, add_stat, c);
-        addStat("queue_memory", dirtyQueueMem, add_stat, c);
-        addStat("queue_fill", dirtyQueueFill, add_stat, c);
-        addStat("queue_drain", dirtyQueueDrain, add_stat, c);
+        addStat("ops_create", opsCreate.load(), add_stat, c);
+        addStat("ops_update", opsUpdate.load(), add_stat, c);
+        addStat("ops_delete", opsDelete.load(), add_stat, c);
+        addStat("ops_reject", opsReject.load(), add_stat, c);
+        addStat("queue_size", dirtyQueueSize.load(), add_stat, c);
+        addStat("queue_memory", dirtyQueueMem.load(), add_stat, c);
+        addStat("queue_fill", dirtyQueueFill.load(), add_stat, c);
+        addStat("queue_drain", dirtyQueueDrain.load(), add_stat, c);
         addStat("queue_age", getQueueAge(), add_stat, c);
-        addStat("pending_writes", dirtyQueuePendingWrites, add_stat, c);
-        addStat("db_data_size", fileSpaceUsed, add_stat, c);
-        addStat("db_file_size", fileSize, add_stat, c);
+        addStat("pending_writes", dirtyQueuePendingWrites.load(), add_stat, c);
+        addStat("db_data_size", fileSpaceUsed.load(), add_stat, c);
+        addStat("db_file_size", fileSize.load(), add_stat, c);
         addStat("high_seqno", getHighSeqno(), add_stat, c);
         addStat("uuid", failovers->getLatestUUID(), add_stat, c);
         addStat("purge_seqno", getPurgeSeqno(), add_stat, c);
@@ -657,8 +633,8 @@ void VBucket::addStats(bool details, ADD_STAT add_stat, const void *c,
                 add_stat, c);
         addStat("bloom_filter_size", getFilterSize(), add_stat, c);
         addStat("bloom_filter_key_count", getNumOfKeysInFilter(), add_stat, c);
-        addStat("max_cas", getMaxCas(), add_stat, c);
         addStat("rollback_item_count", getRollbackItemCount(), add_stat, c);
+        hlc.addStats(statPrefix, add_stat, c);
     }
 }
 
index 2389161..286918b 100644 (file)
@@ -24,6 +24,7 @@
 #include "checkpoint.h"
 #include "ep_types.h"
 #include "failover-table.h"
+#include "hlc.h"
 #include "kvstore.h"
 #include "stored-value.h"
 #include "utility.h"
@@ -144,6 +145,7 @@ public:
             int64_t lastSeqno, uint64_t lastSnapStart,
             uint64_t lastSnapEnd, FailoverTable *table,
             std::shared_ptr<Callback<id_type> > cb,
+            Configuration& config,
             vbucket_state_t initState = vbucket_state_dead,
             uint64_t chkId = 1, uint64_t purgeSeqno = 0,
             uint64_t maxCas = 0):
@@ -170,7 +172,6 @@ public:
         initialState(initState),
         stats(st),
         purge_seqno(purgeSeqno),
-        max_cas(maxCas),
         takeover_backed_up(false),
         persisted_snapshot_start(lastSnapStart),
         persisted_snapshot_end(lastSnapEnd),
@@ -178,7 +179,11 @@ public:
         shard(kvshard),
         bFilter(NULL),
         tempFilter(NULL),
-        rollbackItemCount(0)
+        rollbackItemCount(0),
+        hlc(maxCas,
+            config.getHlcAheadThresholdUs(),
+            config.getHlcBehindThresholdUs()),
+        statPrefix("vb_" + std::to_string(i))
     {
         backfill.isBackfillPhase = false;
         pendingOpsStart = 0;
@@ -194,7 +199,7 @@ public:
             id, VBucket::toString(state), VBucket::toString(initialState),
             lastSeqno, lastSnapStart, lastSnapEnd,
             persisted_snapshot_start, persisted_snapshot_end,
-            max_cas.load());
+            getMaxCas());
     }
 
     ~VBucket();
@@ -232,11 +237,23 @@ public:
     }
 
     uint64_t getMaxCas() {
-        return max_cas;
+        return hlc.getMaxHLC();
     }
 
     void setMaxCas(uint64_t cas) {
-        atomic_setIfBigger(max_cas, cas);
+        hlc.setMaxHLC(cas);
+    }
+
+    void setMaxCasAndTrackDrift(uint64_t cas) {
+        hlc.setMaxHLCAndTrackDrift(cas);
+    }
+
+    HLC::DriftStats getHLCDriftStats() const {
+        return hlc.getDriftStats();
+    }
+
+    HLC::DriftExceptions getHLCDriftExceptionCounters() const {
+        return hlc.getDriftExceptionCounters();
     }
 
     bool isTakeoverBackedUp() {
@@ -397,7 +414,9 @@ public:
     size_t getFilterSize();
     size_t getNumOfKeysInFilter();
 
-    uint64_t nextHLCCas();
+    uint64_t nextHLCCas() {
+        return hlc.nextHLC();
+    }
 
     // Applicable only for FULL EVICTION POLICY
     bool isResidentRatioUnderThreshold(float threshold,
@@ -498,7 +517,6 @@ private:
     EPStats                        &stats;
     uint64_t                        purge_seqno;
 
-    AtomicValue<uint64_t>           max_cas;
     AtomicValue<bool>               takeover_backed_up;
 
     Mutex pendingBGFetchesLock;
@@ -521,6 +539,9 @@ private:
 
     AtomicValue<uint64_t> rollbackItemCount;
 
+    HLC hlc;
+    std::string statPrefix;
+
     static size_t chkFlushTimeout;
 
     DISALLOW_COPY_AND_ASSIGN(VBucket);
index ddcd79a..456039a 100644 (file)
@@ -485,8 +485,8 @@ void Warmup::createVBuckets(uint16_t shardId) {
                                  store.getEPEngine().getEpStats(),
                                  store.getEPEngine().getCheckpointConfig(),
                                  shard, vbs.highSeqno, vbs.lastSnapStart,
-                                 vbs.lastSnapEnd, table, cb, vbs.state, 1,
-                                 vbs.purgeSeqno, vbs.maxCas));
+                                 vbs.lastSnapEnd, table, cb, config,
+                                 vbs.state, 1, vbs.purgeSeqno, vbs.maxCas));
 
             if(vbs.state == vbucket_state_active && !cleanShutdown) {
                 if (static_cast<uint64_t>(vbs.highSeqno) == vbs.lastSnapEnd) {
index d2fa262..1fe0491 100644 (file)
@@ -36,6 +36,7 @@
 #include <string>
 #include <thread>
 #include <unordered_map>
+#include <unordered_set>
 #include <vector>
 
 #include "atomic.h"
@@ -5871,10 +5872,15 @@ static enum test_result test_mb19687_fixed(ENGINE_HANDLE* h,
                 "vb_0:bloom_filter_size",
                 "vb_0:db_data_size",
                 "vb_0:db_file_size",
+                "vb_0:drift_ahead_threshold",
+                "vb_0:drift_ahead_threshold_exceeded",
+                "vb_0:drift_behind_threshold",
+                "vb_0:drift_behind_threshold_exceeded",
                 "vb_0:high_seqno",
                 "vb_0:ht_cache_size",
                 "vb_0:ht_item_memory",
                 "vb_0:ht_memory",
+                "vb_0:logical_clock_ticks",
                 "vb_0:max_cas",
                 "vb_0:num_ejects",
                 "vb_0:num_items",
@@ -5892,6 +5898,8 @@ static enum test_result test_mb19687_fixed(ENGINE_HANDLE* h,
                 "vb_0:queue_memory",
                 "vb_0:queue_size",
                 "vb_0:rollback_item_count",
+                "vb_0:total_abs_drift",
+                "vb_0:total_abs_drift_count",
                 "vb_0:uuid"
             }
         },
@@ -6163,6 +6171,8 @@ static enum test_result test_mb19687_fixed(ENGINE_HANDLE* h,
                 "ep_flushall_enabled",
                 "ep_getl_default_timeout",
                 "ep_getl_max_timeout",
+                "ep_hlc_ahead_threshold_us",
+                "ep_hlc_behind_threshold_us",
                 "ep_ht_locks",
                 "ep_ht_size",
                 "ep_initfile",
@@ -6260,17 +6270,310 @@ static enum test_result test_mb19687_fixed(ENGINE_HANDLE* h,
                 "vb_0:data_size",
                 "vb_0:file_size"
             }
+        },
+        {"", // Note: we convert empty to a null to get engine stats
+            {
+                "bytes",
+                "curr_items",
+                "curr_items_tot",
+                "curr_temp_items",
+                "ep_access_scanner_enabled",
+                "ep_access_scanner_last_runtime",
+                "ep_access_scanner_num_items",
+                "ep_access_scanner_task_time",
+                "ep_active_ahead_exceptions",
+                "ep_active_behind_exceptions",
+                "ep_active_hlc_drift",
+                "ep_active_hlc_drift_count",
+                "ep_alog_block_size",
+                "ep_alog_path",
+                "ep_alog_resident_ratio_threshold",
+                "ep_alog_sleep_time",
+                "ep_alog_task_time",
+                "ep_backend",
+                "ep_backfill_mem_threshold",
+                "ep_bfilter_enabled",
+                "ep_bfilter_fp_prob",
+                "ep_bfilter_key_count",
+                "ep_bfilter_residency_threshold",
+                "ep_bg_fetch_delay",
+                "ep_bg_fetched",
+                "ep_bg_meta_fetched",
+                "ep_bg_remaining_jobs",
+                "ep_blob_num",
+                "ep_blob_overhead",
+                "ep_bucket_priority",
+                "ep_chk_max_items",
+                "ep_chk_period",
+                "ep_chk_persistence_remains",
+                "ep_chk_persistence_timeout",
+                "ep_chk_remover_stime",
+                "ep_commit_num",
+                "ep_commit_time",
+                "ep_commit_time_total",
+                "ep_compaction_exp_mem_threshold",
+                "ep_compaction_write_queue_cap",
+                "ep_config_file",
+                "ep_conflict_resolution_type",
+                "ep_couch_bucket",
+                "ep_cursor_dropping_lower_mark",
+                "ep_cursor_dropping_lower_threshold",
+                "ep_cursor_dropping_upper_mark",
+                "ep_cursor_dropping_upper_threshold",
+                "ep_cursors_dropped",
+                "ep_data_traffic_enabled",
+                "ep_db_data_size",
+                "ep_db_file_size",
+                "ep_dbname",
+                "ep_dcp_backfill_byte_limit",
+                "ep_dcp_conn_buffer_size",
+                "ep_dcp_conn_buffer_size_aggr_mem_threshold",
+                "ep_dcp_conn_buffer_size_aggressive_perc",
+                "ep_dcp_conn_buffer_size_max",
+                "ep_dcp_conn_buffer_size_perc",
+                "ep_dcp_consumer_process_buffered_messages_batch_size",
+                "ep_dcp_consumer_process_buffered_messages_yield_limit",
+                "ep_dcp_enable_noop",
+                "ep_dcp_flow_control_policy",
+                "ep_dcp_max_unacked_bytes",
+                "ep_dcp_min_compression_ratio",
+                "ep_dcp_noop_interval",
+                "ep_dcp_producer_snapshot_marker_yield_limit",
+                "ep_dcp_scan_byte_limit",
+                "ep_dcp_scan_item_limit",
+                "ep_dcp_takeover_max_time",
+                "ep_dcp_value_compression_enabled",
+                "ep_defragmenter_age_threshold",
+                "ep_defragmenter_chunk_duration",
+                "ep_defragmenter_enabled",
+                "ep_defragmenter_interval",
+                "ep_defragmenter_num_moved",
+                "ep_defragmenter_num_visited",
+                "ep_degraded_mode",
+                "ep_diskqueue_drain",
+                "ep_diskqueue_fill",
+                "ep_diskqueue_items",
+                "ep_diskqueue_memory",
+                "ep_diskqueue_pending",
+                "ep_enable_chk_merge",
+                "ep_exp_pager_enabled",
+                "ep_exp_pager_initial_run_time",
+                "ep_exp_pager_stime",
+                "ep_expired_access",
+                "ep_expired_compactor",
+                "ep_expired_pager",
+                "ep_expiry_pager_task_time",
+                "ep_failpartialwarmup",
+                "ep_flush_all",
+                "ep_flush_duration_total",
+                "ep_flushall_enabled",
+                "ep_flusher_state",
+                "ep_flusher_todo",
+                "ep_getl_default_timeout",
+                "ep_getl_max_timeout",
+                "ep_hlc_ahead_threshold_us",
+                "ep_hlc_behind_threshold_us",
+                "ep_ht_locks",
+                "ep_ht_size",
+                "ep_initfile",
+                "ep_io_compaction_read_bytes",
+                "ep_io_compaction_write_bytes",
+                "ep_io_total_read_bytes",
+                "ep_io_total_write_bytes",
+                "ep_item_begin_failed",
+                "ep_item_commit_failed",
+                "ep_item_eviction_policy",
+                "ep_item_flush_expired",
+                "ep_item_flush_failed",
+                "ep_item_num",
+                "ep_item_num_based_new_chk",
+                "ep_items_rm_from_checkpoints",
+                "ep_keep_closed_chks",
+                "ep_kv_size",
+                "ep_max_bg_remaining_jobs",
+                "ep_max_checkpoints",
+                "ep_max_failover_entries",
+                "ep_max_item_size",
+                "ep_max_num_auxio",
+                "ep_max_num_nonio",
+                "ep_max_num_readers",
+                "ep_max_num_shards",
+                "ep_max_num_workers",
+                "ep_max_num_writers",
+                "ep_max_size",
+                "ep_max_threads",
+                "ep_max_vbuckets",
+                "ep_mem_high_wat",
+                "ep_mem_high_wat_percent",
+                "ep_mem_low_wat",
+                "ep_mem_low_wat_percent",
+                "ep_mem_tracker_enabled",
+                "ep_meta_data_disk",
+                "ep_meta_data_memory",
+                "ep_mlog_compactor_runs",
+                "ep_mutation_mem_threshold",
+                "ep_num_access_scanner_runs",
+                "ep_num_access_scanner_skips",
+                "ep_num_eject_failures",
+                "ep_num_expiry_pager_runs",
+                "ep_num_non_resident",
+                "ep_num_not_my_vbuckets",
+                "ep_num_ops_del_meta",
+                "ep_num_ops_del_meta_res_fail",
+                "ep_num_ops_del_ret_meta",
+                "ep_num_ops_get_meta",
+                "ep_num_ops_get_meta_on_set_meta",
+                "ep_num_ops_set_meta",
+                "ep_num_ops_set_meta_res_fail",
+                "ep_num_ops_set_ret_meta",
+                "ep_num_pager_runs",
+                "ep_num_value_ejects",
+                "ep_num_workers",
+                "ep_oom_errors",
+                "ep_overhead",
+                "ep_pager_active_vb_pcnt",
+                "ep_pending_compactions",
+                "ep_pending_ops",
+                "ep_pending_ops_max",
+                "ep_pending_ops_max_duration",
+                "ep_pending_ops_total",
+                "ep_persist_vbstate_total",
+                "ep_postInitfile",
+                "ep_queue_size",
+                "ep_replica_ahead_exceptions",
+                "ep_replica_behind_exceptions",
+                "ep_replica_hlc_drift",
+                "ep_replica_hlc_drift_count",
+                "ep_replication_throttle_cap_pcnt",
+                "ep_replication_throttle_queue_cap",
+                "ep_replication_throttle_threshold",
+                "ep_rollback_count",
+                "ep_startup_time",
+                "ep_storage_age",
+                "ep_storage_age_highwat",
+                "ep_storedval_num",
+                "ep_storedval_overhead",
+                "ep_storedval_size",
+                "ep_tap_ack_grace_period",
+                "ep_tap_ack_initial_sequence_number",
+                "ep_tap_ack_interval",
+                "ep_tap_ack_window_size",
+                "ep_tap_backfill_resident",
+                "ep_tap_backlog_limit",
+                "ep_tap_backoff_period",
+                "ep_tap_bg_fetch_requeued",
+                "ep_tap_bg_fetched",
+                "ep_tap_bg_max_pending",
+                "ep_tap_keepalive",
+                "ep_tap_noop_interval",
+                "ep_tap_requeue_sleep_time",
+                "ep_time_synchronization",
+                "ep_tmp_oom_errors",
+                "ep_total_cache_size",
+                "ep_total_del_items",
+                "ep_total_enqueued",
+                "ep_total_new_items",
+                "ep_total_persisted",
+                "ep_uncommitted_items",
+                "ep_uuid",
+                "ep_value_size",
+                "ep_vb0",
+                "ep_vb_snapshot_total",
+                "ep_vb_total",
+                "ep_vbucket_del",
+                "ep_vbucket_del_fail",
+                "ep_version",
+                "ep_waitforwarmup",
+                "ep_warmup",
+                "ep_warmup_batch_size",
+                "ep_warmup_dups",
+                "ep_warmup_min_items_threshold",
+                "ep_warmup_min_memory_threshold",
+                "ep_warmup_oom",
+                "ep_warmup_thread",
+                "ep_warmup_time",
+                "ep_workload_pattern",
+                "mem_used",
+                "rollback_item_count",
+                "vb_active_curr_items",
+                "vb_active_eject",
+                "vb_active_expired",
+                "vb_active_ht_memory",
+                "vb_active_itm_memory",
+                "vb_active_meta_data_disk",
+                "vb_active_meta_data_memory",
+                "vb_active_num",
+                "vb_active_num_non_resident",
+                "vb_active_ops_create",
+                "vb_active_ops_delete",
+                "vb_active_ops_reject",
+                "vb_active_ops_update",
+                "vb_active_perc_mem_resident",
+                "vb_active_queue_age",
+                "vb_active_queue_drain",
+                "vb_active_queue_fill",
+                "vb_active_queue_memory",
+                "vb_active_queue_pending",
+                "vb_active_queue_size",
+                "vb_active_rollback_item_count",
+                "vb_dead_num",
+                "vb_pending_curr_items",
+                "vb_pending_eject",
+                "vb_pending_expired",
+                "vb_pending_ht_memory",
+                "vb_pending_itm_memory",
+                "vb_pending_meta_data_disk",
+                "vb_pending_meta_data_memory",
+                "vb_pending_num",
+                "vb_pending_num_non_resident",
+                "vb_pending_ops_create",
+                "vb_pending_ops_delete",
+                "vb_pending_ops_reject",
+                "vb_pending_ops_update",
+                "vb_pending_perc_mem_resident",
+                "vb_pending_queue_age",
+                "vb_pending_queue_drain",
+                "vb_pending_queue_fill",
+                "vb_pending_queue_memory",
+                "vb_pending_queue_pending",
+                "vb_pending_queue_size",
+                "vb_pending_rollback_item_count",
+                "vb_replica_curr_items",
+                "vb_replica_eject",
+                "vb_replica_expired",
+                "vb_replica_ht_memory",
+                "vb_replica_itm_memory",
+                "vb_replica_meta_data_disk",
+                "vb_replica_meta_data_memory",
+                "vb_replica_num",
+                "vb_replica_num_non_resident",
+                "vb_replica_ops_create",
+                "vb_replica_ops_delete",
+                "vb_replica_ops_reject",
+                "vb_replica_ops_update",
+                "vb_replica_perc_mem_resident",
+                "vb_replica_queue_age",
+                "vb_replica_queue_drain",
+                "vb_replica_queue_fill",
+                "vb_replica_queue_memory",
+                "vb_replica_queue_pending",
+                "vb_replica_queue_size",
+                "vb_replica_rollback_item_count"
+            }
         }
     };
 
     bool error = false;
     for (const auto& entry : statsKeys) {
+
         vals.clear();
         checkeq(ENGINE_SUCCESS,
-                h1->get_stats(h, nullptr, entry.first.data(),
+                h1->get_stats(h, nullptr, entry.first.empty() ?
+                                            nullptr : entry.first.data(),
                               entry.first.size(), add_stats),
                 (std::string("Failed to get stats: ") + entry.first).c_str());
 
+        std::unordered_set<std::string> accountedFor;
         for (const auto& key : entry.second) {
             auto iter = vals.find(key);
             if (iter == vals.end()) {
@@ -6278,15 +6581,23 @@ static enum test_result test_mb19687_fixed(ENGINE_HANDLE* h,
                 fprintf(stderr, "Missing stat:  %s from stat group %s\n",
                         key.c_str(),
                         entry.first.c_str());
+            } else {
+                accountedFor.insert(key);
             }
         }
 
         if (entry.second.size() != vals.size()) {
             fprintf(stderr,
-                    "Incorrect number of stats returned for stat group %s: %lu != %lu\n",
+                    "Incorrect number of stats returned for stat group %s: %lu != %lu\n"
+                    " Unaccounted for stat keys:\n",
                     entry.first.c_str(), (unsigned long)entry.second.size(),
                     (unsigned long)vals.size());
             error = true;
+            for (const auto& statPair : vals) {
+                if (accountedFor.count(statPair.first) == 0) {
+                    fprintf(stderr, "  \"%s\",\n", statPair.first.c_str());
+                }
+            }
         }
     }
 
index 628c996..385ca71 100644 (file)
@@ -23,6 +23,7 @@
 
 #include "ep_test_apis.h"
 #include "ep_testsuite_common.h"
+#include "hlc.h"
 
 #include <platform/cb_malloc.h>
 
@@ -1537,6 +1538,202 @@ static enum test_result test_getMeta_with_item_eviction(ENGINE_HANDLE *h,
     return SUCCESS;
 }
 
+static enum test_result test_set_with_meta_and_check_drift_stats(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1) {
+    // Activate n vbuckets (vb 0 is already)
+    const int n_vbuckets = 10;
+    for (int ii = 1; ii < n_vbuckets; ii++) {
+        check(set_vbucket_state(h, h1, ii, vbucket_state_active),
+              "Failed to set vbucket state.");
+    }
+
+    // Let's make vbucket n/2 be the one who is ahead, n/3 is behind
+    const int aheadVb = n_vbuckets/2;
+    const int behindVb = n_vbuckets/3;
+    checkne(aheadVb, behindVb, "Cannot have the same VB as ahead/behind");
+
+    HLC hlc(0/*init HLC*/, 0/*ahead threshold*/, 0/*behind threshold*/);
+
+    // grab the drift behind threshold
+    uint64_t driftBehindThreshold = get_ull_stat(h, h1,
+                                                 "ep_hlc_ahead_threshold_us",
+                                                 nullptr);
+    // Create n keys
+    const int n_keys = 5;
+    for (int ii = 0 ; ii < n_vbuckets; ii++) {
+        for (int k = 0; k < n_keys; k++) {
+            std::string key = "key_" + std::to_string(k);
+            ItemMetaData itm_meta;
+            itm_meta.cas = hlc.nextHLC();
+            if (ii == aheadVb) {
+                // Push this guy *far* ahead (1 year)
+                itm_meta.cas += 3154E10;
+            } else if(ii == behindVb) {
+                // just be sure it was already greater then 1 + driftthreshold
+                checkge(itm_meta.cas, uint64_t(1) + driftBehindThreshold,
+                        "HLC was already zero");
+                // set to be way way behind...
+                itm_meta.cas = 1;
+            }
+            set_with_meta(h, h1, key.data(), key.size(), NULL, 0, ii, &itm_meta,
+                          0, false, PROTOCOL_BINARY_RAW_BYTES, true, 0);
+            checkeq(PROTOCOL_BINARY_RESPONSE_SUCCESS, last_status.load(),
+                    "Expected success");
+        }
+    }
+
+    // Bucket stats should report drift
+    checkge(get_ull_stat(h, h1, "ep_active_hlc_drift"), uint64_t(0),
+            "Expected drift above zero");
+    checkeq(uint64_t(n_keys), get_ull_stat(h, h1, "ep_active_hlc_drift_count"),
+            "Expected ahead counter to match mutations");
+
+    // Victim VBs should have exceptions
+    {
+        std::string vbAheadName = "vb_" + std::to_string(aheadVb);
+        std::string ahead_threshold_exceeded = vbAheadName + ":drift_ahead_threshold_exceeded";
+        std::string behind_threshold_exceeded = vbAheadName + ":drift_behind_threshold_exceeded";
+        std::string total_abs_drift = vbAheadName + ":total_abs_drift";
+        std::string details = "vbucket-details " + std::to_string(aheadVb);
+            checkeq(uint64_t(n_keys), get_ull_stat(h, h1, ahead_threshold_exceeded.data(), details.data()),
+                "Expected ahead threshold to match mutations");
+        checkeq(uint64_t(0), get_ull_stat(h, h1, behind_threshold_exceeded.data(), details.data()),
+                "Expected no behind exceptions");
+        checkge(get_ull_stat(h, h1, total_abs_drift.data(), details.data()), uint64_t(0),
+                "Expected some drift");
+    }
+
+    {
+        std::string vbBehindName = "vb_" + std::to_string(behindVb);
+        std::string ahead_threshold_exceeded = vbBehindName + ":drift_ahead_threshold_exceeded";
+        std::string behind_threshold_exceeded = vbBehindName + ":drift_behind_threshold_exceeded";
+        std::string total_abs_drift = vbBehindName + ":total_abs_drift";
+        std::string details = "vbucket-details " + std::to_string(behindVb);
+        checkeq(uint64_t(n_keys), get_ull_stat(h, h1, behind_threshold_exceeded.data(), details.data()),
+                "Expected behind threshold to match mutations");
+        checkeq(uint64_t(0), get_ull_stat(h, h1, ahead_threshold_exceeded.data(), details.data()),
+                "Expected no ahead exceptions");
+        checkge(get_ull_stat(h, h1, total_abs_drift.data(), details.data()), uint64_t(0),
+                "Expected some drift");
+    }
+
+
+    return SUCCESS;
+}
+
+static enum test_result test_del_with_meta_and_check_drift_stats(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1) {
+    // Activate n vbuckets (vb 0 is already)
+    const int n_vbuckets = 10;
+    for (int ii = 1; ii < n_vbuckets; ii++) {
+        check(set_vbucket_state(h, h1, ii, vbucket_state_active),
+              "Failed to set vbucket state.");
+    }
+
+    // Let's make vbucket n/2 be the one who is ahead, n/3 is behind
+    const int aheadVb = n_vbuckets/2;
+    const int behindVb = n_vbuckets/3;
+    checkne(aheadVb, behindVb, "Cannot have the same VB as ahead/behind");
+
+    HLC hlc(0/*init HLC*/, 0/*ahead threshold*/, 0/*behind threshold*/);
+
+    // grab the drift behind threshold
+    uint64_t driftBehindThreshold = get_ull_stat(h, h1,
+                                                 "ep_hlc_ahead_threshold_us",
+                                                 nullptr);
+    // Create n keys * n_vbuckets
+    const int n_keys = 5;
+    for (int ii = 0 ; ii < n_vbuckets; ii++) {
+        for (int k = 0; k < n_keys; k++) {
+            std::string key = "key_" + std::to_string(k);
+
+            // In the del_with_meta test we want to pretend a del_wm came from
+            // the past, so we want to ensure a delete doesn't get rejected
+            // by LWW conflict resolution, thus write all documents that are
+            // going to be deleted with set_with_meta, and write them way in the past.
+            // This will trigger threshold and increment drift stats... so we
+            // account for these later
+            ItemMetaData itm_meta;
+            itm_meta.cas = 1; // set to 1
+            set_with_meta(h, h1, key.data(), key.size(), NULL, 0, ii, &itm_meta,
+                          0, false, PROTOCOL_BINARY_RAW_BYTES, true, 0);
+            checkeq(PROTOCOL_BINARY_RESPONSE_SUCCESS, last_status.load(),
+                    "Expected success");
+        }
+    }
+
+    checkeq(uint64_t(0), get_ull_stat(h, h1, "ep_active_ahead_exceptions"),
+            "Expected ahead counter to match mutations");
+    checkeq(uint64_t(n_keys*n_vbuckets), get_ull_stat(h, h1, "ep_active_behind_exceptions"),
+            "Expected behind counter to match mutations");
+
+    // Del_with_meta n_keys to n_vbuckets
+    for (int ii = 0 ; ii < n_vbuckets; ii++) {
+        for (int k = 0; k < n_keys; k++) {
+            std::string key = "key_" + std::to_string(k);
+            ItemMetaData itm_meta;
+            itm_meta.cas = hlc.nextHLC();
+            if (ii == aheadVb) {
+                // Push this guy *far* ahead (1 year)
+                itm_meta.cas += 3154E10;
+            } else if(ii == behindVb) {
+                // just be sure it was already greater than 1 + driftthreshold
+                checkge(itm_meta.cas, uint64_t(1) + driftBehindThreshold,
+                        "HLC was already zero");
+                // set to be way way behind, but ahead of the documents we have set
+                itm_meta.cas = 2;
+            }
+            del_with_meta(h, h1, key.data(), key.size(), ii, &itm_meta,
+                          1, false, PROTOCOL_BINARY_RAW_BYTES);
+            checkeq(PROTOCOL_BINARY_RESPONSE_SUCCESS, last_status.load(),
+                    "Expected success");
+        }
+    }
+
+    // Bucket stats should report drift
+    checkge(get_ull_stat(h, h1, "ep_active_hlc_drift"), uint64_t(0),
+            "Expected drift above zero");
+    checkeq(uint64_t(n_keys*2), get_ull_stat(h, h1, "ep_active_hlc_drift_count"),
+            "Expected ahead counter to match mutations");
+
+    // and should report total exception of all VBs
+    checkeq(uint64_t(n_keys), get_ull_stat(h, h1, "ep_active_ahead_exceptions"),
+            "Expected ahead counter to match mutations");
+    checkeq(uint64_t(n_keys + (n_keys*n_vbuckets)), get_ull_stat(h, h1, "ep_active_behind_exceptions"),
+            "Expected behind counter to match mutations");
+
+    // Victim VBs should have exceptions
+    {
+        std::string vbAheadName = "vb_" + std::to_string(aheadVb);
+        std::string ahead_threshold_exceeded = vbAheadName + ":drift_ahead_threshold_exceeded";
+        std::string behind_threshold_exceeded = vbAheadName + ":drift_behind_threshold_exceeded";
+        std::string total_abs_drift = vbAheadName + ":total_abs_drift";
+        std::string details = "vbucket-details " + std::to_string(aheadVb);
+
+        checkeq(uint64_t(n_keys),
+                get_ull_stat(h, h1, ahead_threshold_exceeded.data(), details.data()),
+                "Expected ahead threshold to match mutations");
+        checkge(get_ull_stat(h, h1, total_abs_drift.data(), details.data()), uint64_t(0),
+                "Expected some drift");
+    }
+
+    {
+        std::string vbBehindName = "vb_" + std::to_string(behindVb);
+        std::string ahead_threshold_exceeded = vbBehindName + ":drift_ahead_threshold_exceeded";
+        std::string behind_threshold_exceeded = vbBehindName + ":drift_behind_threshold_exceeded";
+        std::string total_abs_drift = vbBehindName + ":total_abs_drift";
+        std::string details = "vbucket-details " + std::to_string(behindVb);
+
+        // *2 behind due to the initial set_with_meta
+        checkeq(uint64_t(n_keys*2), get_ull_stat(h, h1, behind_threshold_exceeded.data(), details.data()),
+                "Expected behind threshold to match mutations");
+        checkeq(uint64_t(0), get_ull_stat(h, h1, ahead_threshold_exceeded.data(), details.data()),
+                "Expected no ahead exceptions");
+        checkge(get_ull_stat(h, h1, total_abs_drift.data(), details.data()), uint64_t(0),
+                "Expected some drift");
+    }
+
+
+    return SUCCESS;
+}
 
 // Test manifest //////////////////////////////////////////////////////////////
 
@@ -1619,5 +1816,14 @@ BaseTestCase testsuite_testcases[] = {
                  test_getMeta_with_item_eviction, test_setup, teardown,
                  "item_eviction_policy=full_eviction", prepare, cleanup),
 
+        TestCase("test set_with_meta and drift stats",
+                 test_set_with_meta_and_check_drift_stats, test_setup,
+                 teardown, "hlc_ahead_threshold_us=5000000;hlc_behind_threshold_us=0;conflict_resolution_type=lww",
+                 prepare, cleanup),
+        TestCase("test del_with_meta and drift stats",
+                 test_del_with_meta_and_check_drift_stats, test_setup,
+                 teardown, "hlc_ahead_threshold_us=0;hlc_behind_threshold_us=5000000;conflict_resolution_type=lww",
+                 prepare, cleanup),
+
         TestCase(NULL, NULL, NULL, NULL, NULL, prepare, cleanup)
 };
index 9a34783..bfa6595 100644 (file)
@@ -69,7 +69,7 @@ protected:
                               checkpoint_config, /*kvshard*/NULL,
                               /*lastSeqno*/1000, /*lastSnapStart*/0,
                               /*lastSnapEnd*/0, /*table*/NULL,
-                              callback)) {
+                              callback, config)) {
         createManager();
     }
 
@@ -83,7 +83,7 @@ protected:
 
     EPStats global_stats;
     CheckpointConfig checkpoint_config;
-
+    Configuration config;
     std::shared_ptr<Callback<uint16_t> > callback;
     RCPtr<VBucket> vbucket;
     std::unique_ptr<CheckpointManager> manager;
@@ -193,7 +193,7 @@ TEST_F(CheckpointTest, basic_chk_test) {
     std::shared_ptr<Callback<uint16_t> > cb(new DummyCB());
     RCPtr<VBucket> vbucket(new VBucket(0, vbucket_state_active, global_stats,
                                        checkpoint_config, NULL, 0, 0, 0, NULL,
-                                       cb));
+                                       cb, config));
 
     CheckpointManager *checkpoint_manager = new CheckpointManager(global_stats, 0,
                                                                   checkpoint_config,
@@ -291,7 +291,7 @@ TEST_F(CheckpointTest, reset_checkpoint_id) {
     std::shared_ptr<Callback<uint16_t> > cb(new DummyCB());
     RCPtr<VBucket> vbucket(new VBucket(0, vbucket_state_active, global_stats,
                                        checkpoint_config, NULL, 0, 0, 0, NULL,
-                                       cb));
+                                       cb, config));
     CheckpointManager *manager =
         new CheckpointManager(global_stats, 0, checkpoint_config, 1, 0, 0, cb);
 
index cb040b4..1e8666a 100644 (file)
@@ -129,8 +129,8 @@ protected:
 
         /* Create the vbucket */
         std::shared_ptr<Callback<uint16_t> > cb(new DummyCB());
-        vbucket.reset(new VBucket(0, vbucket_state_active, stats, config,
-                                  nullptr, 0, 0, 0, nullptr, cb));
+        vbucket.reset(new VBucket(0, vbucket_state_active, stats, chkConfig,
+                                  nullptr, 0, 0, 0, nullptr, cb, config));
     }
 
     static void TearDownTestCase() {
@@ -140,12 +140,14 @@ protected:
     }
 
     static EPStats stats;
-    static CheckpointConfig config;
+    static CheckpointConfig chkConfig;
+    static Configuration config;
     static std::unique_ptr<VBucket> vbucket;
 };
 
 EPStats DefragmenterBenchmarkTest::stats;
-CheckpointConfig DefragmenterBenchmarkTest::config;
+CheckpointConfig DefragmenterBenchmarkTest::chkConfig;
+Configuration DefragmenterBenchmarkTest::config;
 std::unique_ptr<VBucket> DefragmenterBenchmarkTest::vbucket;
 
 
@@ -246,9 +248,10 @@ TEST_F(DefragmenterTest, DISABLED_MappedMemory) {
     /* Create the vbucket */
     std::shared_ptr<Callback<uint16_t> > cb(new DummyCB());
     EPStats stats;
-    CheckpointConfig config;
-    VBucket vbucket(0, vbucket_state_active, stats, config, nullptr, 0, 0, 0,
-                    nullptr, cb);
+    CheckpointConfig chkConfig;
+    Configuration config;
+    VBucket vbucket(0, vbucket_state_active, stats, chkConfig, nullptr, 0, 0, 0,
+                    nullptr, cb, config);
 
     // 1. Create a number of small documents. Doesn't really matter that
     //    they are small, main thing is we create enough to span multiple