MB-21190: cbepctl support to change max_cas 17/68517/8
authorJim Walker <jim@couchbase.com>
Fri, 7 Oct 2016 10:37:48 +0000 (12:37 +0200)
committerDave Rigby <daver@couchbase.com>
Wed, 12 Oct 2016 07:43:15 +0000 (07:43 +0000)
If the max_cas of a vbucket is forced forward by a
peer with a 'bad' clock, there are limited recovery
options. Allowing the max_cas to be "reset" could
be useful to recover from such a problem.

Change-Id: I9235520283ee1cd0d5b49820190a9eed3daac3c2
Reviewed-on: http://review.couchbase.org/68517
Reviewed-by: Dave Rigby <daver@couchbase.com>
Tested-by: buildbot <build@couchbase.com>
management/cbepctl
management/mc_bin_client.py
src/ep.cc
src/ep.h
src/ep_engine.cc
src/hlc.h
src/vbucket.h
tests/ep_test_apis.cc
tests/ep_test_apis.h
tests/ep_testsuite_basic.cc

index 5e6b818..114803f 100755 (executable)
@@ -41,7 +41,7 @@ def cmd(f):
     return g
 
 @cmd
-def set_param(mc, type, key, val):
+def set_param(mc, type, key, val, vbucket):
     engine_param = None
     if type == 'checkpoint_param':
         engine_param = memcacheConstants.ENGINE_PARAM_CHECKPOINT
@@ -75,7 +75,7 @@ def set_param(mc, type, key, val):
             val = str(int(float(_x_)*(_quota_)/100))
 
     try:
-        mc.set_param(key, val, engine_param)
+        mc.set_param(int(vbucket), key, val, engine_param)
         print 'set %s to %s' %(key, val)
     except mc_bin_client.MemcachedError, error:
         print 'Error: %s' % error.msg
@@ -250,11 +250,16 @@ Available params for "set":
     dcp_consumer_process_buffered_messages_batch_size - The number of items the
                                                         DCP processor will consume
                                                         in a single batch.
+  Available params for "set vbucket_param":
+    max_cas - Change the max_cas of a vbucket. The value and vbucket are specified as decimal
+              integers. The new-value is interpretted as an unsigned 64-bit integer.
+
+              cbepctl host:port -b default set vbucket_param max_cas <new-value> <vbucket-id>
 
     """)
 
     c.addCommand('drain', drain, "drain")
-    c.addCommand('set', set_param, 'set type param value')
+    c.addCommand('set', set_param, 'set type param value vbucket')
     c.addCommand('start', start, 'start')
     c.addCommand('stop', stop, 'stop')
     c.addFlag('-a', 'allBuckets', 'iterate over all buckets (requires admin u/p)')
index 6d7526c..1411e95 100644 (file)
@@ -276,8 +276,9 @@ class MemcachedClient(object):
     def start_persistence(self):
         return self._doCmd(memcacheConstants.CMD_START_PERSISTENCE, '', '')
 
-    def set_param(self, key, val, type):
+    def set_param(self, vbucket, key, val, type):
         print "setting param:", key, val
+        self.vbucketId = vbucket
         type = struct.pack(memcacheConstants.SET_PARAM_FMT, type)
         return self._doCmd(memcacheConstants.CMD_SET_PARAM, key, val, type)
 
index 922f1ce..f2bb606 100644 (file)
--- a/src/ep.cc
+++ b/src/ep.cc
@@ -4147,6 +4147,16 @@ void EventuallyPersistentStore::setCursorDroppingLowerUpperThresholds(
                     ((double)(config.getCursorDroppingUpperMark()) / 100)));
 }
 
+ENGINE_ERROR_CODE EventuallyPersistentStore::forceMaxCas(uint16_t vbucket,
+                                                         uint64_t cas) {
+    RCPtr<VBucket> vb = vbMap.getBucket(vbucket);
+    if (vb) {
+        vb->forceMaxCas(cas);
+        return ENGINE_SUCCESS;
+    }
+    return ENGINE_NOT_MY_VBUCKET;
+}
+
 std::ostream& operator<<(std::ostream& os,
                          const EventuallyPersistentStore::Position& pos) {
     os << "vbucket:" << pos.vbucket_id;
index 6465736..b9325ee 100644 (file)
--- a/src/ep.h
+++ b/src/ep.h
@@ -894,6 +894,12 @@ public:
     //Check if there were any out-of-memory errors during warmup
     bool isWarmupOOMFailure(void);
 
+    /*
+     * Change the max_cas of the specified vbucket to cas without any
+     * care for the data or ongoing operations...
+     */
+    ENGINE_ERROR_CODE forceMaxCas(uint16_t vbucket, uint64_t cas);
+
 protected:
     // During the warmup phase we might want to enable external traffic
     // at a given point in time.. The LoadStorageKvPairCallback will be
index 17943fa..9be74fa 100644 (file)
@@ -608,6 +608,7 @@ extern "C" {
 
     static protocol_binary_response_status setVbucketParam(
                                                     EventuallyPersistentEngine *e,
+                                                    uint16_t vbucket,
                                                     const char *keyz,
                                                     const char *valz,
                                                     std::string& msg) {
@@ -621,6 +622,15 @@ extern "C" {
                 uint64_t v = std::strtoull(valz, nullptr, 10);
                 checkNumeric(valz);
                 e->getConfiguration().setHlcBehindThresholdUs(v);
+            } else if (strcmp(keyz, "max_cas") == 0) {
+                uint64_t v = std::strtoull(valz, nullptr, 10);
+                checkNumeric(valz);
+                LOG(EXTENSION_LOG_WARNING, "setVbucketParam max_cas=%" PRIu64 " "
+                                           "vbucket=%" PRIu16 "\n", v, vbucket);
+                if (e->getEpStore()->forceMaxCas(vbucket, v) != ENGINE_SUCCESS) {
+                    rv = PROTOCOL_BINARY_RESPONSE_NOT_MY_VBUCKET;
+                    msg = "Not my vbucket";
+                }
             } else {
                 msg = "Unknown config param";
                 rv = PROTOCOL_BINARY_RESPONSE_KEY_ENOENT;
@@ -816,6 +826,7 @@ extern "C" {
         size_t keylen = ntohs(req->message.header.request.keylen);
         uint8_t extlen = req->message.header.request.extlen;
         size_t vallen = ntohl(req->message.header.request.bodylen);
+        uint16_t vbucket = ntohs(req->message.header.request.vbucket);
         protocol_binary_engine_param_t paramtype =
             static_cast<protocol_binary_engine_param_t>(ntohl(req->message.body.param_type));
 
@@ -863,7 +874,7 @@ extern "C" {
             rv = setDcpParam(e, keyz, valz, msg);
             break;
         case protocol_binary_engine_param_vbucket:
-            rv = setVbucketParam(e, keyz, valz, msg);
+            rv = setVbucketParam(e, vbucket, keyz, valz, msg);
             break;
         default:
             rv = PROTOCOL_BINARY_RESPONSE_UNKNOWN_COMMAND;
index 0264e52..29b256c 100644 (file)
--- a/src/hlc.h
+++ b/src/hlc.h
@@ -108,6 +108,10 @@ public:
         atomic_setIfBigger(maxHLC, hlc);
     }
 
+    void forceMaxHLC(uint64_t hlc) {
+        maxHLC = hlc;
+    }
+
     uint64_t getMaxHLC() const {
         return maxHLC;
     }
index 9fd130f..eabbebb 100644 (file)
@@ -248,6 +248,10 @@ public:
         hlc.setMaxHLCAndTrackDrift(cas);
     }
 
+    void forceMaxCas(uint64_t cas) {
+        hlc.forceMaxHLC(cas);
+    }
+
     HLC::DriftStats getHLCDriftStats() const {
         return hlc.getDriftStats();
     }
index 18a5e19..14ef0a9 100644 (file)
@@ -687,11 +687,11 @@ protocol_binary_request_header* prepare_get_replica(ENGINE_HANDLE *h,
 }
 
 bool set_param(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1, protocol_binary_engine_param_t paramtype,
-               const char *param, const char *val) {
+               const char *param, const char *val, uint16_t vb) {
     char ext[4];
     protocol_binary_request_header *pkt;
     encodeExt(ext, static_cast<uint32_t>(paramtype));
-    pkt = createPacket(PROTOCOL_BINARY_CMD_SET_PARAM, 0, 0, ext, sizeof(protocol_binary_engine_param_t), param,
+    pkt = createPacket(PROTOCOL_BINARY_CMD_SET_PARAM, vb, 0, ext, sizeof(protocol_binary_engine_param_t), param,
                        strlen(param), val, strlen(val));
 
     if (h1->unknown_command(h, NULL, pkt, add_response) != ENGINE_SUCCESS) {
index 9ee0afb..983e083 100644 (file)
@@ -196,7 +196,7 @@ prepare_get_replica(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1,
                     vbucket_state_t state, bool makeinvalidkey = false) CB_MUST_USE_RESULT;
 
 bool set_param(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1, protocol_binary_engine_param_t paramtype,
-               const char *param, const char *val);
+               const char *param, const char *val, uint16_t vb = 0);
 bool set_vbucket_state(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1,
                        uint16_t vb, vbucket_state_t state);
 bool get_all_vb_seqnos(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1,
index a44837d..dd13dca 100644 (file)
@@ -2513,6 +2513,27 @@ static enum test_result test_CBD_152(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1) {
     return SUCCESS;
 }
 
+static enum test_result set_max_cas_mb21190(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1) {
+    uint64_t max_cas = get_ull_stat(h, h1, "vb_0:max_cas", "vbucket-details 0");
+    std::string max_cas_str = std::to_string(max_cas+1);
+    set_param(h, h1, protocol_binary_engine_param_vbucket,
+              "max_cas", max_cas_str.data(), 0);
+    checkeq(PROTOCOL_BINARY_RESPONSE_SUCCESS, last_status.load(),
+            "Failed to set_param max_cas");
+    checkeq(max_cas + 1,
+            get_ull_stat(h, h1, "vb_0:max_cas", "vbucket-details 0"),
+            "max_cas didn't change");
+    set_param(h, h1, protocol_binary_engine_param_vbucket,
+              "max_cas", max_cas_str.data(), 1);
+    checkeq(PROTOCOL_BINARY_RESPONSE_NOT_MY_VBUCKET, last_status.load(),
+            "Expected not my vbucket for vb 1");
+    set_param(h, h1, protocol_binary_engine_param_vbucket,
+              "max_cas", "JUNK", 0);
+    checkeq(PROTOCOL_BINARY_RESPONSE_EINVAL, last_status.load(),
+            "Expected EINVAL");
+    return SUCCESS;
+}
+
 ///////////////////////////////////////////////////////////////////////////////
 // Test manifest //////////////////////////////////////////////////////////////
 ///////////////////////////////////////////////////////////////////////////////
@@ -2656,6 +2677,8 @@ BaseTestCase testsuite_testcases[] = {
         TestCase("flushall params", test_CBD_152, test_setup, teardown,
                  "flushall_enabled=true;max_vbuckets=16;"
                  "ht_size=7;ht_locks=3", prepare, cleanup),
+        TestCase("set max_cas MB21190", set_max_cas_mb21190, test_setup, teardown, nullptr,
+                 prepare, cleanup),
 
         // sentinel
         TestCase(NULL, NULL, NULL, NULL, NULL, prepare, cleanup)