MB-17766: Refactor nextCheckpointItemTask to allow testing 64/59664/4
authorDave Rigby <daver@couchbase.com>
Tue, 9 Feb 2016 13:33:38 +0000 (13:33 +0000)
committerChiyoung Seo <chiyoung@couchbase.com>
Wed, 10 Feb 2016 17:47:14 +0000 (17:47 +0000)
Split nextCheckpointItemTask() into two inner (protected) functions,
to allow testing of the fix for MB-17766.

Change-Id: I9d441d873cf7f727f90a966d4dda03043c7f6480
Reviewed-on: http://review.couchbase.org/59664
Well-Formed: buildbot <build@couchbase.com>
Tested-by: buildbot <build@couchbase.com>
Reviewed-by: Chiyoung Seo <chiyoung@couchbase.com>
src/dcp-stream.cc
src/dcp-stream.h

index 80f5c3f..a687923 100644 (file)
@@ -689,21 +689,31 @@ void ActiveStreamCheckpointProcessorTask::schedule(stream_t stream) {
 
 void ActiveStream::nextCheckpointItemTask() {
     RCPtr<VBucket> vbucket = engine->getVBucket(vb_);
-    if (!vbucket) {
+    if (vbucket) {
+        std::deque<queued_item> items = getOutstandingItems(vbucket);
+        processItems(items);
+    } else {
         /* The entity deleting the vbucket must set stream to dead,
            calling setDead(END_STREAM_STATE) will cause deadlock because
            it will try to grab streamMutex which is already acquired at this
            point here */
         return;
     }
-    bool mark = false;
+}
+
+std::deque<queued_item> ActiveStream::getOutstandingItems(RCPtr<VBucket> &vb) {
     std::deque<queued_item> items;
-    std::deque<MutationResponse*> mutations;
-    vbucket->checkpointManager.getAllItemsForCursor(name_, items);
-    if (vbucket->checkpointManager.getNumCheckpoints() > 1) {
+    vb->checkpointManager.getAllItemsForCursor(name_, items);
+    if (vb->checkpointManager.getNumCheckpoints() > 1) {
         engine->getEpStore()->wakeUpCheckpointRemover();
     }
 
+    return items;
+}
+
+
+void ActiveStream::processItems(std::deque<queued_item>& items) {
+    bool mark = false;
     if (items.empty()) {
         producer->notifyStreamReady(vb_, true);
         return;
@@ -713,6 +723,7 @@ void ActiveStream::nextCheckpointItemTask() {
         mark = true;
     }
 
+    std::deque<MutationResponse*> mutations;
     std::deque<queued_item>::iterator itemItr;
     for (itemItr = items.begin(); itemItr != items.end(); itemItr++) {
         queued_item& qi = *itemItr;
index 0db3223..97c0629 100644 (file)
@@ -212,6 +212,14 @@ public:
     // Runs on ActiveStreamCheckpointProcessorTask
     void nextCheckpointItemTask();
 
+protected:
+    // Returns the outstanding items for the stream's checkpoint cursor.
+    std::deque<queued_item> getOutstandingItems(RCPtr<VBucket> &vb);
+
+    // Given a set of queued items, create mutation responses for each item,
+    // and pass onto the producer associated with this stream.
+    void processItems(std::deque<queued_item>& items);
+
 private:
 
     void transitionState(stream_state_t newState);