Merge remote-tracking branch 'couchbase/3.0.x' into 'couchbase/sherlock' 37/60137/1
authorabhinavdangeti <abhinav@couchbase.com>
Wed, 17 Feb 2016 18:10:54 +0000 (10:10 -0800)
committerabhinavdangeti <abhinav@couchbase.com>
Wed, 17 Feb 2016 18:11:26 +0000 (10:11 -0800)
couchbase/3.0.x:
|\
| * 37d53b1 MB-18171: Break cyclic reference between ActiveStream & ChkptProcesser
| * 928ba39 MB-17766: Set maxNumAuxIO in stream_test to zero
| * 9ac7fd0 [BP] MB-17766: Fix intermittant stream_test failure

Change-Id: I5a01bbf360bd5e59a91ae8400f3de14cc50f5911

1  2 
src/connmap.cc
src/dcp-producer.cc
src/dcp-producer.h
src/dcp-stream.cc
src/dcp-stream.h
tests/module_tests/stream_test.cc

diff --cc src/connmap.cc
Simple merge
Simple merge
Simple merge
@@@ -132,10 -273,10 +131,9 @@@ ActiveStream::ActiveStream(EventuallyPe
                snap_start_seqno, snap_end_seqno),
         lastReadSeqno(st_seqno), lastSentSeqno(st_seqno), curChkSeqno(st_seqno),
         takeoverState(vbucket_state_pending), backfillRemaining(0),
 -       itemsFromBackfill(0), itemsFromMemory(0), firstMarkerSent(false),
 -       waitForSnapshot(0), engine(e), producer(p),
 -       isBackfillTaskRunning(false), lastSentSnapEndSeqno(0),
 -       chkptItemsExtractionInProgress(false) {
 +       itemsFromMemoryPhase(0), firstMarkerSent(false), waitForSnapshot(0),
 +       engine(e), producer(p), isBackfillTaskRunning(false),
-        lastSentSnapEndSeqno(0), checkpointCreatorTask(task),
-        chkptItemsExtractionInProgress(false) {
++       lastSentSnapEndSeqno(0), chkptItemsExtractionInProgress(false) {
  
      const char* type = "";
      if (flags_ & DCP_ADD_STREAM_FLAG_TAKEOVER) {
@@@ -533,10 -628,9 +531,9 @@@ DcpResponse* ActiveStream::nextQueuedIt
  
  bool ActiveStream::nextCheckpointItem() {
      RCPtr<VBucket> vbucket = engine->getVBucket(vb_);
 -    if (vbucket && vbucket->checkpointManager.getNumItemsForTAPConnection(name_) > 0) {
 +    if (vbucket && vbucket->checkpointManager.getNumItemsForCursor(name_) > 0) {
          // schedule this stream to build the next checkpoint
-         static_cast<ActiveStreamCheckpointProcessorTask*>(checkpointCreatorTask.get())
-         ->schedule(this);
+         producer->scheduleCheckpointProcessorTask(this);
          return true;
      } else if (chkptItemsExtractionInProgress) {
          return true;
@@@ -180,9 -168,13 +180,9 @@@ public
                   const std::string &name, uint32_t flags, uint32_t opaque,
                   uint16_t vb, uint64_t st_seqno, uint64_t en_seqno,
                   uint64_t vb_uuid, uint64_t snap_start_seqno,
-                  uint64_t snap_end_seqno, ExTask task);
+                  uint64_t snap_end_seqno);
  
 -    ~ActiveStream() {
 -        LockHolder lh(streamMutex);
 -        transitionState(STREAM_DEAD);
 -        clear_UNLOCKED();
 -    }
 +    ~ActiveStream();
  
      DcpResponse* next();
  
@@@ -90,9 -84,13 +90,13 @@@ static void test_mb17766(const std::str
          usleep(10);
      }
  
+     // Set AuxIO threads to zero, so that the producer's
+     // ActiveStreamCheckpointProcesserTask doesn't run.
+     ExecutorPool::get()->setMaxAuxIO(0);
      // Add an item.
      std::string value("value");
 -    Item item("key", /*flags*/0, /*exp*/0, value.c_str(), value.size());
 +    Item item("key", 3, /*flags*/0, /*exp*/0, value.c_str(), value.size());
  
      uint64_t cas;
      EXPECT_EQ(ENGINE_SUCCESS,
  
      RCPtr<VBucket> vb0 = engine->getVBucket(0);
      EXPECT_EQ(true, vb0, "Failed to get valid VBucket object for id 0");
-     EXPECT_EQ(false,
-               vb0->checkpointManager.registerCursor("test_mb17766"),
-               "Found an existing TAP cursor when attemping to register ours");
 -    vb0->checkpointManager.registerTAPCursor(producer->getName());
++    vb0->checkpointManager.registerCursor(producer->getName());
  
      // Should start with nextCheckpointItem() returning true.
      MockActiveStream* mock_stream = static_cast<MockActiveStream*>(stream.get());