MB-21154: set_param support for drift thresholds 25/68325/11
authorJim Walker <jim@couchbase.com>
Tue, 4 Oct 2016 14:56:20 +0000 (15:56 +0100)
committerDave Rigby <daver@couchbase.com>
Wed, 12 Oct 2016 07:43:07 +0000 (07:43 +0000)
Enable the set_param command to change the ahead and behind drift
thresholds, enabling changes without a bucket restart.

cbepctl is updated so that it can send drift threshold changes,
but they're not publicised.

> cbepctl ... set vbucket_param hlc_drift_ahead_threshold_us 1

Change-Id: I4973d0c36bfa03ff33e50924b7c10434675d90da
Reviewed-on: http://review.couchbase.org/68325
Reviewed-by: Dave Rigby <daver@couchbase.com>
Tested-by: buildbot <build@couchbase.com>
management/cbepctl
management/memcacheConstants.py
src/ep_engine.cc
src/hlc.h
src/vbucket.h
src/vbucketmap.cc
src/vbucketmap.h
tests/ep_testsuite_xdcr.cc

index cbca983..5e6b818 100755 (executable)
@@ -51,6 +51,8 @@ def set_param(mc, type, key, val):
         engine_param = memcacheConstants.ENGINE_PARAM_TAP
     elif type == 'dcp_param':
         engine_param = memcacheConstants.ENGINE_PARAM_DCP
+    elif type == 'vbucket_param':
+        engine_param = memcacheConstants.ENGINE_PARAM_VBUCKET
     else:
         print 'Error: Bad parameter %s' % type
 
index 527e464..2f60009 100644 (file)
@@ -106,6 +106,7 @@ ENGINE_PARAM_FLUSH      = 1
 ENGINE_PARAM_TAP        = 2
 ENGINE_PARAM_CHECKPOINT = 3
 ENGINE_PARAM_DCP        = 4
+ENGINE_PARAM_VBUCKET    = 5
 
 
 COMMAND_NAMES = dict(((globals()[k], k) for k in globals() if k.startswith("CMD_")))
index 3bc4801..17943fa 100644 (file)
@@ -606,6 +606,32 @@ extern "C" {
         return rv;
     }
 
+    static protocol_binary_response_status setVbucketParam(
+                                                    EventuallyPersistentEngine *e,
+                                                    const char *keyz,
+                                                    const char *valz,
+                                                    std::string& msg) {
+        protocol_binary_response_status rv = PROTOCOL_BINARY_RESPONSE_SUCCESS;
+        try {
+            if (strcmp(keyz, "hlc_drift_ahead_threshold_us") == 0) {
+                uint64_t v = std::strtoull(valz, nullptr, 10);
+                checkNumeric(valz);
+                e->getConfiguration().setHlcAheadThresholdUs(v);
+            } else if (strcmp(keyz, "hlc_drift_behind_threshold_us") == 0) {
+                uint64_t v = std::strtoull(valz, nullptr, 10);
+                checkNumeric(valz);
+                e->getConfiguration().setHlcBehindThresholdUs(v);
+            } else {
+                msg = "Unknown config param";
+                rv = PROTOCOL_BINARY_RESPONSE_KEY_ENOENT;
+            }
+        } catch (std::runtime_error& ex) {
+            msg = "Value out of range.";
+            rv = PROTOCOL_BINARY_RESPONSE_EINVAL;
+        }
+        return rv;
+    }
+
     static protocol_binary_response_status evictKey(
                                                  EventuallyPersistentEngine *e,
                                                  protocol_binary_request_header
@@ -836,6 +862,9 @@ extern "C" {
         case protocol_binary_engine_param_dcp:
             rv = setDcpParam(e, keyz, valz, msg);
             break;
+        case protocol_binary_engine_param_vbucket:
+            rv = setVbucketParam(e, keyz, valz, msg);
+            break;
         default:
             rv = PROTOCOL_BINARY_RESPONSE_UNKNOWN_COMMAND;
         }
index 293617f..0264e52 100644 (file)
--- a/src/hlc.h
+++ b/src/hlc.h
@@ -122,6 +122,14 @@ public:
         return {driftAheadExceeded, driftBehindExceeded};
     }
 
+    void setDriftAheadThreshold(uint64_t threshold) {
+        driftAheadThreshold = threshold;
+    }
+
+    void setDriftBehindThreshold(uint64_t threshold) {
+        driftBehindThreshold = threshold;
+    }
+
     void addStats(const std::string& prefix, ADD_STAT add_stat, const void *c) const {
         add_prefixed_stat(prefix.data(), "max_cas", getMaxHLC(), add_stat, c);
         add_prefixed_stat(prefix.data(), "total_abs_drift", cummulativeDrift.load(), add_stat, c);
index 286918b..9fd130f 100644 (file)
@@ -256,6 +256,14 @@ public:
         return hlc.getDriftExceptionCounters();
     }
 
+    void setHLCDriftAheadThreshold(uint64_t threshold) {
+        hlc.setDriftAheadThreshold(threshold);
+    }
+
+    void setHLCDriftBehindThreshold(uint64_t threshold) {
+        hlc.setDriftBehindThreshold(threshold);
+    }
+
     bool isTakeoverBackedUp() {
         return takeover_backed_up.load();
     }
index 312bd1a..fa5593d 100644 (file)
@@ -193,3 +193,30 @@ KVShard* VBucketMap::getShard(KVShard::id_type shardId) const {
 size_t VBucketMap::getNumShards() const {
     return shards.size();
 }
+
+void VBucketMap::setHLCDriftAheadThreshold(uint64_t threshold) {
+    for (id_type id = 0; id < size; id++) {
+        auto vb = getBucket(id);
+        if (vb) {
+            vb->setHLCDriftAheadThreshold(threshold);
+        }
+    }
+}
+
+void VBucketMap::setHLCDriftBehindThreshold(uint64_t threshold) {
+    for (id_type id = 0; id < size; id++) {
+        auto vb = getBucket(id);
+        if (vb) {
+            vb->setHLCDriftBehindThreshold(threshold);
+        }
+    }
+}
+
+void VBucketMap::VBucketConfigChangeListener::sizeValueChanged(const std::string &key,
+                                                   size_t value) {
+    if (key == "hlc_drift_ahead_threshold_us") {
+        map.setHLCDriftAheadThreshold(value);
+    } else if (key == "hlc_drift_behind_threshold_us") {
+        map.setHLCDriftBehindThreshold(value);
+    }
+}
\ No newline at end of file
index 2794ee3..f7d7fe9 100644 (file)
@@ -33,6 +33,17 @@ class VBucketMap {
 friend class EventuallyPersistentStore;
 friend class Warmup;
 
+    class VBucketConfigChangeListener : public ValueChangedListener {
+    public:
+        VBucketConfigChangeListener(VBucketMap& vbucketMap)
+            : map(vbucketMap) {}
+
+        void sizeValueChanged(const std::string &key, size_t value) override;
+
+    private:
+        VBucketMap& map;
+    };
+
 public:
 
     // This class uses the same id_type as VBucket
@@ -63,6 +74,8 @@ public:
     KVShard* getShardByVbId(id_type id) const;
     KVShard* getShard(KVShard::id_type shardId) const;
     size_t getNumShards() const;
+    void setHLCDriftAheadThreshold(uint64_t threshold);
+    void setHLCDriftBehindThreshold(uint64_t threshold);
 
 private:
 
index 385ca71..97753f9 100644 (file)
@@ -1735,6 +1735,30 @@ static enum test_result test_del_with_meta_and_check_drift_stats(ENGINE_HANDLE *
     return SUCCESS;
 }
 
+static enum test_result test_setting_drift_threshold(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1) {
+
+    std::vector<std::pair<std::string, std::string> > configData =
+        {{"ep_hlc_ahead_threshold_us", "hlc_drift_ahead_threshold_us"},
+         {"ep_hlc_behind_threshold_us", "hlc_drift_behind_threshold_us"}};
+
+    std::vector<std::pair<std::string, uint64_t> > values =
+        {{"0", 0}, {"1", 1}, {"-1", -1}, {"-0", 0},
+         {"18446744073709551615", 18446744073709551615ull}};
+
+    for (auto data : values) {
+        for (auto conf : configData) {
+            check(set_param(h, h1, protocol_binary_engine_param_vbucket,
+                    conf.second.data(), data.first.data()),
+                "Expected set_param success");
+
+            checkeq(data.second,
+                    get_ull_stat(h, h1, conf.first.data(), nullptr),
+                    "Expected the stat to change to the new value");
+        }
+    }
+    return SUCCESS;
+}
+
 // Test manifest //////////////////////////////////////////////////////////////
 
 const char *default_dbname = "./ep_testsuite_xdcr";
@@ -1824,6 +1848,10 @@ BaseTestCase testsuite_testcases[] = {
                  test_del_with_meta_and_check_drift_stats, test_setup,
                  teardown, "hlc_ahead_threshold_us=0;hlc_behind_threshold_us=5000000;conflict_resolution_type=lww",
                  prepare, cleanup),
+        TestCase("test setting drift threshold",
+                 test_setting_drift_threshold, test_setup,
+                 teardown, nullptr,
+                 prepare, cleanup),
 
         TestCase(NULL, NULL, NULL, NULL, NULL, prepare, cleanup)
 };