MB-17051: [DcpProducer] Ensure no un-notified streams are left behind 67/57867/9
authorabhinavdangeti <abhinav@couchbase.com>
Thu, 17 Dec 2015 18:57:56 +0000 (10:57 -0800)
committerChiyoung Seo <chiyoung@couchbase.com>
Fri, 18 Dec 2015 09:18:13 +0000 (09:18 +0000)
Reiterate vbReady list at the end of a DcpProducer step to
ensure un-notified vbuckets are not left unprocessed.

Change-Id: I21065cf99f8be0af6dedf506237ce3dbe683387d
Reviewed-on: http://review.couchbase.org/57867
Tested-by: buildbot <build@couchbase.com>
Reviewed-by: Chiyoung Seo <chiyoung@couchbase.com>
src/dcp-producer.cc
src/dcp-producer.h

index 1def385..c2e4812 100644 (file)
@@ -110,7 +110,7 @@ DcpProducer::DcpProducer(EventuallyPersistentEngine &e, const void *cookie,
                          const std::string &name, bool isNotifier)
     : Producer(e, cookie, name), rejectResp(NULL),
       notifyOnly(isNotifier), lastSendTime(ep_current_time()), log(*this),
-      vbReady(e.getConfiguration().getMaxVbuckets()),
+      vbReady(e.getConfiguration().getMaxVbuckets()), notifiedVbReady(false),
       itemsSent(0), totalBytesSent(0), roundRobinVbReady(0) {
     setSupportAck(true);
     setReserved(true);
@@ -261,6 +261,10 @@ ENGINE_ERROR_CODE DcpProducer::streamRequest(uint32_t flags,
         static_cast<ActiveStream*>(streams[vbucket].get())->setActive();
     }
     vbReady[vbucket].store(true);
+    bool inverse = false;
+    if (notifiedVbReady.compare_exchange_strong(inverse, true)) {
+        log.unpauseIfSpaceAvailable();
+    }
 
     if (add_vb_conn_map) {
         connection_t conn(this);
@@ -518,7 +522,6 @@ ENGINE_ERROR_CODE DcpProducer::closeStream(uint32_t opaque, uint16_t vbucket) {
     {
         WriterLockHolder wlh(streamsMutex);
         streams.erase(vbucket);
-        vbReady[vbucket].store(false);
     }
 
     return ret;
@@ -589,7 +592,6 @@ void DcpProducer::closeAllStreams() {
             uint16_t vbid = itr->first;
             itr->second->setDead(END_STREAM_DISCONNECTED);
             streams.erase(vbid);
-            vbReady[vbid].store(false);
             vblist.push_back(vbid);
         }
     }
@@ -611,67 +613,72 @@ const char* DcpProducer::getType() const {
 
 DcpResponse* DcpProducer::getNextItem() {
     setPaused(false);
-    if (roundRobinVbReady >= vbReady.size()) {
-        roundRobinVbReady = 0;
-    }
-    for (; roundRobinVbReady < vbReady.size(); roundRobinVbReady++) {
-
-        if (log.pauseIfFull()) {
-            return NULL;
+    bool inverse = true;
+    do {
+        if (roundRobinVbReady >= vbReady.size()) {
+            roundRobinVbReady = 0;
         }
+        for (; roundRobinVbReady < vbReady.size(); roundRobinVbReady++) {
 
-        bool expected = true;
-        if (vbReady[roundRobinVbReady].compare_exchange_strong(expected, false)) {
-            uint16_t vbucket = roundRobinVbReady;
-            DcpResponse *op = NULL;
-            std::map<uint16_t, stream_t>::iterator it;
-            stream_t stream;
-            {
-                ReaderLockHolder rlh(streamsMutex);
-                it = streams.find(vbucket);
-                if (it == streams.end()) {
-                    continue;
-                }
-                stream.reset(it->second);
+            if (log.pauseIfFull()) {
+                return NULL;
             }
 
-            // Return the next operation
-            // When an op is returned it is assumed
-            // our bufferLog has been updated.
-            op = stream->next();
+            bool expected = true;
+            if (vbReady[roundRobinVbReady].compare_exchange_strong(expected, false)) {
+                uint16_t vbucket = roundRobinVbReady;
+                DcpResponse *op = NULL;
+                std::map<uint16_t, stream_t>::iterator it;
+                stream_t stream;
+                {
+                    ReaderLockHolder rlh(streamsMutex);
+                    it = streams.find(vbucket);
+                    if (it == streams.end()) {
+                        continue;
+                    }
+                    stream.reset(it->second);
+                }
 
-            if (!op) {
-                continue;
-            }
+                // Return the next operation
+                // When an op is returned it is assumed
+                // our bufferLog has been updated.
+                op = stream->next();
 
-            switch (op->getEvent()) {
-                case DCP_SNAPSHOT_MARKER:
-                case DCP_MUTATION:
-                case DCP_DELETION:
-                case DCP_EXPIRATION:
-                case DCP_STREAM_END:
-                case DCP_SET_VBUCKET:
-                    break;
-                default:
-                    LOG(EXTENSION_LOG_WARNING, "%s Producer is attempting to "
-                        "write an unexpected event %d",
-                        logHeader(), op->getEvent());
-                    abort();
-            }
+                if (!op) {
+                    continue;
+                }
 
-            vbReady[vbucket].store(true);
+                switch (op->getEvent()) {
+                    case DCP_SNAPSHOT_MARKER:
+                    case DCP_MUTATION:
+                    case DCP_DELETION:
+                    case DCP_EXPIRATION:
+                    case DCP_STREAM_END:
+                    case DCP_SET_VBUCKET:
+                        break;
+                    default:
+                        LOG(EXTENSION_LOG_WARNING, "%s Producer is attempting to "
+                                "write an unexpected event %d",
+                                logHeader(), op->getEvent());
+                        abort();
+                }
 
-            if (op->getEvent() == DCP_MUTATION ||
-                op->getEvent() == DCP_DELETION ||
-                op->getEvent() == DCP_EXPIRATION) {
-               itemsSent++;
-            }
+                vbReady[vbucket].store(true);
+                notifiedVbReady.store(true);
+                ++roundRobinVbReady;
 
-            totalBytesSent.fetch_add(op->getMessageSize());
+                if (op->getEvent() == DCP_MUTATION ||
+                    op->getEvent() == DCP_DELETION ||
+                    op->getEvent() == DCP_EXPIRATION) {
+                    itemsSent++;
+                }
+
+                totalBytesSent.fetch_add(op->getMessageSize());
 
-            return op;
+                return op;
+            }
         }
-    }
+    } while (notifiedVbReady.compare_exchange_strong(inverse, false));
 
     setPaused(true);
     return NULL;
@@ -691,7 +698,8 @@ void DcpProducer::setDisconnect(bool disconnect) {
 
 void DcpProducer::notifyStreamReady(uint16_t vbucket, bool schedule) {
     bool expected = false;
-    if (vbReady[vbucket].compare_exchange_strong(expected, true)) {
+    if (vbReady[vbucket].compare_exchange_strong(expected, true) &&
+        notifiedVbReady.compare_exchange_strong(expected, true)) {
         log.unpauseIfSpaceAvailable();
     }
 }
index eb76d75..0554893 100644 (file)
@@ -211,6 +211,7 @@ private:
     RWLock streamsMutex;
 
     std::vector<AtomicValue<bool> > vbReady;
+    AtomicValue<bool> notifiedVbReady;
 
     std::map<uint16_t, stream_t> streams;