Merge remote-tracking branch 'couchbase/3.0.x' into sherlock 08/65008/1
author Dave Rigby <daver@couchbase.com>
Fri, 17 Jun 2016 11:41:12 +0000 (12:41 +0100)
committer Dave Rigby <daver@couchbase.com>
Fri, 17 Jun 2016 11:41:12 +0000 (12:41 +0100)
* couchbase/3.0.x:
  MB-19253: Fix race in void ExecutorPool::doWorkerStat
  MB-19252: Fix data race on Stream::readyQueueMemory
  MB-19251: Fix race in updating Vbucket.file{SpaceUsed,Size}
  MB-19249: Address possible data races in ConnHandler context
  MB-19248: Fix race in TaskQueue.{ready,future,pending}Queue access
  MB-19247: Fix possible data race in workload.h: workloadPattern
  MB-19246: Fix potentially incorrect persist_time in OBSERVE response
  MB-19229: Address possible data race in vbucket.cc: numHpChks
  MB-19228: Address possible data races in ActiveStream context
  MB-19227: Fix race in ConnNotifier.task access

Change-Id: I184b86cd800e406b5be96ec5f7c456e73f54b05c
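
The recurring pattern across these fixes is the same: counters that are written by one thread and read by another (stats readers, stepping paths) are converted from plain integers to atomics accessed with relaxed memory ordering, since they carry no ordering requirements for other data. A minimal standalone sketch of that pattern, assuming C++11 std::atomic rather than the ep-engine AtomicValue wrapper (illustrative only, not the actual Stream code):

    #include <atomic>
    #include <cstdint>

    // Illustrative counter mirroring the readyQueueMemory fix below: writers
    // add and subtract message sizes; a stats reader loads the value lock-free.
    class ReadyQueueMemorySketch {
    public:
        void add(uint64_t bytes) {
            bytes_.fetch_add(bytes, std::memory_order_relaxed);
        }
        void sub(uint64_t bytes) {
            // Guard against underflow, as popFromReadyQ does in the diff below.
            if (bytes <= bytes_.load(std::memory_order_relaxed)) {
                bytes_.fetch_sub(bytes, std::memory_order_relaxed);
            } else {
                bytes_.store(0, std::memory_order_relaxed);
            }
        }
        uint64_t get() const {
            return bytes_.load(std::memory_order_relaxed);
        }
    private:
        std::atomic<uint64_t> bytes_{0};
    };

Relaxed ordering is sufficient here because these counters are pure statistics; the structures they describe (readyQ, hpChks, the task queues) remain protected by their existing mutexes.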

src/dcp-stream.cc
src/dcp-stream.h
src/ep.cc
src/ep.h
src/executorthread.cc
src/executorthread.h
src/tapconnection.h
src/vbucket.cc
src/vbucket.h

diff --cc src/dcp-stream.cc
@@@ -65,7 -208,8 +65,8 @@@ void Stream::pushToReadyQ(DcpResponse* 
  {
      if (resp) {
          readyQ.push(resp);
-         readyQueueMemory += resp->getMessageSize();
+         readyQueueMemory.fetch_add(resp->getMessageSize(),
 -                                   memory_order_relaxed);
++                                   std::memory_order_relaxed);
      }
  }
  
@@@ -75,20 -219,20 +76,20 @@@ void Stream::popFromReadyQ(void
          uint32_t respSize = readyQ.front()->getMessageSize();
          readyQ.pop();
          /* Decrement the readyQ size */
-         if ((readyQueueMemory - respSize) <= readyQueueMemory) {
-             readyQueueMemory -= respSize;
 -        if (respSize <= readyQueueMemory.load(memory_order_relaxed)) {
 -            readyQueueMemory.fetch_sub(respSize, memory_order_relaxed);
++        if (respSize <= readyQueueMemory.load(std::memory_order_relaxed)) {
++            readyQueueMemory.fetch_sub(respSize, std::memory_order_relaxed);
          } else {
              LOG(EXTENSION_LOG_DEBUG, "readyQ size for stream %s (vb %d)"
                  "underflow, likely wrong stat calculation! curr size: %llu;"
-                 "new size: %d", name_.c_str(), getVBucket(), readyQueueMemory,
 -                "new size: %d", name_.c_str(), getVBucket(), readyQueueMemory.load(),
--                respSize);
-             readyQueueMemory = 0;
 -            readyQueueMemory.store(0, memory_order_relaxed);
++                "new size: %d", name_.c_str(), getVBucket(),
++                readyQueueMemory.load(std::memory_order_relaxed), respSize);
++            readyQueueMemory.store(0, std::memory_order_relaxed);
          }
      }
  }
  
  uint64_t Stream::getReadyQueueMemory() {
-     return readyQueueMemory;
 -    return readyQueueMemory.load(memory_order_relaxed);
++    return readyQueueMemory.load(std::memory_order_relaxed);
  }
  
  const char * Stream::stateName(stream_state_t st) const {
@@@ -306,9 -421,8 +307,9 @@@ void ActiveStream::completeBackfill() 
      {
          LockHolder lh(streamMutex);
          LOG(EXTENSION_LOG_WARNING, "%s (vb %d) Backfill complete, %d items read"
 -            " from disk, last seqno read: %ld", producer->logHeader(), vb_,
 -            itemsFromBackfill, lastReadSeqno.load());
 +            " from disk %d from memory, last seqno read: %llu",
 +            producer->logHeader(), vb_, backfillItems.disk.load(),
-             backfillItems.memory.load(), lastReadSeqno);
++            backfillItems.memory.load(), lastReadSeqno.load());
      }
  
      isBackfillTaskRunning.store(false);
@@@ -379,8 -488,8 +380,8 @@@ DcpResponse* ActiveStream::backfillPhas
      }
  
      if (!isBackfillTaskRunning && readyQ.empty()) {
-         backfillRemaining = 0;
-         if (lastReadSeqno >= end_seqno_) {
 -        backfillRemaining.store(0, memory_order_relaxed);
++        backfillRemaining.store(0, std::memory_order_relaxed);
+         if (lastReadSeqno.load() >= end_seqno_) {
              endStream(END_STREAM_OK);
          } else if (flags_ & DCP_ADD_STREAM_FLAG_TAKEOVER) {
              transitionState(STREAM_TAKEOVER_SEND);
@@@ -441,27 -550,16 +442,27 @@@ DcpResponse* ActiveStream::deadPhase() 
  void ActiveStream::addStats(ADD_STAT add_stat, const void *c) {
      Stream::addStats(add_stat, c);
  
 -    const int bsize = 128;
 +    const int bsize = 1024;
      char buffer[bsize];
 -    snprintf(buffer, bsize, "%s:stream_%d_backfilled", name_.c_str(), vb_);
 -    add_casted_stat(buffer, itemsFromBackfill, add_stat, c);
 -    snprintf(buffer, bsize, "%s:stream_%d_memory", name_.c_str(), vb_);
 -    add_casted_stat(buffer, itemsFromMemory, add_stat, c);
 +    snprintf(buffer, bsize, "%s:stream_%d_backfill_disk_items",
 +             name_.c_str(), vb_);
 +    add_casted_stat(buffer, backfillItems.disk, add_stat, c);
 +    snprintf(buffer, bsize, "%s:stream_%d_backfill_mem_items",
 +             name_.c_str(), vb_);
 +    add_casted_stat(buffer, backfillItems.memory, add_stat, c);
 +    snprintf(buffer, bsize, "%s:stream_%d_backfill_sent", name_.c_str(), vb_);
 +    add_casted_stat(buffer, backfillItems.sent, add_stat, c);
 +    snprintf(buffer, bsize, "%s:stream_%d_memory_phase", name_.c_str(), vb_);
 +    add_casted_stat(buffer, itemsFromMemoryPhase, add_stat, c);
      snprintf(buffer, bsize, "%s:stream_%d_last_sent_seqno", name_.c_str(), vb_);
-     add_casted_stat(buffer, lastSentSeqno, add_stat, c);
+     add_casted_stat(buffer, lastSentSeqno.load(), add_stat, c);
 +    snprintf(buffer, bsize, "%s:stream_%d_last_sent_snap_end_seqno",
 +             name_.c_str(), vb_);
 +    add_casted_stat(buffer,
 +                    lastSentSnapEndSeqno.load(std::memory_order_relaxed),
 +                    add_stat, c);
      snprintf(buffer, bsize, "%s:stream_%d_last_read_seqno", name_.c_str(), vb_);
-     add_casted_stat(buffer, lastReadSeqno, add_stat, c);
+     add_casted_stat(buffer, lastReadSeqno.load(), add_stat, c);
      snprintf(buffer, bsize, "%s:stream_%d_ready_queue_memory", name_.c_str(), vb_);
      add_casted_stat(buffer, getReadyQueueMemory(), add_stat, c);
      snprintf(buffer, bsize, "%s:stream_%d_items_ready", name_.c_str(), vb_);
@@@ -744,12 -828,10 +745,12 @@@ void ActiveStream::endStream(end_stream
              pushToReadyQ(new StreamEndResponse(opaque_, reason, vb_));
          }
          transitionState(STREAM_DEAD);
 -        LOG(EXTENSION_LOG_WARNING, "%s (vb %d) Stream closing, %llu items sent"
 -            " from disk, %llu items sent from memory, %llu was last seqno sent"
 -            " %s is the reason", producer->logHeader(), vb_, itemsFromBackfill,
 -            itemsFromMemory.load(), lastSentSeqno.load(),
 +        LOG(EXTENSION_LOG_WARNING, "%s (vb %" PRId16 ") Stream closing, "
 +            "%" PRIu64 " items sent from backfill phase, %" PRIu64 " items "
 +            "sent from memory phase, %" PRIu64 " was last seqno sent, "
 +            "reason: %s", producer->logHeader(), vb_,
 +            uint64_t(backfillItems.sent.load()),
-             uint64_t(itemsFromMemoryPhase), lastSentSeqno,
++            uint64_t(itemsFromMemoryPhase), lastSentSeqno.load(),
              getEndStreamStatusStr(reason));
      }
  }
diff --cc src/dcp-stream.h
@@@ -167,10 -154,14 +167,13 @@@ protected
      const static uint64_t dcpMaxSeqno;
  
  private:
-     /* This tracks the memory occupied by elements in the readyQ */
-     uint64_t readyQueueMemory;
+     /* readyQueueMemory tracks the memory occupied by elements
+      * in the readyQ.  It is an atomic because otherwise
+        getReadyQueueMemory would need to acquire streamMutex.
+      */
+     AtomicValue <uint64_t> readyQueueMemory;
  };
  
 -typedef RCPtr<Stream> stream_t;
  
  class ActiveStreamCheckpointProcessorTask;
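
The new comment on readyQueueMemory spells out the trade-off: a plain field would force getReadyQueueMemory to take streamMutex, whereas an atomic lets the stats path stay lock-free. A hedged sketch of the two alternatives (class names are illustrative, not part of the Stream interface):

    #include <atomic>
    #include <cstdint>
    #include <mutex>

    // Alternative 1: plain counter, every reader must serialise on the mutex.
    class LockedCounter {
    public:
        uint64_t get() {
            std::lock_guard<std::mutex> lh(mutex_);
            return value_;
        }
        void add(uint64_t n) {
            std::lock_guard<std::mutex> lh(mutex_);
            value_ += n;
        }
    private:
        std::mutex mutex_;
        uint64_t value_ = 0;
    };

    // Alternative 2 (the approach taken here): atomic counter, so the stats
    // getter never touches the mutex protecting the queue itself.
    class AtomicCounter {
    public:
        uint64_t get() const { return value_.load(std::memory_order_relaxed); }
        void add(uint64_t n) { value_.fetch_add(n, std::memory_order_relaxed); }
    private:
        std::atomic<uint64_t> value_{0};
    };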
  
@@@ -269,16 -261,17 +272,20 @@@ private
      uint64_t curChkSeqno;
      //! The current vbucket state to send in the takeover stream
      vbucket_state_t takeoverState;
-     //! The amount of items remaining to be read from disk
-     size_t backfillRemaining;
+     /* backfillRemaining is a stat recording the amount of
+      * items remaining to be read from disk.  It is an atomic
+      * because otherwise the function incrBackfillRemaining
+      * must acquire the streamMutex lock.
+      */
+     AtomicValue <size_t> backfillRemaining;
 -
 -    //! The amount of items that have been read from disk
 -    size_t itemsFromBackfill;
 -    //! The amount of items that have been read from memory
 -    AtomicValue<size_t> itemsFromMemory;
 +    //! Stats to track items read and sent from the backfill phase
 +    struct {
 +        AtomicValue<size_t> memory;
 +        AtomicValue<size_t> disk;
 +        AtomicValue<size_t> sent;
 +    } backfillItems;
 +    //! The amount of items that have been sent during the memory phase
 +    size_t itemsFromMemoryPhase;
      //! Whether ot not this is the first snapshot marker sent
      bool firstMarkerSent;
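
backfillRemaining and the new backfillItems counters follow the same reasoning: they are bumped from the backfill task and read from the stats and stepping paths, so making them atomic avoids taking streamMutex on every increment. A rough sketch of the grouped counters, assuming std::atomic in place of AtomicValue (field comments are inferred from the stat names above; the real incrBackfillRemaining signature may differ):

    #include <atomic>
    #include <cstddef>

    // Per-stream backfill statistics, grouped as in the backfillItems struct;
    // each field is updated independently with no ordering between them.
    struct BackfillItemCountersSketch {
        std::atomic<size_t> memory{0};  // items read from memory during backfill
        std::atomic<size_t> disk{0};    // items read from disk during backfill
        std::atomic<size_t> sent{0};    // items actually sent to the consumer
    };

    // Illustrative increment callable from the backfill task without holding
    // the stream mutex.
    inline void incrBackfillRemainingSketch(std::atomic<size_t>& remaining,
                                            size_t by) {
        remaining.fetch_add(by, std::memory_order_relaxed);
    }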
  
diff --cc src/ep.cc
Simple merge
diff --cc src/ep.h
Simple merge
diff --cc src/executorthread.cc
Simple merge
diff --cc src/executorthread.h
Simple merge
diff --cc src/tapconnection.h
Simple merge
diff --cc src/vbucket.cc
Simple merge
diff --cc src/vbucket.h
@@@ -516,21 -456,9 +516,21 @@@ private
  
      Mutex hpChksMutex;
      std::list<HighPriorityVBEntry> hpChks;
-     volatile size_t numHpChks; // size of list hpChks (to avoid MB-9434)
+     AtomicValue<size_t> numHpChks; // size of list hpChks (to avoid MB-9434)
      KVShard *shard;
  
 +    Mutex bfMutex;
 +    BloomFilter *bFilter;
 +    BloomFilter *tempFilter;    // Used during compaction.
 +
 +    /**
 +     * The following list is to contain pending notifications
 +     * that need to be alerted whenever the desired sequence
 +     * numbers have been persisted.
 +     */
 +    Mutex persistedNotificationsMutex;
 +    std::list<shared_ptr<Callback<uint64_t>> > persistedNotifications;
 +
      static size_t chkFlushTimeout;
  
      DISALLOW_COPY_AND_ASSIGN(VBucket);
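
The numHpChks change is the classic volatile-versus-atomic distinction: volatile only stops the compiler from caching the value, it does not make concurrent updates race-free, which is why the field becomes an AtomicValue. A minimal sketch of the difference (illustrative, not the VBucket code):

    #include <atomic>
    #include <cstddef>

    // Racy: ++ on a volatile is a separate load and store, so two threads can
    // lose updates, and it is a data race under the C++11 memory model.
    volatile size_t volatileHpChks = 0;

    // Well-defined: fetch_add is a single atomic read-modify-write; relaxed
    // ordering suffices because the list itself stays behind hpChksMutex.
    std::atomic<size_t> atomicHpChks{0};

    void onHighPriorityChkAdded() {
        // volatileHpChks++;  // undefined behaviour if another thread also writes
        atomicHpChks.fetch_add(1, std::memory_order_relaxed);
    }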