MB-17006: [BP] DCP Producer could miss fetching items from a stream 41/57641/5
author abhinavdangeti <abhinav@couchbase.com>
Mon, 7 Dec 2015 21:02:55 +0000 (13:02 -0800)
committer Chiyoung Seo <chiyoung@couchbase.com>
Wed, 9 Dec 2015 23:47:44 +0000 (23:47 +0000)
Here's the scenario:
1. Stream currently in backfill phase
2. When backfill is received, 1 item added to readyQ
    a. itemsReady of stream set to true (Producer notified)
3. Front end op comes in
    a. item added to checkpoint queue
    b. itemsReady is already true, so it is not set again (Producer is
      not notified)
4. Producer calls stream::next()
    a. stream in backfillPhase(): 1 item popped from readyQ
    b. backfill task still running => no state transition to IN_MEMORY
    c. 1 op returned to producer, producer re-adds vbucket to ready list
5. Backfill completes
6. Producer calls stream::next()
    a. stream in backfillPhase(): no items in readyQ
    b. As backfill task has completed, state transitions to IN_MEMORY
    c. no items in readyQ => NULL returned
    d. As no op obtained, producer doesn't re-add vbucket to ready list

=> The front-end item remains stuck in the checkpoint queue until more
front-end ops come in and notify the producer.

The proposed fix: in step 6b, when the producer sees that the backfill
task has completed and the stream's state transitions to IN_MEMORY,
move the checkpoint items into the readyQ. That way the readyQ is not
empty, and the producer re-adds the vbucket to the ready list.

Change-Id: I3403d3926f97788074990ef0e4c69cac902b2a93
Reviewed-on: http://review.couchbase.org/57516
Tested-by: buildbot <build@couchbase.com>
Reviewed-by: Chiyoung Seo <chiyoung@couchbase.com>
Change-Id: I0821402fefd01f0851572d7c22ccee5fc065778d
Reviewed-on: http://review.couchbase.org/57641
Well-Formed: buildbot <build@couchbase.com>
Reviewed-by: Chiyoung Seo <chiyoung@couchbase.com>
Tested-by: buildbot <build@couchbase.com>
src/dcp-stream.cc
tests/ep_testsuite.cc

index 8d0c687..eb2e447 100644 (file)
@@ -467,13 +467,9 @@ DcpResponse* ActiveStream::backfillPhase() {
 }
 
 DcpResponse* ActiveStream::inMemoryPhase() {
-    if (!readyQ.empty()) {
-        return nextQueuedItem();
-    }
-
     if (lastSentSeqno >= end_seqno_) {
         endStream(END_STREAM_OK);
-    } else {
+    } else if (readyQ.empty()) {
         nextCheckpointItem();
     }
 
@@ -814,15 +810,38 @@ void ActiveStream::transitionState(stream_state_t newState) {
 
     state_ = newState;
 
-    if (newState == STREAM_BACKFILLING) {
-        scheduleBackfill();
-    } else if (newState == STREAM_TAKEOVER_SEND) {
-        nextCheckpointItem();
-    } else if (newState == STREAM_DEAD) {
-        RCPtr<VBucket> vb = engine->getVBucket(vb_);
-        if (vb) {
-            vb->checkpointManager.removeTAPCursor(name_);
-        }
+    switch (newState) {
+        case STREAM_BACKFILLING:
+            scheduleBackfill();
+            break;
+        case STREAM_IN_MEMORY:
+            // Check if the producer has sent up till the last requested
+            // sequence number already, if not - move checkpoint items into
+            // the ready queue.
+            if (lastSentSeqno >= end_seqno_) {
+                // Stream transitioning to DEAD state
+                endStream(END_STREAM_OK);
+            } else {
+                nextCheckpointItem();
+            }
+            break;
+        case STREAM_TAKEOVER_SEND:
+            nextCheckpointItem();
+            break;
+        case STREAM_DEAD:
+            {
+                RCPtr<VBucket> vb = engine->getVBucket(vb_);
+                if (vb) {
+                    vb->checkpointManager.removeTAPCursor(name_);
+                }
+                break;
+            }
+        case STREAM_TAKEOVER_WAIT:
+        case STREAM_PENDING:
+            break;
+        case STREAM_READING:
+            throw std::logic_error("ActiveStream::transitionState:"
+                    " newState can't be STREAM_READING!");
     }
 }
 
index 0f18114..2086b9a 100644 (file)
@@ -3469,9 +3469,10 @@ static void dcp_stream(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1, const char *name,
         (flags & DCP_ADD_STREAM_FLAG_DISKONLY) == 0 &&
         !skipEstimateCheck) {
         int est = end - start;
-        char stats_takeover[50];
-        snprintf(stats_takeover, sizeof(stats_takeover), "dcp-vbtakeover 0 %s", name);
-        wait_for_stat_to_be(h, h1, "estimate", est, stats_takeover);
+        std::stringstream stats_takeover;
+        stats_takeover << "dcp-vbtakeover " << vbucket << " " << name;
+        wait_for_stat_to_be_lte(h, h1, "estimate", est,
+                                stats_takeover.str().c_str());
     }
 
     bool done = false;