MB-21320: Remove unused 'schedule' flag in DcpProducer::notifyStreamReady 72/69172/2
authorManu Dhundi <manu@couchbase.com>
Tue, 25 Oct 2016 01:13:55 +0000 (18:13 -0700)
committerManu Dhundi <manu@couchbase.com>
Tue, 25 Oct 2016 17:19:30 +0000 (17:19 +0000)
Change-Id: Ia562ec7baac012d401cc420982431df9920e92d3
Reviewed-on: http://review.couchbase.org/69172
Tested-by: buildbot <build@couchbase.com>
Reviewed-by: Dave Rigby <daver@couchbase.com>
Reviewed-by: Jim Walker <jim@couchbase.com>
src/dcp/producer.cc
src/dcp/producer.h
src/dcp/stream.cc

index 2c0958f..8c0d92e 100644 (file)
@@ -917,7 +917,7 @@ void DcpProducer::setDisconnect(bool disconnect) {
     }
 }
 
-void DcpProducer::notifyStreamReady(uint16_t vbucket, bool schedule) {
+void DcpProducer::notifyStreamReady(uint16_t vbucket) {
     if (ready.pushUnique(vbucket)) {
         log.unpauseIfSpaceAvailable();
     }
index 9e28029..eebf091 100644 (file)
@@ -102,7 +102,7 @@ public:
      */
     ENGINE_ERROR_CODE closeStream(uint32_t opaque, uint16_t vbucket);
 
-    void notifyStreamReady(uint16_t vbucket, bool schedule);
+    void notifyStreamReady(uint16_t vbucket);
 
     void notifyBackfillManager();
     bool recordBackfillManagerBytesRead(uint32_t bytes);
index 27a6dda..f1d7d44 100644 (file)
@@ -328,7 +328,7 @@ void ActiveStream::markDiskSnapshot(uint64_t startSeqno, uint64_t endSeqno) {
     lh.unlock();
     bool inverse = false;
     if (itemsReady.compare_exchange_strong(inverse, true)) {
-        producer->notifyStreamReady(vb_, false);
+        producer->notifyStreamReady(vb_);
     }
 }
 
@@ -354,7 +354,7 @@ bool ActiveStream::backfillReceived(Item* itm, backfill_source_t backfill_source
             lh.unlock();
             bool inverse = false;
             if (itemsReady.compare_exchange_strong(inverse, true)) {
-                producer->notifyStreamReady(vb_, false);
+                producer->notifyStreamReady(vb_);
             }
 
             if (backfill_source == BACKFILL_FROM_MEMORY) {
@@ -395,7 +395,7 @@ void ActiveStream::completeBackfill() {
     isBackfillTaskRunning.compare_exchange_strong(inverse, false);
     inverse = false;
     if (itemsReady.compare_exchange_strong(inverse, true)) {
-        producer->notifyStreamReady(vb_, false);
+        producer->notifyStreamReady(vb_);
     }
 }
 
@@ -403,7 +403,7 @@ void ActiveStream::snapshotMarkerAckReceived() {
     bool inverse = false;
     if (--waitForSnapshot == 0 &&
         itemsReady.compare_exchange_strong(inverse, true)) {
-        producer->notifyStreamReady(vb_, true);
+        producer->notifyStreamReady(vb_);
     }
 }
 
@@ -436,7 +436,7 @@ void ActiveStream::setVBucketStateAckRecieved() {
 
         bool inverse = false;
         if (itemsReady.compare_exchange_strong(inverse, true)) {
-            producer->notifyStreamReady(vb_, true);
+            producer->notifyStreamReady(vb_);
         }
     } else {
         producer->getLogger().log(EXTENSION_LOG_WARNING,
@@ -824,7 +824,7 @@ void ActiveStream::processItems(std::vector<queued_item>& items) {
 
     // Completed item processing - clear guard flag and notify producer.
     chkptItemsExtractionInProgress.store(false);
-    producer->notifyStreamReady(vb_, true);
+    producer->notifyStreamReady(vb_);
 }
 
 void ActiveStream::snapshot(std::deque<MutationResponse*>& items, bool mark) {
@@ -890,7 +890,7 @@ uint32_t ActiveStream::setDead(end_stream_status_t status) {
     bool inverse = false;
     if (status != END_STREAM_DISCONNECTED &&
         itemsReady.compare_exchange_strong(inverse, true)) {
-        producer->notifyStreamReady(vb_, true);
+        producer->notifyStreamReady(vb_);
     }
     return 0;
 }
@@ -899,7 +899,7 @@ void ActiveStream::notifySeqnoAvailable(uint64_t seqno) {
     if (state_ != STREAM_DEAD) {
         bool inverse = false;
         if (itemsReady.compare_exchange_strong(inverse, true)) {
-            producer->notifyStreamReady(vb_, true);
+            producer->notifyStreamReady(vb_);
         }
     }
 }
@@ -1156,7 +1156,7 @@ void ActiveStream::transitionState(stream_state_t newState) {
                 endStream(END_STREAM_OK);
                 bool inverse = false;
                 if (itemsReady.compare_exchange_strong(inverse, true)) {
-                    producer->notifyStreamReady(vb_, false);
+                    producer->notifyStreamReady(vb_);
                 }
             } else {
                 nextCheckpointItem();
@@ -1272,7 +1272,7 @@ uint32_t NotifierStream::setDead(end_stream_status_t status) {
             lh.unlock();
             bool inverse = false;
             if (itemsReady.compare_exchange_strong(inverse, true)) {
-                producer->notifyStreamReady(vb_, true);
+                producer->notifyStreamReady(vb_);
             }
         }
     }
@@ -1287,7 +1287,7 @@ void NotifierStream::notifySeqnoAvailable(uint64_t seqno) {
         lh.unlock();
         bool inverse = false;
         if (itemsReady.compare_exchange_strong(inverse, true)) {
-            producer->notifyStreamReady(vb_, true);
+            producer->notifyStreamReady(vb_);
         }
     }
 }