MB-19843: Modify the end_seqno in DCP stream request after checking for rollback 96/64796/6
authorManu Dhundi <manu@couchbase.com>
Wed, 8 Jun 2016 19:41:12 +0000 (12:41 -0700)
committerDave Rigby <daver@couchbase.com>
Thu, 9 Jun 2016 16:22:11 +0000 (16:22 +0000)
During a DCP stream request, we will update the end seqno when flags
DCP_ADD_STREAM_FLAG_LATEST/DCP_ADD_STREAM_FLAG_DISKONLY are used.
Currently in some cases when a rollback is required, the end_seqno could become
less than start_seqno before we check if a rollback is needed, resulting in
rejection of stream request.

Hence we should modify the end_seqno (if required as per the flags) only after
checking if a rollback is needed.

Change-Id: I23b112c16b9167023a990a5709ae6aae4838472e
Reviewed-on: http://review.couchbase.org/64796
Well-Formed: buildbot <build@couchbase.com>
Tested-by: buildbot <build@couchbase.com>
Reviewed-by: Dave Rigby <daver@couchbase.com>
src/dcp-producer.cc
tests/ep_testsuite.cc

index 19194ae..b074c02 100644 (file)
@@ -178,17 +178,10 @@ ENGINE_ERROR_CODE DcpProducer::streamRequest(uint32_t flags,
         return ENGINE_TMPFAIL;
     }
 
-    if (flags & DCP_ADD_STREAM_FLAG_LATEST) {
-        end_seqno = vb->getHighSeqno();
-    }
-
-    if (flags & DCP_ADD_STREAM_FLAG_DISKONLY) {
-        end_seqno = engine_.getEpStore()->getLastPersistedSeqno(vbucket);
-    }
-
     if (!notifyOnly && start_seqno > end_seqno) {
         LOG(EXTENSION_LOG_WARNING, "%s (vb %d) Stream request failed because "
-            "the start seqno (%llu) is larger than the end seqno (%llu)",
+            "the start seqno (%llu) is larger than the end seqno (%llu); "
+            "Incorrect params passed by the DCP client",
             logHeader(), vbucket, start_seqno, end_seqno);
         return ENGINE_ERANGE;
     }
@@ -249,6 +242,34 @@ ENGINE_ERROR_CODE DcpProducer::streamRequest(uint32_t flags,
         return rv;
     }
 
+    if (flags & DCP_ADD_STREAM_FLAG_LATEST) {
+        end_seqno = vb->getHighSeqno();
+    }
+
+    if (flags & DCP_ADD_STREAM_FLAG_DISKONLY) {
+        end_seqno = engine_.getEpStore()->getLastPersistedSeqno(vbucket);
+    }
+
+    if (!notifyOnly && start_seqno > end_seqno) {
+        LOG(EXTENSION_LOG_WARNING, "%s (vb %d) Stream request failed because "
+            "the start seqno (%llu) is larger than the end seqno (%llu), "
+            "stream request flags %d, vb_uuid %llu, snapStartSeqno %llu, "
+            "snapEndSeqno %llu; should have rolled back instead",
+            logHeader(), vbucket, start_seqno, end_seqno, flags, vbucket_uuid,
+            snap_start_seqno, snap_end_seqno);
+        return ENGINE_ERANGE;
+    }
+
+    if (!notifyOnly && start_seqno > vb->getHighSeqno()) {
+        LOG(EXTENSION_LOG_WARNING, "%s (vb %d) Stream request failed because "
+            "the start seqno (%llu) is larger than the vb highSeqno (%llu), "
+            "stream request flags is %d, vb_uuid %llu, snapStartSeqno %llu, "
+            "snapEndSeqno %llu; should have rolled back instead",
+            logHeader(), vbucket, start_seqno, vb->getHighSeqno(), flags,
+            vbucket_uuid, snap_start_seqno, snap_end_seqno);
+        return ENGINE_ERANGE;
+    }
+
     stream_t s;
     if (notifyOnly) {
         s = new NotifierStream(&engine_, this, getName(), flags,
index 29e052d..865f35a 100644 (file)
@@ -3628,10 +3628,10 @@ static void dcp_stream(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1, const char *name,
 }
 
 static void dcp_stream_req(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1,
-                           uint32_t opaque, uint16_t vbucket, uint64_t start,
+                           uint32_t opaque, uint32_t stream_flag,
+                           uint16_t vbucket, uint64_t start,
                            uint64_t end, uint64_t uuid,
-                           uint64_t snap_start_seqno,
-                           uint64_t snap_end_seqno,
+                           uint64_t snap_start_seqno, uint64_t snap_end_seqno,
                            uint64_t exp_rollback, ENGINE_ERROR_CODE err) {
     const void *cookie = testHarness.create_cookie();
     uint32_t flags = DCP_OPEN_PRODUCER;
@@ -3642,10 +3642,11 @@ static void dcp_stream_req(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1,
           == ENGINE_SUCCESS, "Failed dcp Consumer open connection.");
 
     uint64_t rollback = 0;
-    ENGINE_ERROR_CODE rv = h1->dcp.stream_req(h, cookie, 0, 1, 0, start, end,
-                                              uuid, snap_start_seqno,
-                                              snap_end_seqno,
-                                              &rollback, mock_dcp_add_failover_log);
+    ENGINE_ERROR_CODE rv = h1->dcp.stream_req(h, cookie, stream_flag, 1,
+                                              vbucket, start, end, uuid,
+                                              snap_start_seqno, snap_end_seqno,
+                                              &rollback,
+                                              mock_dcp_add_failover_log);
     check(rv == err, "Unexpected error code");
     if (err == ENGINE_ROLLBACK || err == ENGINE_KEY_ENOENT) {
         check(exp_rollback == rollback, "Rollback didn't match expected value");
@@ -8436,12 +8437,12 @@ static enum test_result test_dcp_rollback_after_purge(ENGINE_HANDLE *h,
     wait_for_stat_to_be(h, h1, "vb_0:num_checkpoints", 1, "checkpoint");
 
     /* DCP stream, expect a rollback to seq 0 */
-    dcp_stream_req(h, h1, 1, 0, 3, high_seqno, vb_uuid,
+    dcp_stream_req(h, h1, 1, 0, 0, 3, high_seqno, vb_uuid,
                    3, high_seqno, 0, ENGINE_ROLLBACK);
 
     /* Do not expect rollback when you already have all items in the snapshot
        (that is, start == snap_end_seqno)*/
-    dcp_stream_req(h, h1, 1, 0, high_seqno, high_seqno + 10, vb_uuid,
+    dcp_stream_req(h, h1, 1, 0, 0, high_seqno, high_seqno + 10, vb_uuid,
                    0, high_seqno, 0, ENGINE_SUCCESS);
 
     return SUCCESS;
@@ -11654,7 +11655,7 @@ static enum test_result test_failover_log_dcp(ENGINE_HANDLE *h,
     uint64_t uuid = 0;
     uint64_t snap_start_seq = start;
     uint64_t snap_end_seq = start;
-    dcp_stream_req(h, h1, 1, 0, start, end, uuid,
+    dcp_stream_req(h, h1, 1, 0, 0, start, end, uuid,
                    snap_start_seq, snap_end_seq, 0, ENGINE_SUCCESS);
 
     start = 0;
@@ -11662,7 +11663,7 @@ static enum test_result test_failover_log_dcp(ENGINE_HANDLE *h,
     uuid = get_ull_stat(h, h1, "vb_0:1:id", "failovers");
     snap_start_seq = start;
     snap_end_seq = start;
-    dcp_stream_req(h, h1, 1, 0, start, end, uuid,
+    dcp_stream_req(h, h1, 1, 0, 0, start, end, uuid,
                    snap_start_seq, snap_end_seq, 0, ENGINE_SUCCESS);
 
     start = 2;
@@ -11670,7 +11671,7 @@ static enum test_result test_failover_log_dcp(ENGINE_HANDLE *h,
     uuid = get_ull_stat(h, h1, "vb_0:1:id", "failovers");
     snap_start_seq = start;
     snap_end_seq = start;
-    dcp_stream_req(h, h1, 1, 0, start, end, uuid,
+    dcp_stream_req(h, h1, 1, 0, 0, start, end, uuid,
                    snap_start_seq, snap_end_seq, 0, ENGINE_SUCCESS);
 
     start = 10;
@@ -11678,7 +11679,7 @@ static enum test_result test_failover_log_dcp(ENGINE_HANDLE *h,
     uuid = get_ull_stat(h, h1, "vb_0:1:id", "failovers");
     snap_start_seq = start;
     snap_end_seq = start;
-    dcp_stream_req(h, h1, 1, 0, start, end, uuid,
+    dcp_stream_req(h, h1, 1, 0, 0, start, end, uuid,
                    snap_start_seq, snap_end_seq, 0, ENGINE_SUCCESS);
 
     start = 12;
@@ -11686,7 +11687,7 @@ static enum test_result test_failover_log_dcp(ENGINE_HANDLE *h,
     uuid = get_ull_stat(h, h1, "vb_0:1:id", "failovers");
     snap_start_seq = start;
     snap_end_seq = start;
-    dcp_stream_req(h, h1, 1, 0, start, end, uuid,
+    dcp_stream_req(h, h1, 1, 0, 0, start, end, uuid,
                    snap_start_seq, snap_end_seq, 10, ENGINE_ROLLBACK);
 
     start = 2;
@@ -11694,9 +11695,25 @@ static enum test_result test_failover_log_dcp(ENGINE_HANDLE *h,
     uuid = 123456;
     snap_start_seq = start;
     snap_end_seq = start;
-    dcp_stream_req(h, h1, 1, 0, start, end, uuid,
+    dcp_stream_req(h, h1, 1, 0, 0, start, end, uuid,
                    snap_start_seq, snap_end_seq, 0, ENGINE_ROLLBACK);
 
+    /* Test a case where start_seqno > vb_high_seqno and flags
+       DCP_ADD_STREAM_FLAG_LATEST/DCP_ADD_STREAM_FLAG_DISKONLY set */
+    start = 12;
+    end = 1000;
+    uuid = get_ull_stat(h, h1, "vb_0:1:id", "failovers");
+    snap_start_seq = start;
+    snap_end_seq = start;
+
+    /* Expect rollback */
+    dcp_stream_req(h, h1, 1, DCP_ADD_STREAM_FLAG_LATEST, 0, start, end, uuid,
+                   snap_start_seq, snap_end_seq, 10, ENGINE_ROLLBACK);
+
+    /* Expect rollback */
+    dcp_stream_req(h, h1, 1, DCP_ADD_STREAM_FLAG_DISKONLY, 0, start, end, uuid,
+                   snap_start_seq, snap_end_seq, 10, ENGINE_ROLLBACK);
+
     return SUCCESS;
 }