Merge remote-tracking branch 'couchbase/3.0.x' into sherlock 71/65571/1
authorDave Rigby <daver@couchbase.com>
Thu, 7 Jul 2016 10:57:50 +0000 (11:57 +0100)
committerDave Rigby <daver@couchbase.com>
Thu, 7 Jul 2016 10:57:50 +0000 (11:57 +0100)
* couchbase/3.0.x:
  MB-19503: Fix ConnMap so notifications don't go missing [2]
  MB-19503: Fix ConnMap so notifications don't go missing.
  MB-19404: [BP] Address data race in DCP-Producer seen while making a stats request
  MB-19405: [BP] Address possible data races in PassiveStream context

Change-Id: I241ebd07f9e6177d557dd0ea37da97d6b4cc1489

1  2 
src/dcp-producer.h
src/dcp-stream.cc
src/dcp-stream.h
tests/ep_testsuite.cc

  
  #include "config.h"
  
 -#include "tapconnection.h"
  #include "dcp-stream.h"
 +#include "tapconnection.h"
 +
 +class BackfillManager;
 +
 +class Stream;
 +typedef SingleThreadedRCPtr<Stream> stream_t;
  
+ #include <relaxed_atomic.h>
  class DcpResponse;
  
  class DcpProducer : public Producer {
@@@ -297,12 -285,10 +299,12 @@@ private
          rel_time_t sendTime;
          uint32_t opaque;
          uint32_t noopInterval;
-         bool pendingRecv;
-         bool enabled;
+         Couchbase::RelaxedAtomic<bool> pendingRecv;
+         Couchbase::RelaxedAtomic<bool> enabled;
      } noopCtx;
  
 +    std::string priority;
 +
      DcpResponse *rejectResp; // stash response for retry if E2BIG was hit
  
      bool notifyOnly;
@@@ -1077,25 -1142,20 +1077,25 @@@ PassiveStream::PassiveStream(Eventually
  }
  
  PassiveStream::~PassiveStream() {
 -    LockHolder lh(streamMutex);
 -    clear_UNLOCKED();
 -    cb_assert(state_ == STREAM_DEAD);
 -    cb_assert(buffer.bytes == 0);
 +    uint32_t unackedBytes = clearBuffer();
 +    if (transitionState(STREAM_DEAD)) {
 +        // Destructed a "live" stream, log it.
 +        LOG(EXTENSION_LOG_WARNING, "%s (vb %" PRId16 ") Destructing stream."
 +            " last_seqno is %" PRIu64 ", unAckedBytes is %" PRIu32 ".",
-             consumer->logHeader(), vb_, last_seqno,
++            consumer->logHeader(), vb_, last_seqno.load(),
 +            unackedBytes);
 +    }
  }
  
 -uint32_t PassiveStream::setDead(end_stream_status_t status) {
 -    LockHolder lh(streamMutex);
 +uint32_t PassiveStream::setDead_UNLOCKED(end_stream_status_t status,
 +                                         LockHolder *slh) {
      transitionState(STREAM_DEAD);
 -    lh.unlock();
 +    slh->unlock();
      uint32_t unackedBytes = clearBuffer();
 -    LOG(EXTENSION_LOG_WARNING, "%s (vb %d) Setting stream to dead state,"
 -        " last_seqno is %llu, unackedBytes is %u, status is %s",
 +    LOG(EXTENSION_LOG_WARNING, "%s (vb %" PRId16 ") Setting stream to dead"
 +        " state, last_seqno is %" PRIu64 ", unackedBytes is %" PRIu32 ","
 +        " status is %s",
-         consumer->logHeader(), vb_, last_seqno, unackedBytes,
+         consumer->logHeader(), vb_, last_seqno.load(), unackedBytes,
          getEndStreamStatusStr(status));
      return unackedBytes;
  }
@@@ -1172,12 -1216,12 +1172,12 @@@ ENGINE_ERROR_CODE PassiveStream::messag
          {
              MutationResponse* m = static_cast<MutationResponse*>(resp);
              uint64_t bySeqno = m->getBySeqno();
-             if (bySeqno <= last_seqno) {
+             if (bySeqno <= last_seqno.load()) {
                  LOG(EXTENSION_LOG_WARNING, "%s (vb %d) Erroneous (out of "
 -                    "sequence) mutation received, with opaque: %ld, its "
 +                    "sequence) mutation received, with opaque: %u, its "
                      "seqno (%llu) is not greater than last received seqno "
                      "(%llu); Dropping mutation!", consumer->logHeader(),
-                     vb_, opaque_, bySeqno, last_seqno);
+                     vb_, opaque_, bySeqno, last_seqno.load());
                  delete m;
                  return ENGINE_ERANGE;
              }
              SnapshotMarker* s = static_cast<SnapshotMarker*>(resp);
              uint64_t snapStart = s->getStartSeqno();
              uint64_t snapEnd = s->getEndSeqno();
-             if (snapStart < last_seqno && snapEnd <= last_seqno) {
+             if (snapStart < last_seqno.load() && snapEnd <= last_seqno.load()) {
                  LOG(EXTENSION_LOG_WARNING, "%s (vb %d) Erroneous snapshot "
 -                    "marker received, with opaque: %ld, its start (%llu), and"
 +                    "marker received, with opaque: %u, its start (%llu), and"
                      "end (%llu) are less than last received seqno (%llu); "
                      "Dropping marker!", consumer->logHeader(), vb_, opaque_,
-                     snapStart, snapEnd, last_seqno);
+                     snapStart, snapEnd, last_seqno.load());
                  delete s;
                  return ENGINE_ERANGE;
              }
@@@ -1396,9 -1426,21 +1396,9 @@@ ENGINE_ERROR_CODE PassiveStream::proces
  void PassiveStream::processMarker(SnapshotMarker* marker) {
      RCPtr<VBucket> vb = engine->getVBucket(vb_);
  
-     cur_snapshot_start = marker->getStartSeqno();
-     cur_snapshot_end = marker->getEndSeqno();
-     cur_snapshot_type = (marker->getFlags() & MARKER_FLAG_DISK) ? disk : memory;
+     cur_snapshot_start.store(marker->getStartSeqno());
+     cur_snapshot_end.store(marker->getEndSeqno());
+     cur_snapshot_type.store((marker->getFlags() & MARKER_FLAG_DISK) ? disk : memory);
 -    saveSnapshot = true;
  
      if (vb) {
          if (marker->getFlags() & MARKER_FLAG_DISK && vb->getHighSeqno() == 0) {
@@@ -1460,7 -1501,8 +1460,7 @@@ void PassiveStream::handleSnapshotEnd(R
              }
              cur_snapshot_ack = false;
          }
-         cur_snapshot_type = none;
+         cur_snapshot_type.store(none);
 -        vb->setCurrentSnapshot(byseqno, byseqno);
      }
  }
  
@@@ -443,12 -428,13 +443,12 @@@ private
  
      EventuallyPersistentEngine* engine;
      dcp_consumer_t consumer;
-     uint64_t last_seqno;
+     AtomicValue<uint64_t> last_seqno;
  
-     uint64_t cur_snapshot_start;
-     uint64_t cur_snapshot_end;
-     snapshot_type_t cur_snapshot_type;
+     AtomicValue<uint64_t> cur_snapshot_start;
+     AtomicValue<uint64_t> cur_snapshot_end;
+     AtomicValue<snapshot_type_t> cur_snapshot_type;
      bool cur_snapshot_ack;
 -    bool saveSnapshot;
  
      struct Buffer {
          Buffer() : bytes(0), items(0) {}
Simple merge