MB-23637: Extra logging for dcp-vbtakeover stat & refactor stream type 62/76162/9
authorDaniel Owen <owend@couchbase.com>
Sat, 1 Apr 2017 14:00:18 +0000 (15:00 +0100)
committerDave Rigby <daver@couchbase.com>
Fri, 7 Apr 2017 07:55:26 +0000 (07:55 +0000)
The dcp-vbtakeover stat is critical to the success of rebalance.
Therefore it is useful for debugging future rebalance failures to log
the unexpected paths and error paths.

We want to print the stream type in the logging output.  Therefore the
stream type has been refactored into a enum class and a to_string
method provided to print a textual representation of the type.

Change-Id: I72ce7c6bd1f923f3cfa7557f1b150f97aa5fa7f9
Reviewed-on: http://review.couchbase.org/76162
Tested-by: Build Bot <build@couchbase.com>
Reviewed-by: Dave Rigby <daver@couchbase.com>
src/dcp/backfill_disk.cc
src/dcp/producer.cc
src/dcp/stream.cc
src/dcp/stream.h

index b3b9884..588041e 100644 (file)
@@ -40,11 +40,11 @@ CacheCallback::CacheCallback(EventuallyPersistentEngine& e, active_stream_t& s)
     if (stream_.get() == nullptr) {
         throw std::invalid_argument("CacheCallback(): stream is NULL");
     }
-    if (stream_.get()->getType() != STREAM_ACTIVE) {
+    if (!stream_.get()->isTypeActive()) {
         throw std::invalid_argument(
                 "CacheCallback(): stream->getType() "
                 "(which is " +
-                std::to_string(stream_.get()->getType()) + ") is not ACTIVE");
+                to_string(stream_.get()->getType()) + ") is not Active");
     }
 }
 
@@ -91,11 +91,11 @@ DiskCallback::DiskCallback(active_stream_t& s) : stream_(s) {
     if (stream_.get() == nullptr) {
         throw std::invalid_argument("DiskCallback(): stream is NULL");
     }
-    if (stream_.get()->getType() != STREAM_ACTIVE) {
+    if (!stream_.get()->isTypeActive()) {
         throw std::invalid_argument(
                 "DiskCallback(): stream->getType() "
                 "(which is " +
-                std::to_string(stream_.get()->getType()) + ") is not ACTIVE");
+                to_string(stream_.get()->getType()) + ") is not Active");
     }
 }
 
index d34367e..322c214 100644 (file)
@@ -706,7 +706,7 @@ bool DcpProducer::handleResponse(protocol_binary_response_header* resp) {
         auto itr = streams.find_if(
             [opaque](const StreamsMap::value_type& s) {
                 const auto& stream = s.second;
-                if (stream && stream->getType() == STREAM_ACTIVE) {
+                if (stream && stream->isTypeActive()) {
                     ActiveStream* as = static_cast<ActiveStream*>(stream.get());
                     return (as && opaque == stream->getOpaque());
                 } else {
@@ -844,9 +844,20 @@ void DcpProducer::addTakeoverStats(ADD_STAT add_stat, const void* c,
                                    const VBucket& vb) {
 
     auto stream = findStream(vb.getId());
-    if (stream && stream->getType() == STREAM_ACTIVE) {
-        ActiveStream* as = static_cast<ActiveStream*>(stream.get());
-        as->addTakeoverStats(add_stat, c, vb);
+    if (stream) {
+        if (stream->isTypeActive()) {
+            ActiveStream* as = static_cast<ActiveStream*>(stream.get());
+            as->addTakeoverStats(add_stat, c, vb);
+        } else {
+            LOG(EXTENSION_LOG_WARNING, "%s (vb:%" PRIu16 ") "
+                "DcpProducer::addTakeoverStats Stream type is %s and not the "
+                "expected Active",
+                logHeader(), vb.getId(), to_string(stream->getType()).c_str());
+        }
+    } else {
+        LOG(EXTENSION_LOG_NOTICE, "%s (vb:%" PRIu16 ") "
+            "DcpProducer::addTakeoverStats Unable to find stream",
+            logHeader(), vb.getId());
     }
 }
 
@@ -1094,7 +1105,7 @@ size_t DcpProducer::getItemsRemaining() {
     size_t remainingSize = 0;
     streams.for_each(
         [&remainingSize](const StreamsMap::value_type& iter) {
-            if (iter.second->getType() == STREAM_ACTIVE) {
+            if (iter.second->isTypeActive()) {
                 ActiveStream *as = static_cast<ActiveStream *>(iter.second.get());
                 remainingSize += as->getItemsRemaining();
             }
index d7a6198..7a621d4 100644 (file)
@@ -47,17 +47,37 @@ const char* to_string(Stream::Snapshot type) {
             "Snapshot type:" + std::to_string(int(type)));
 }
 
+const std::string to_string(Stream::Type type) {
+    switch (type) {
+    case Stream::Type::Active:
+        return "Active";
+    case Stream::Type::Notifier:
+        return "Notifier";
+    case Stream::Type::Passive:
+        return "Passive";
+    }
+    throw std::logic_error("to_string(Stream::Type): called with invalid "
+            "type:" + std::to_string(int(type)));
+}
+
 const uint64_t Stream::dcpMaxSeqno = std::numeric_limits<uint64_t>::max();
 
 Stream::Stream(const std::string &name, uint32_t flags, uint32_t opaque,
                uint16_t vb, uint64_t start_seqno, uint64_t end_seqno,
                uint64_t vb_uuid, uint64_t snap_start_seqno,
-               uint64_t snap_end_seqno)
-    : name_(name), flags_(flags), opaque_(opaque), vb_(vb),
-      start_seqno_(start_seqno), end_seqno_(end_seqno), vb_uuid_(vb_uuid),
+               uint64_t snap_end_seqno, Type type)
+    : name_(name),
+      flags_(flags),
+      opaque_(opaque),
+      vb_(vb),
+      start_seqno_(start_seqno),
+      end_seqno_(end_seqno),
+      vb_uuid_(vb_uuid),
       snap_start_seqno_(snap_start_seqno),
       snap_end_seqno_(snap_end_seqno),
-      state_(StreamState::Pending), itemsReady(false),
+      state_(StreamState::Pending),
+      type_(type),
+      itemsReady(false),
       readyQ_non_meta_items(0),
       readyQueueMemory(0) {
 }
@@ -89,6 +109,10 @@ const std::string Stream::to_string(Stream::StreamState st) {
         "Stream::to_string(StreamState): " + std::to_string(int(st)));
 }
 
+bool Stream::isTypeActive() const {
+    return type_ == Type::Active;
+}
+
 bool Stream::isActive() const {
     return state_.load() != StreamState::Dead;
 }
@@ -216,7 +240,8 @@ ActiveStream::ActiveStream(EventuallyPersistentEngine* e,
              en_seqno,
              vb_uuid,
              snap_start_seqno,
-             snap_end_seqno),
+             snap_end_seqno,
+             Type::Active),
       isBackfillTaskRunning(false),
       pendingBackfill(false),
       lastReadSeqno(st_seqno),
@@ -257,8 +282,6 @@ ActiveStream::ActiveStream(EventuallyPersistentEngine* e,
     backfillItems.disk = 0;
     backfillItems.sent = 0;
 
-    type_ = STREAM_ACTIVE;
-
     bufferedBackfill.bytes = 0;
     bufferedBackfill.items = 0;
 
@@ -1422,7 +1445,7 @@ NotifierStream::NotifierStream(EventuallyPersistentEngine* e, dcp_producer_t p,
                                uint64_t snap_start_seqno,
                                uint64_t snap_end_seqno)
     : Stream(name, flags, opaque, vb, st_seqno, en_seqno, vb_uuid,
-             snap_start_seqno, snap_end_seqno),
+             snap_start_seqno, snap_end_seqno, Type::Notifier),
       producer(p) {
     LockHolder lh(streamMutex);
     RCPtr<VBucket> vbucket = e->getVBucket(vb_);
@@ -1432,8 +1455,6 @@ NotifierStream::NotifierStream(EventuallyPersistentEngine* e, dcp_producer_t p,
         itemsReady.store(true);
     }
 
-    type_ = STREAM_NOTIFIER;
-
     producer->getLogger().log(EXTENSION_LOG_NOTICE,
         "(vb %d) stream created with start seqno %" PRIu64 " and end seqno %"
         PRIu64, vb, st_seqno, en_seqno);
@@ -1529,11 +1550,10 @@ PassiveStream::PassiveStream(EventuallyPersistentEngine* e, dcp_consumer_t c,
                              uint64_t snap_start_seqno, uint64_t snap_end_seqno,
                              uint64_t vb_high_seqno)
     : Stream(name, flags, opaque, vb, st_seqno, en_seqno, vb_uuid,
-             snap_start_seqno, snap_end_seqno),
+             snap_start_seqno, snap_end_seqno, Type::Passive),
       engine(e), consumer(c), last_seqno(vb_high_seqno), cur_snapshot_start(0),
       cur_snapshot_end(0), cur_snapshot_type(Snapshot::None), cur_snapshot_ack(false) {
     LockHolder lh(streamMutex);
-    type_ = STREAM_PASSIVE;
     streamRequest_UNLOCKED(vb_uuid);
     itemsReady.store(true);
 }
index 2e5da6c..d326bae 100644 (file)
@@ -51,12 +51,6 @@ enum end_stream_status_t {
     END_STREAM_SLOW
 };
 
-enum stream_type_t {
-    STREAM_ACTIVE,
-    STREAM_NOTIFIER,
-    STREAM_PASSIVE
-};
-
 enum process_items_error_t {
     all_processed,
     more_to_process,
@@ -71,16 +65,28 @@ enum backfill_source_t {
 class Stream : public RCValue {
 public:
 
+    enum class Type {
+        Active,
+        Notifier,
+        Passive
+    };
+
     enum class Snapshot {
            None,
            Disk,
            Memory
-       };
-
-    Stream(const std::string &name, uint32_t flags, uint32_t opaque,
-           uint16_t vb, uint64_t start_seqno, uint64_t end_seqno,
-           uint64_t vb_uuid, uint64_t snap_start_seqno,
-           uint64_t snap_end_seqno);
+    };
+
+    Stream(const std::string &name,
+           uint32_t flags,
+           uint32_t opaque,
+           uint16_t vb,
+           uint64_t start_seqno,
+           uint64_t end_seqno,
+           uint64_t vb_uuid,
+           uint64_t snap_start_seqno,
+           uint64_t snap_end_seqno,
+           Type type);
 
     virtual ~Stream();
 
@@ -100,8 +106,6 @@ public:
 
     uint64_t getSnapEndSeqno() { return snap_end_seqno_; }
 
-    stream_type_t getType() { return type_; }
-
     virtual void addStats(ADD_STAT add_stat, const void *c);
 
     virtual DcpResponse* next() = 0;
@@ -118,6 +122,11 @@ public:
         // Stream defaults to do nothing
     }
 
+    Type getType() { return type_; }
+
+    /// @returns true if the stream type is Active
+    bool isTypeActive() const;
+
     /// @returns true if state_ is not Dead
     bool isActive() const;
 
@@ -178,7 +187,7 @@ protected:
     uint64_t snap_start_seqno_;
     uint64_t snap_end_seqno_;
     std::atomic<StreamState> state_;
-    stream_type_t type_;
+    Type type_;
 
     std::atomic<bool> itemsReady;
     std::mutex streamMutex;
@@ -200,7 +209,7 @@ private:
 };
 
 const char* to_string(Stream::Snapshot type);
-
+const std::string to_string(Stream::Type type);
 
 class ActiveStreamCheckpointProcessorTask;