MB-16656: Send snapshotEnd as highSeqno for replica vb in GET_ALL_VB_SEQNOS call 89/62889/5 v3.1.5
authorManu Dhundi <manu@couchbase.com>
Fri, 15 Apr 2016 15:38:44 +0000 (08:38 -0700)
committerManu Dhundi <manu@couchbase.com>
Fri, 15 Apr 2016 16:50:27 +0000 (16:50 +0000)
For replica vbucket we must send snapshotEnd received in the last snapshotMarker
as the high seqno. Sending lastClosedChkSeqno can cause problems for view engine
which builds an index from replica vbucket.

Previously this was sent correctly in seqno stats, now adding it for
GET_ALL_VB_SEQNOS as well.

Change-Id: I58dd168f9248263172759616bc53e751b536e5e3
Reviewed-on: http://review.couchbase.org/62889
Well-Formed: buildbot <build@couchbase.com>
Reviewed-by: Chiyoung Seo <chiyoung@couchbase.com>
Tested-by: buildbot <build@couchbase.com>
src/ep_engine.cc
tests/ep_test_apis.cc
tests/ep_test_apis.h
tests/ep_testsuite.cc

index 9c2e836..d87d454 100644 (file)
@@ -5754,7 +5754,9 @@ ENGINE_ERROR_CODE EventuallyPersistentEngine::getAllVBucketSequenceNumbers(const
                 if (vb->getState() == vbucket_state_active) {
                     highSeqno = htonll(vb->getHighSeqno());
                 } else {
-                    highSeqno = htonll(vb->checkpointManager.getLastClosedChkBySeqno());
+                    uint64_t snapshot_start, snapshot_end;
+                    vb->getCurrentSnapshot(snapshot_start, snapshot_end);
+                    highSeqno = htonll(snapshot_end);
                 }
                 size_t offset = payload.size();
                 payload.resize(offset + sizeof(vbid) + sizeof(highSeqno));
index 94cc4f0..ff1913e 100644 (file)
@@ -958,6 +958,57 @@ void wait_for_persisted_value(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1,
     h1->release(h, NULL, i);
 }
 
+bool get_all_vb_seqnos(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1,
+                       vbucket_state_t state, const void *cookie) {
+    protocol_binary_request_header *pkt;
+    if (state) {
+        char ext[sizeof(vbucket_state_t)];
+        encodeExt(ext, static_cast<uint32_t>(state));
+        pkt = createPacket(PROTOCOL_BINARY_CMD_GET_ALL_VB_SEQNOS, 0, 0, ext,
+                           sizeof(vbucket_state_t));
+    } else {
+        pkt = createPacket(PROTOCOL_BINARY_CMD_GET_ALL_VB_SEQNOS);
+    }
+
+    checkeq(ENGINE_SUCCESS, h1->unknown_command(h, cookie, pkt, add_response),
+            "Error in getting all vb info");
+
+    free(pkt);
+
+    return last_status == PROTOCOL_BINARY_RESPONSE_SUCCESS;
+}
+
+void verify_all_vb_seqnos(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1,
+                          uint16_t vb_start, uint16_t vb_end) {
+    const int per_vb_resp_size = sizeof(uint16_t) + sizeof(uint64_t);
+    const int high_seqno_offset = sizeof(uint16_t);
+
+    std::string seqno_body(last_body, last_bodylen);
+
+    /* Check if the total response length is as expected. We expect 10 bytes
+       (2 for vb_id + 8 for seqno) */
+    checkeq((vb_end - vb_start + 1) * per_vb_resp_size,
+            static_cast<int>(seqno_body.size()), "Failed to get all vb info.");
+    /* Check if the contents are correct */
+    for (uint16_t i = 0; i < (vb_end - vb_start + 1); i++) {
+        /* Check for correct vb_id */
+        checkeq(static_cast<const uint16_t>(vb_start + i),
+                ntohs(*(reinterpret_cast<const uint16_t*>(seqno_body.data() +
+                                                          per_vb_resp_size*i))),
+                "vb_id mismatch");
+        /* Check for correct high_seqno */
+        std::stringstream vb_stat_seqno;
+        vb_stat_seqno << "vb_" << (vb_start + i) << ":high_seqno";
+        uint64_t high_seqno_vb =
+              get_ull_stat(h, h1, vb_stat_seqno.str().c_str(), "vbucket-seqno");
+        checkeq(high_seqno_vb,
+                ntohll(*(reinterpret_cast<const uint64_t*>(seqno_body.data() +
+                                                           per_vb_resp_size*i +
+                                                           high_seqno_offset))),
+                "high_seqno mismatch");
+    }
+}
+
 void dcp_step(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1, const void* cookie) {
     struct dcp_message_producers* producers = get_dcp_producers();
     ENGINE_ERROR_CODE err = h1->dcp.step(h, cookie, producers);
index 7c806d6..c80257a 100644 (file)
 #include "ep-engine/command_ids.h"
 #include "item.h"
 
+extern "C" bool abort_msg(const char *expr, const char *msg, int line);
+
+template <typename T>
+static void checkeqfn(T exp, T got, const char *msg, const char *file, const int linenum) {
+    if (exp != got) {
+        std::stringstream ss;
+        ss << "Expected `" << exp << "', got `" << got << "' - " << msg;
+        abort_msg(ss.str().c_str(), file, linenum);
+    }
+}
+
+#define checkeq(a, b, c) checkeqfn(a, b, c, __FILE__, __LINE__)
+
 #ifdef __cplusplus
 extern "C" {
 #endif
@@ -192,6 +205,11 @@ void wait_for_persisted_value(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1,
 void wait_for_memory_usage_below(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1,
                                  int mem_threshold);
 
+bool get_all_vb_seqnos(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1,
+                       vbucket_state_t state, const void *cookie);
+void verify_all_vb_seqnos(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1,
+                          uint16_t vb_start, uint16_t vb_end);
+
 // Tap Operations
 void changeVBFilter(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1, std::string name,
                     std::map<uint16_t, uint64_t> &filtermap);
index fc3ccc9..b106ad1 100644 (file)
 // away ;)
 typedef void (*UNLOCK_COOKIE_T)(const void *cookie);
 
-extern "C" bool abort_msg(const char *expr, const char *msg, int line);
-
-template <typename T>
-static void checkeqfn(T exp, T got, const char *msg, const char *file, const int linenum) {
-    if (exp != got) {
-        std::stringstream ss;
-        ss << "Expected `" << exp << "', got `" << got << "' - " << msg;
-        abort_msg(ss.str().c_str(), file, linenum);
-    }
-}
-
-#define checkeq(a, b, c) checkeqfn(a, b, c, __FILE__, __LINE__)
-
 extern "C" {
 
 #define check(expr, msg) \
@@ -11558,6 +11545,70 @@ static enum test_result test_failover_log_behavior(ENGINE_HANDLE *h,
     return SUCCESS;
 }
 
+static enum test_result test_get_all_vb_seqnos(ENGINE_HANDLE *h,
+                                               ENGINE_HANDLE_V1 *h1) {
+    const int num_items = 5;
+
+    /* Replica vbucket 0; snapshot 0 to 10, but write just 1 item */
+    const int rep_vb_num = 0;
+    check(set_vbucket_state(h, h1, rep_vb_num, vbucket_state_replica),
+          "Failed to set vbucket state");
+    wait_for_flusher_to_settle(h, h1);
+
+    const void *cookie = testHarness.create_cookie();
+    uint32_t opaque = 0xFFFF0000;
+    uint32_t flags = 0;
+    std::string name("unittest");
+    uint8_t cas = 0;
+    uint8_t datatype = 1;
+    uint64_t bySeqno = 10;
+    uint64_t revSeqno = 0;
+    uint32_t exprtime = 0;
+    uint32_t lockTime = 0;
+
+    checkeq(ENGINE_SUCCESS,
+            h1->dcp.open(h, cookie, opaque, 0, flags, (void*)(name.c_str()),
+                         name.size()),
+            "Failed to open DCP consumer connection!");
+    add_stream_for_consumer(h, h1, cookie, opaque++, rep_vb_num, 0,
+                            PROTOCOL_BINARY_RESPONSE_SUCCESS);
+
+    std::string opaqueStr("eq_dcpq:" + name + ":stream_0_opaque");
+    uint32_t stream_opaque = get_int_stat(h, h1, opaqueStr.c_str(), "dcp");
+
+    checkeq(ENGINE_SUCCESS,
+            h1->dcp.snapshot_marker(h, cookie, stream_opaque, rep_vb_num, 0, 10,
+                                    1),
+            "Failed to send snapshot marker!");
+
+    check(h1->dcp.mutation(h, cookie, stream_opaque, "key", 3, "value", 5,
+                           cas, rep_vb_num, flags, datatype,
+                           bySeqno, revSeqno, exprtime,
+                           lockTime, NULL, 0, 0) == ENGINE_SUCCESS,
+          "Failed dcp mutate.");
+
+    /* Active vbucket 1; write 5 items */
+    check(set_vbucket_state(h, h1, 1, vbucket_state_active),
+          "Failed to set vbucket state.");
+    for (int j= 0; j < num_items; j++) {
+        std::stringstream ss;
+        ss << "key" << j;
+        checkeq(ENGINE_SUCCESS,
+                store(h, h1, NULL, OPERATION_SET, ss.str().c_str(),
+                      "value", NULL, 0, 1),
+                "Failed to store an item.");
+    }
+
+    /* Create request to get vb seqno of all vbuckets */
+    get_all_vb_seqnos(h, h1, static_cast<vbucket_state_t>(0), cookie);
+
+    /* Check if the response received is correct */
+    verify_all_vb_seqnos(h, h1, 0, 1);
+
+    testHarness.destroy_cookie(cookie);
+    return SUCCESS;
+}
+
 static enum test_result test_failover_log_dcp(ENGINE_HANDLE *h,
                                               ENGINE_HANDLE_V1 *h1) {
 
@@ -12726,6 +12777,8 @@ engine_test_t* get_tests(void) {
 
         TestCase("test failover log behavior", test_failover_log_behavior,
                  test_setup, teardown, NULL, prepare, cleanup),
+        TestCase("test get all vb seqnos", test_get_all_vb_seqnos, test_setup,
+                 teardown, NULL, prepare, cleanup),
         TestCase("test MB-16357", test_mb16357,
                  test_setup, teardown, "compaction_exp_mem_threshold=85",
                  prepare, cleanup),