Merge remote-tracking branch 'couchbase/3.0.x' into sherlock 87/65187/3
authorDave Rigby <daver@couchbase.com>
Thu, 23 Jun 2016 10:44:53 +0000 (11:44 +0100)
committerDave Rigby <daver@couchbase.com>
Thu, 23 Jun 2016 10:58:57 +0000 (11:58 +0100)
* commit 'a430629':
  MB-19278: Fix lock-order inversion on ActiveStream::streamMutex
  MB-19277: Set executorThread's waketime to atomic
  MB-19276: Fix data race on ExecutorThread::taskStart
  MB-19275: Address data race on a DCP stream's state
  MB-19273: Fix data race on PassiveStream::buffer.{bytes,items}
  MB-19260: Make cookie atomic to serialize set/get in ConnHandler
  MB-19259: Fix data race on DcpConsumer::backoffs
  MB-19258: Address data race with replicationThrottle parameters
  MB-19281: [BP] Add template class RelaxedAtomic<>
  MB-19257: Fix data race on ExecutorThread::now
  MB-19256: Address possible data race on VBCBAdaptor::currentvb

Further merge of mostly TSan fixes from 3.0.x into sherlock.

Change-Id: Ic88c446c4e09d669f7a4da7f8cb2f97c13d70ab7

1  2 
src/dcp-consumer.h
src/dcp-stream.cc
src/dcp-stream.h
src/ep.cc
src/ep.h
src/executorthread.cc
src/executorthread.h
src/relaxed_atomic.h
src/tapconnection.cc
src/tapconnection.h

@@@ -150,11 -125,11 +153,11 @@@ private
      Mutex readyMutex;
      std::list<uint16_t> ready;
  
 -    passive_stream_t* streams;
 +    std::vector<passive_stream_t> streams;
      opaque_map opaqueMap_;
  
 -    rel_time_t lastNoopTime;
 +    rel_time_t lastMessageTime;
-     uint32_t backoffs;
+     Couchbase::RelaxedAtomic<uint32_t> backoffs;
      uint32_t noopInterval;
      bool enableNoop;
      bool sendNoopInterval;
@@@ -1464,12 -1506,19 +1466,19 @@@ void PassiveStream::handleSnapshotEnd(R
  void PassiveStream::addStats(ADD_STAT add_stat, const void *c) {
      Stream::addStats(add_stat, c);
  
 -    const int bsize = 128;
 +    const int bsize = 1024;
      char buf[bsize];
+     size_t buffer_bytes;
+     size_t buffer_items;
+     {
+         LockHolder lh(buffer.bufMutex);
+         buffer_bytes = buffer.bytes;
+         buffer_items = buffer.items;
+     }
      snprintf(buf, bsize, "%s:stream_%d_buffer_items", name_.c_str(), vb_);
-     add_casted_stat(buf, buffer.items, add_stat, c);
+     add_casted_stat(buf, buffer_items, add_stat, c);
      snprintf(buf, bsize, "%s:stream_%d_buffer_bytes", name_.c_str(), vb_);
-     add_casted_stat(buf, buffer.bytes, add_stat, c);
+     add_casted_stat(buf, buffer_bytes, add_stat, c);
      snprintf(buf, bsize, "%s:stream_%d_items_ready", name_.c_str(), vb_);
      add_casted_stat(buf, itemsReady.load() ? "true" : "false", add_stat, c);
      snprintf(buf, bsize, "%s:stream_%d_last_received_seqno", name_.c_str(), vb_);
@@@ -1521,10 -1570,10 +1530,10 @@@ bool PassiveStream::transitionState(str
          consumer->logHeader(), vb_, stateName(state_), stateName(newState));
  
      if (state_ == newState) {
 -        return;
 +        return false;
      }
  
-     switch (state_) {
+     switch (state_.load()) {
          case STREAM_PENDING:
              cb_assert(newState == STREAM_READING || newState == STREAM_DEAD);
              break;
Simple merge
diff --cc src/ep.cc
Simple merge
diff --cc src/ep.h
Simple merge
Simple merge
Simple merge
index 0000000,edf2089..eb86707
mode 000000,100644..100644
--- /dev/null
@@@ -1,0 -1,124 +1,124 @@@
 -#include "atomic.h"
+ /* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
+ /*
+  *     Copyright 2015 Couchbase, Inc.
+  *
+  *   Licensed under the Apache License, Version 2.0 (the "License");
+  *   you may not use this file except in compliance with the License.
+  *   You may obtain a copy of the License at
+  *
+  *       http://www.apache.org/licenses/LICENSE-2.0
+  *
+  *   Unless required by applicable law or agreed to in writing, software
+  *   distributed under the License is distributed on an "AS IS" BASIS,
+  *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  *   See the License for the specific language governing permissions and
+  *   limitations under the License.
+  */
+ #pragma once
 -            value.store(0, memory_order_relaxed);
++#include <atomic>
+ namespace Couchbase {
+     /**
+      * The RelaxedAtomic class wraps std::atomic<> and operates with
+      * relaxed memory ordering.
+      */
+     template<typename T>
+     class RelaxedAtomic {
+     public:
+         RelaxedAtomic() {
 -            value.store(initial, memory_order_relaxed);
++            value.store(0, std::memory_order_relaxed);
+         }
+         RelaxedAtomic(const T& initial) {
 -            value.store(other.value.load(memory_order_relaxed),
 -                        memory_order_relaxed);
++            value.store(initial, std::memory_order_relaxed);
+         }
+         explicit RelaxedAtomic(const RelaxedAtomic& other) {
 -            return value.load(memory_order_relaxed);
++            value.store(other.value.load(std::memory_order_relaxed),
++                        std::memory_order_relaxed);
+         }
+         operator T() const {
 -            return value.load(memory_order_relaxed);
++            return value.load(std::memory_order_relaxed);
+         }
+         T load() const {
 -            value.store(rhs.load(), memory_order_relaxed);
++            return value.load(std::memory_order_relaxed);
+         }
+         RelaxedAtomic& operator=(const RelaxedAtomic& rhs) {
 -            value.fetch_add(rhs, memory_order_relaxed);
++            value.store(rhs.load(), std::memory_order_relaxed);
+             return *this;
+         }
+         RelaxedAtomic& operator+=(const T rhs) {
 -            value.fetch_add(rhs.value.load(memory_order_relaxed),
 -                            memory_order_relaxed);
++            value.fetch_add(rhs, std::memory_order_relaxed);
+             return *this;
+         }
+         RelaxedAtomic& operator+=(const RelaxedAtomic& rhs) {
 -            value.fetch_sub(rhs, memory_order_relaxed);
++            value.fetch_add(rhs.value.load(std::memory_order_relaxed),
++                            std::memory_order_relaxed);
+             return *this;
+         }
+         RelaxedAtomic& operator-=(const T rhs) {
 -            value.fetch_sub(rhs.value.load(memory_order_relaxed),
 -                            memory_order_relaxed);
++            value.fetch_sub(rhs, std::memory_order_relaxed);
+             return *this;
+         }
+         RelaxedAtomic& operator-=(const RelaxedAtomic& rhs) {
 -            return value.fetch_add(1, memory_order_relaxed) + 1;
++            value.fetch_sub(rhs.value.load(std::memory_order_relaxed),
++                            std::memory_order_relaxed);
+             return *this;
+         }
+         T operator++() {
 -            return value.fetch_add(1, memory_order_relaxed);
++            return value.fetch_add(1, std::memory_order_relaxed) + 1;
+         }
+         T operator++(int) {
 -            return value.fetch_sub(1, memory_order_relaxed) - 1;
++            return value.fetch_add(1, std::memory_order_relaxed);
+         }
+         T operator--() {
 -            return value.fetch_sub(1, memory_order_relaxed);
++            return value.fetch_sub(1, std::memory_order_relaxed) - 1;
+         }
+         T operator--(int) {
 -            value.store(val, memory_order_relaxed);
++            return value.fetch_sub(1, std::memory_order_relaxed);
+         }
+         RelaxedAtomic& operator=(T val) {
 -            value.store(0, memory_order_relaxed);
++            value.store(val, std::memory_order_relaxed);
+             return *this;
+         }
+         void reset() {
 -                T currval = value.load(memory_order_relaxed);
++            value.store(0, std::memory_order_relaxed);
+         }
+         void setIfGreater(const T& val) {
+             do {
 -                                                    memory_order_relaxed)) {
++                T currval = value.load(std::memory_order_relaxed);
+                 if (val > currval) {
+                     if (value.compare_exchange_weak(currval, val,
 -        AtomicValue<T> value;
++                                                    std::memory_order_relaxed)) {
+                         break;
+                     }
+                 } else {
+                     break;
+                 }
+             } while (true);
+         }
+         void setIfGreater(const RelaxedAtomic& val) {
+             setIfGreater(val.load());
+         }
+     private:
++        std::atomic <T> value;
+     };
+ }
Simple merge
Simple merge