MB-17766: Incorrect ordering of messages during ActiveStream's takeover-send phase 27/59627/7
authorabhinavdangeti <abhinav@couchbase.com>
Tue, 9 Feb 2016 20:18:01 +0000 (12:18 -0800)
committerabhinav dangeti <abhinav@couchbase.com>
Wed, 10 Feb 2016 19:05:35 +0000 (19:05 +0000)
A race between the step() and ActiveStreamCheckpointProcessorTask
can cause mutations to be queued into the readyQ after the
setVBucketState(active) message.

Here's the scenario in chronology (T: Front-end thread, BT: IO thread):
1. T1: ActiveStream::setVBucketStateAckRecieved()
2. T1: transitionState(takeoverSend) => schedules ActiveStreamCheckpointProcessorTask
3. BT1: manageConnections() notifies memcached about specific conn (max idle time: 5s)
4. BT2: ActiveStreamCheckpointProcessorTask runs, gets all Items For Cursor
5. T1: step() -> takeoverSendPhase() -> readyQ is empty
        => nextCheckpointItem() return false, as getNumItemsForCursor returns 0
        => setVbucketState(active) added to readyQ
6. BT2: ActiveStreamCheckpointProcessorTask continues, adds mutations acquired into readyQ
        -> Note the mutations were acquired in step 4
        => Notified memcached connections
7. T1: step() .. ships messages in incorrect order

On the new master, the vbucket is promoted to active state and then more
mutations are received from the old master. If there were front end ops
at this time, there could be an inconsistency in highSeqno or in worst
cases crashes in checkpoint manager due to highSeqno not belonging in
the designated range.

The fix: Add an atomic flag that is also checked for along with
getNumItemsForCursor in nextCheckpointItem(). This flag is set before
retrieving all items for a cursor (getAllItemsForCursor) and unset after
all the retrieved items have been added to the ready queue of the stream.

Change-Id: I5c04d47cc99c7dd3b2d87cb68dd30d36473226e5
Reviewed-on: http://review.couchbase.org/59627
Well-Formed: buildbot <build@couchbase.com>
Reviewed-by: Chiyoung Seo <chiyoung@couchbase.com>
Tested-by: buildbot <build@couchbase.com>
src/dcp-stream.cc
src/dcp-stream.h

index c487199..0fde627 100644 (file)
@@ -277,7 +277,7 @@ ActiveStream::ActiveStream(EventuallyPersistentEngine* e, dcp_producer_t p,
        itemsFromBackfill(0), itemsFromMemory(0), firstMarkerSent(false),
        waitForSnapshot(0), engine(e), producer(p),
        isBackfillTaskRunning(false), lastSentSnapEndSeqno(0),
-       checkpointCreatorTask(task) {
+       checkpointCreatorTask(task), chkptItemsExtractionInProgress(false) {
 
     const char* type = "";
     if (flags_ & DCP_ADD_STREAM_FLAG_TAKEOVER) {
@@ -634,6 +634,8 @@ bool ActiveStream::nextCheckpointItem() {
         static_cast<ActiveStreamCheckpointProcessorTask*>(checkpointCreatorTask.get())
         ->schedule(this);
         return true;
+    } else if (chkptItemsExtractionInProgress) {
+        return true;
     }
     return false;
 }
@@ -704,6 +706,9 @@ void ActiveStream::nextCheckpointItemTask() {
 
 void ActiveStream::getOutstandingItems(RCPtr<VBucket> &vb,
                                        std::deque<queued_item> &items) {
+    // Commencing item processing - set guard flag.
+    chkptItemsExtractionInProgress.store(true);
+
     vb->checkpointManager.getAllItemsForCursor(name_, items);
     if (vb->checkpointManager.getNumCheckpoints() > 1) {
         engine->getEpStore()->wakeUpCheckpointRemover();
@@ -712,41 +717,40 @@ void ActiveStream::getOutstandingItems(RCPtr<VBucket> &vb,
 
 
 void ActiveStream::processItems(std::deque<queued_item>& items) {
-    bool mark = false;
-    if (items.empty()) {
-        producer->notifyStreamReady(vb_, true);
-        return;
-    }
+    if (!items.empty()) {
+        bool mark = false;
+        if (items.front()->getOperation() == queue_op_checkpoint_start) {
+            mark = true;
+        }
 
-    if (items.front()->getOperation() == queue_op_checkpoint_start) {
-        mark = true;
-    }
+        std::deque<MutationResponse*> mutations;
+        std::deque<queued_item>::iterator itemItr;
+        for (itemItr = items.begin(); itemItr != items.end(); itemItr++) {
+            queued_item& qi = *itemItr;
 
-    std::deque<MutationResponse*> mutations;
-    std::deque<queued_item>::iterator itemItr;
-    for (itemItr = items.begin(); itemItr != items.end(); itemItr++) {
-        queued_item& qi = *itemItr;
+            if (qi->getOperation() == queue_op_set ||
+                qi->getOperation() == queue_op_del) {
+                curChkSeqno = qi->getBySeqno();
+                lastReadSeqno = qi->getBySeqno();
 
-        if (qi->getOperation() == queue_op_set ||
-            qi->getOperation() == queue_op_del) {
-            curChkSeqno = qi->getBySeqno();
-            lastReadSeqno = qi->getBySeqno();
+                mutations.push_back(new MutationResponse(qi, opaque_));
+            } else if (qi->getOperation() == queue_op_checkpoint_start) {
+                snapshot(mutations, mark);
+                mark = true;
+            }
+        }
 
-            mutations.push_back(new MutationResponse(qi, opaque_));
-        } else if (qi->getOperation() == queue_op_checkpoint_start) {
+        if (mutations.empty()) {
+            // If we only got checkpoint start or ends check to see if there are
+            // any more snapshots before pausing the stream.
+            nextCheckpointItemTask();
+        } else {
             snapshot(mutations, mark);
-            mark = true;
         }
     }
 
-    if (mutations.empty()) {
-        // If we only got checkpoint start or ends check to see if there are
-        // any more snapshots before pausing the stream.
-        nextCheckpointItemTask();
-    } else {
-        snapshot(mutations, mark);
-    }
-    // ...notify...
+    // Completed item processing - clear guard flag and notify producer.
+    chkptItemsExtractionInProgress.store(false);
     producer->notifyStreamReady(vb_, true);
 }
 
index e63d4b7..543fe0a 100644 (file)
@@ -220,6 +220,8 @@ protected:
     // and pass onto the producer associated with this stream.
     void processItems(std::deque<queued_item>& items);
 
+    bool nextCheckpointItem();
+
 private:
 
     void transitionState(stream_state_t newState);
@@ -236,8 +238,6 @@ private:
 
     DcpResponse* nextQueuedItem();
 
-    bool nextCheckpointItem();
-
     void snapshot(std::deque<MutationResponse*>& snapshot, bool mark);
 
     void endStream(end_stream_status_t reason);
@@ -274,7 +274,14 @@ private:
 
     //! Last snapshot end seqno sent to the DCP client
     uint64_t lastSentSnapEndSeqno;
+
     ExTask checkpointCreatorTask;
+
+    /* Flag used by checkpointCreatorTask that is set before all items are
+       extracted for given checkpoint cursor, and is unset after all retrieved
+       items are added to the readyQ */
+    AtomicValue<bool> chkptItemsExtractionInProgress;
+
 };