MB-17517: check for invalid CAS value in [set/delete]WithMeta 10/58910/3
authorSriram Ganesan <sriram@couchbase.com>
Thu, 21 Jan 2016 20:00:36 +0000 (12:00 -0800)
committerChiyoung Seo <chiyoung@couchbase.com>
Sat, 23 Jan 2016 04:42:33 +0000 (04:42 +0000)
In a [set/delete]WithMeta call from either XDCR or from DCP,
a corrupt CAS value for the incoming item should return an
error

Change-Id: Id87627ae35ef711de4704a960b3aa7d1e9b9a48b
Reviewed-on: http://review.couchbase.org/58910
Tested-by: buildbot <build@couchbase.com>
Reviewed-by: Chiyoung Seo <chiyoung@couchbase.com>
src/ep.cc
tests/ep_testsuite.cc

index 3ed118b..7beac6b 100644 (file)
--- a/src/ep.cc
+++ b/src/ep.cc
@@ -923,6 +923,13 @@ ENGINE_ERROR_CODE EventuallyPersistentStore::replace(const Item &itm,
     }
 }
 
+static bool isValidCas(const uint64_t &itmCas) {
+    if (itmCas == 0 || itmCas == static_cast<uint64_t>(-1)) {
+        return false;
+    }
+    return true;
+}
+
 ENGINE_ERROR_CODE EventuallyPersistentStore::addTAPBackfillItem(
                                                         const Item &itm,
                                                         uint8_t nru,
@@ -937,6 +944,11 @@ ENGINE_ERROR_CODE EventuallyPersistentStore::addTAPBackfillItem(
         return ENGINE_NOT_MY_VBUCKET;
     }
 
+    //check for the incoming item's CAS validity
+    if (!isValidCas(itm.getCas())) {
+        return ENGINE_KEY_EEXISTS;
+    }
+
     int bucket_num(0);
     LockHolder lh = vb->ht.getLockedBucket(itm.getKey(), &bucket_num);
     StoredValue *v = vb->ht.unlocked_find(itm.getKey(), bucket_num, true,
@@ -2063,6 +2075,11 @@ ENGINE_ERROR_CODE EventuallyPersistentStore::setWithMeta(
         }
     }
 
+    //check for the incoming item's CAS validity
+    if (!isValidCas(itm.getCas())) {
+        return ENGINE_KEY_EEXISTS;
+    }
+
     int bucket_num(0);
     LockHolder lh = vb->ht.getLockedBucket(itm.getKey(), &bucket_num);
     StoredValue *v = vb->ht.unlocked_find(itm.getKey(), bucket_num, true,
@@ -2710,6 +2727,11 @@ ENGINE_ERROR_CODE EventuallyPersistentStore::deleteWithMeta(
         }
     }
 
+    //check for the incoming item's CAS validity
+    if (!isValidCas(itemMeta->cas)) {
+        return ENGINE_KEY_EEXISTS;
+    }
+
     int bucket_num(0);
     LockHolder lh = vb->ht.getLockedBucket(key, &bucket_num);
     StoredValue *v = vb->ht.unlocked_find(key, bucket_num, true, false);
index c9eb5b0..161db08 100644 (file)
@@ -5711,7 +5711,7 @@ static enum test_result test_dcp_consumer_mutate(ENGINE_HANDLE *h, ENGINE_HANDLE
     char *data = static_cast<char *>(malloc(dataLen));
     memset(data, 'x', dataLen);
 
-    uint8_t cas = 0;
+    uint8_t cas = 0x1;
     uint16_t vbucket = 0;
     uint8_t datatype = 1;
     uint64_t bySeqno = 10;
@@ -5786,7 +5786,7 @@ static enum test_result test_dcp_consumer_mutate_with_time_sync(
     char *data = static_cast<char *>(malloc(dataLen));
     memset(data, 'x', dataLen);
 
-    uint8_t cas = 0;
+    uint8_t cas = 0x1;
     uint16_t vbucket = 0;
     uint8_t datatype = 1;
     uint64_t bySeqno = 10;
@@ -5927,7 +5927,7 @@ static enum test_result test_dcp_consumer_delete_with_time_sync(
 
     const void *cookie = testHarness.create_cookie();
     uint32_t opaque = 0;
-    uint8_t cas = 0;
+    uint8_t cas = 0x1;
     uint16_t vbucket = 0;
     uint32_t flags = 0;
     uint64_t bySeqno = 10;
@@ -6034,7 +6034,7 @@ static void dcp_stream_to_replica(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1,
                                   uint64_t start, uint64_t end,
                                   uint64_t snap_start_seqno,
                                   uint64_t snap_end_seqno,
-                                  uint8_t cas = 0, uint8_t datatype = 1,
+                                  uint8_t cas = 0x1, uint8_t datatype = 1,
                                   uint32_t exprtime = 0, uint32_t lockTime = 0,
                                   uint64_t revSeqno = 0)
 {
@@ -6214,7 +6214,7 @@ static enum test_result test_tap_rcvr_mutate(ENGINE_HANDLE *h, ENGINE_HANDLE_V1
         char *data = static_cast<char *>(malloc(i));
         memset(data, 'x', i);
         check(h1->tap_notify(h, NULL, eng_specific, sizeof(eng_specific),
-                             1, 0, TAP_MUTATION, 1, "key", 3, 828, 0, 0,
+                             1, 0, TAP_MUTATION, 1, "key", 3, 828, 0, 1,
                              PROTOCOL_BINARY_RAW_BYTES,
                              data, i, 0) == ENGINE_SUCCESS,
               "Failed tap notify.");
@@ -6232,12 +6232,12 @@ static enum test_result test_tap_rcvr_checkpoint(ENGINE_HANDLE *h, ENGINE_HANDLE
     for (int i = 1; i < 10; ++i) {
         data = '0' + i;
         check(h1->tap_notify(h, NULL, eng_specific, sizeof(eng_specific),
-                             1, 0, TAP_CHECKPOINT_START, 1, "", 0, 828, 0, 0,
+                             1, 0, TAP_CHECKPOINT_START, 1, "", 0, 828, 0, 1,
                              PROTOCOL_BINARY_RAW_BYTES,
                              &data, 1, 1) == ENGINE_SUCCESS,
               "Failed tap notify.");
         check(h1->tap_notify(h, NULL, eng_specific, sizeof(eng_specific),
-                             1, 0, TAP_CHECKPOINT_END, 1, "", 0, 828, 0, 0,
+                             1, 0, TAP_CHECKPOINT_END, 1, "", 0, 828, 0, 1,
                              PROTOCOL_BINARY_RAW_BYTES,
                              &data, 1, 1) == ENGINE_SUCCESS,
               "Failed tap notify.");
@@ -6253,7 +6253,7 @@ static enum test_result test_tap_rcvr_set_vbstate(ENGINE_HANDLE *h, ENGINE_HANDL
     // Get the vbucket UUID before vbucket takeover.
     uint64_t vb_uuid = get_ull_stat(h, h1, "vb_1:0:id", "failovers");
     check(h1->tap_notify(h, NULL, eng_specific, sizeof(vb_state),
-                         1, 0, TAP_VBUCKET_SET, 1, "", 0, 828, 0, 0,
+                         1, 0, TAP_VBUCKET_SET, 1, "", 0, 828, 0, 1,
                          PROTOCOL_BINARY_RAW_BYTES,
                          "", 0, 1) == ENGINE_SUCCESS,
           "Failed tap notify.");
@@ -6267,7 +6267,7 @@ static enum test_result test_tap_rcvr_mutate_dead(ENGINE_HANDLE *h, ENGINE_HANDL
     char eng_specific[9];
     memset(eng_specific, 0, sizeof(eng_specific));
     check(h1->tap_notify(h, NULL, eng_specific, sizeof(eng_specific),
-                         1, 0, TAP_MUTATION, 1, "key", 3, 828, 0, 0,
+                         1, 0, TAP_MUTATION, 1, "key", 3, 828, 0, 1,
                          PROTOCOL_BINARY_RAW_BYTES,
                          "data", 4, 1) == ENGINE_NOT_MY_VBUCKET,
           "Expected not my vbucket.");
@@ -6279,7 +6279,7 @@ static enum test_result test_tap_rcvr_mutate_pending(ENGINE_HANDLE *h, ENGINE_HA
     char eng_specific[9];
     memset(eng_specific, 0, sizeof(eng_specific));
     check(h1->tap_notify(h, NULL, eng_specific, sizeof(eng_specific),
-                         1, 0, TAP_MUTATION, 1, "key", 3, 828, 0, 0,
+                         1, 0, TAP_MUTATION, 1, "key", 3, 828, 0, 1,
                          PROTOCOL_BINARY_RAW_BYTES,
                          "data", 4, 1) == ENGINE_SUCCESS,
           "Expected expected success.");
@@ -6291,7 +6291,7 @@ static enum test_result test_tap_rcvr_mutate_replica(ENGINE_HANDLE *h, ENGINE_HA
     char eng_specific[9];
     memset(eng_specific, 0, sizeof(eng_specific));
     check(h1->tap_notify(h, NULL, eng_specific, sizeof(eng_specific),
-                         1, 0, TAP_MUTATION, 1, "key", 3, 828, 0, 0,
+                         1, 0, TAP_MUTATION, 1, "key", 3, 828, 0, 1,
                          PROTOCOL_BINARY_RAW_BYTES,
                          "data", 4, 1) == ENGINE_SUCCESS,
           "Expected expected success.");
@@ -6316,7 +6316,7 @@ static enum test_result test_tap_rcvr_mutate_replica_locked(ENGINE_HANDLE *h,
     char eng_specific[9];
     memset(eng_specific, 0, sizeof(eng_specific));
     check(h1->tap_notify(h, NULL, eng_specific, sizeof(eng_specific),
-                         1, 0, TAP_MUTATION, 1, "key", 3, 828, 0, 0,
+                         1, 0, TAP_MUTATION, 1, "key", 3, 828, 0, 1,
                          PROTOCOL_BINARY_RAW_BYTES,
                          "data", 4, 0) == ENGINE_SUCCESS,
           "Expected expected success.");
@@ -6327,7 +6327,7 @@ static enum test_result test_tap_rcvr_delete(ENGINE_HANDLE *h, ENGINE_HANDLE_V1
     char* eng_specific[8];
     memset(eng_specific, 0, sizeof(eng_specific));
     check(h1->tap_notify(h, NULL, eng_specific, sizeof(eng_specific),
-                         1, 0, TAP_DELETION, 0, "key", 3, 0, 0, 0,
+                         1, 0, TAP_DELETION, 0, "key", 3, 0, 0, 1,
                          PROTOCOL_BINARY_RAW_BYTES,
                          0, 0, 0) == ENGINE_SUCCESS,
           "Failed tap notify.");
@@ -6338,7 +6338,7 @@ static enum test_result test_tap_rcvr_delete_dead(ENGINE_HANDLE *h, ENGINE_HANDL
     char* eng_specific[8];
     memset(eng_specific, 0, sizeof(eng_specific));
     check(h1->tap_notify(h, NULL, eng_specific, sizeof(eng_specific),
-                         1, 0, TAP_DELETION, 1, "key", 3, 0, 0, 0,
+                         1, 0, TAP_DELETION, 1, "key", 3, 0, 0, 1,
                          PROTOCOL_BINARY_RAW_BYTES,
                          NULL, 0, 1) == ENGINE_NOT_MY_VBUCKET,
           "Expected not my vbucket.");
@@ -6351,7 +6351,7 @@ static enum test_result test_tap_rcvr_delete_pending(ENGINE_HANDLE *h, ENGINE_HA
     char* eng_specific[8];
     memset(eng_specific, 0, sizeof(eng_specific));
     check(h1->tap_notify(h, NULL, eng_specific, sizeof(eng_specific),
-                         1, 0, TAP_DELETION, 1, "key", 3, 0, 0, 0,
+                         1, 0, TAP_DELETION, 1, "key", 3, 0, 0, 1,
                          PROTOCOL_BINARY_RAW_BYTES,
                          NULL, 0, 1) == ENGINE_SUCCESS,
           "Expected expected success.");
@@ -6364,7 +6364,7 @@ static enum test_result test_tap_rcvr_delete_replica(ENGINE_HANDLE *h, ENGINE_HA
     char* eng_specific[8];
     memset(eng_specific, 0, sizeof(eng_specific));
     check(h1->tap_notify(h, NULL, eng_specific, sizeof(eng_specific),
-                         1, 0, TAP_DELETION, 1, "key", 3, 0, 0, 0,
+                         1, 0, TAP_DELETION, 1, "key", 3, 0, 0, 1,
                          PROTOCOL_BINARY_RAW_BYTES,
                          NULL, 0, 1) == ENGINE_SUCCESS,
           "Expected expected success.");
@@ -6387,7 +6387,7 @@ static enum test_result test_tap_rcvr_delete_replica_locked(ENGINE_HANDLE *h,
     char* eng_specific[8];
     memset(eng_specific, 0, sizeof(eng_specific));
     check(h1->tap_notify(h, NULL, eng_specific, sizeof(eng_specific),
-                         1, 0, TAP_DELETION, 1, "key", 3, 0, 0, 0,
+                         1, 0, TAP_DELETION, 1, "key", 3, 0, 0, 1,
                          PROTOCOL_BINARY_RAW_BYTES,
                          NULL, 0, 0) == ENGINE_SUCCESS,
           "Expected expected success.");
@@ -8120,7 +8120,7 @@ static enum test_result test_datatype_with_unknown_command(ENGINE_HANDLE *h,
 
     ItemMetaData itm_meta;
     itm_meta.revSeqno = 10;
-    itm_meta.cas = 0;
+    itm_meta.cas = 0x1;
     itm_meta.exptime = 0;
     itm_meta.flags = 0;
 
@@ -10596,6 +10596,7 @@ static enum test_result test_delete_with_meta_race_with_delete(ENGINE_HANDLE *h,
     uint16_t keylen2 = (uint16_t)strlen(key2);
     item *i = NULL;
     ItemMetaData itm_meta;
+    itm_meta.cas = 0x1;
     // check the stat
     size_t temp = get_int_stat(h, h1, "ep_num_ops_del_meta");
     check(temp == 0, "Expect zero ops");