MB-17889: DCP producer can leave an operation behind 96/60196/3
author Jim Walker <jim@couchbase.com>
Thu, 18 Feb 2016 15:14:28 +0000 (15:14 +0000)
committer Chiyoung Seo <chiyoung@couchbase.com>
Thu, 18 Feb 2016 19:05:56 +0000 (19:05 +0000)
In low-traffic setups there's a case in DcpProducer::getNextItem
where the function exits and pauses the task, yet an item was
waiting to be sent.

getNextItem() looked like

  1. setpaused=false;
  2. while(ready.pop(vbucket)) {
  3.   process(vbucket);
  4. }
  5. setpaused=true;
  6. return NULL;

The notifier

  a. if(ready.pushUnique(vbucket))
  b.   wakeupIfPaused;

If a and b execute between 4 and 5, then the producer will sleep
and not process the vbucket until the next wakeup (which may be never).

This is not good: the first operation will have a long latency before
it can be seen on DCP, namely as long as it takes for a second operation
to arrive.

If a and b occur between 5 and 6, that's fine; wakeupIfPaused will re-wake
the producer.

a,b,5,6 is bad
5,a,6,b is ok
5,6,a,b is ok
5,a,b,6 is ok
5,6,a,b is ok
...

The fix.

getNextItem()

  0. do {
  1.   setpaused=false;
  2.   while(ready.pop(vbucket)) {
  3.     process(vbucket);
  4.   }
  5.   setpaused=true;
  6.  } while(!ready.empty());
  7. return NULL;

Now if a and b occur after 4 but before 5, it's ok, as 6 will now loop
and consume the vbucket.

5,a,b,6 is ok, as 6 will loop and consume
5,a,6,b is ok, "    "    "    "    "
6,a,b,7 is ok, paused is true (5), b will wake the task

Change-Id: Ib412a85ee10de0e2a2ca4116d0cc85bbad538da2
Reviewed-on: http://review.couchbase.org/60196
Well-Formed: buildbot <build@couchbase.com>
Reviewed-by: Chiyoung Seo <chiyoung@couchbase.com>
Reviewed-by: abhinav dangeti <abhinav@couchbase.com>
Tested-by: buildbot <build@couchbase.com>
src/dcp-producer.cc
src/dcp-producer.h

index dfe6ff0..47a4e84 100644 (file)
@@ -616,56 +616,64 @@ const char* DcpProducer::getType() const {
 }
 
 DcpResponse* DcpProducer::getNextItem() {
-    setPaused(false);
-    uint16_t vbucket = 0;
-    while (ready.popFront(vbucket)) {
-        if (log.pauseIfFull()) {
-            ready.pushUnique(vbucket);
-            return NULL;
-        }
+    do {
+        setPaused(false);
+
+        uint16_t vbucket = 0;
+        while (ready.popFront(vbucket)) {
+            if (log.pauseIfFull()) {
+                ready.pushUnique(vbucket);
+                return NULL;
+            }
 
-        DcpResponse* op = NULL;
-        {
-            ReaderLockHolder rlh(streamsMutex);
-            std::map<uint16_t, stream_t>::iterator it = streams.find(vbucket);
-            if (it == streams.end()) {
+            DcpResponse* op = NULL;
+            {
+                ReaderLockHolder rlh(streamsMutex);
+                std::map<uint16_t, stream_t>::iterator it = streams.find(vbucket);
+                if (it == streams.end()) {
+                    continue;
+                }
+                op = it->second->next();
+            }
+
+            if (!op) {
+                // stream is empty, try another vbucket.
                 continue;
             }
-            op = it->second->next();
-        }
 
-        if (!op) {
-            // stream is empty, try another vbucket.
-            continue;
-        }
+            switch (op->getEvent()) {
+                case DCP_SNAPSHOT_MARKER:
+                case DCP_MUTATION:
+                case DCP_DELETION:
+                case DCP_EXPIRATION:
+                case DCP_STREAM_END:
+                case DCP_SET_VBUCKET:
+                    break;
+                default:
+                    LOG(EXTENSION_LOG_WARNING, "%s Producer is attempting to write"
+                        " an unexpected event %d", logHeader(), op->getEvent());
+                    abort();
+            }
 
-        switch (op->getEvent()) {
-            case DCP_SNAPSHOT_MARKER:
-            case DCP_MUTATION:
-            case DCP_DELETION:
-            case DCP_EXPIRATION:
-            case DCP_STREAM_END:
-            case DCP_SET_VBUCKET:
-                break;
-            default:
-                LOG(EXTENSION_LOG_WARNING, "%s Producer is attempting to write"
-                    " an unexpected event %d", logHeader(), op->getEvent());
-                abort();
-        }
+            ready.pushUnique(vbucket);
 
-        ready.pushUnique(vbucket);
+            if (op->getEvent() == DCP_MUTATION || op->getEvent() == DCP_DELETION ||
+                op->getEvent() == DCP_EXPIRATION) {
+                itemsSent++;
+            }
 
-        if (op->getEvent() == DCP_MUTATION || op->getEvent() == DCP_DELETION ||
-            op->getEvent() == DCP_EXPIRATION) {
-            itemsSent++;
-        }
+            totalBytesSent.fetch_add(op->getMessageSize());
 
-        totalBytesSent.fetch_add(op->getMessageSize());
+            return op;
+        }
 
-        return op;
-    }
+        // flag we are paused
+        setPaused(true);
 
-    setPaused(true);
+        // re-check the ready queue.
+        // A new vbucket could of became ready and the notifier could of seen
+        // paused = false, so reloop so we don't miss an operation.
+    } while(!ready.empty());
 
     return NULL;
 }
index 6bc8294..de34b8b 100644 (file)
@@ -254,15 +254,9 @@ private:
             return false;
         }
 
-        /**
-         * Move the front item to the back of the queue
-         */
-        void moveFrontToback() {
+        bool empty() {
             LockHolder lh(lock);
-            if (readyQueue.size() > 1) {
-                readyQueue.push(readyQueue.front());
-                readyQueue.pop();
-            }
+            return readyQueue.empty();
         }
 
     private: