MB-19897: Record time for all DCP consumer messages 79/64879/3
authorDaniel Owen <owend@couchbase.com>
Mon, 25 Apr 2016 13:06:41 +0000 (14:06 +0100)
committerDave Rigby <daver@couchbase.com>
Mon, 13 Jun 2016 09:25:22 +0000 (09:25 +0000)
The DCP documentation states that the consumer should see
some sort of message or a No-Op message in a period
equal to twice the noop interval otherwise it should close
its connection.  See documentation/commands/no-op.md in
https://github.com/couchbaselabs/dcp-documentation

This patch changes from checking only the receival of a
no-op message to check for recieving the following messages
- add stream
- close stream
- deletion
- expiration
- flush
- mutation
- set VBucket state
- snapshot Marker
- stream end

Change-Id: Ib2268dba339cbf3701f3c7782ee8256bddc79ba3
Reviewed-on: http://review.couchbase.org/64879
Tested-by: buildbot <build@couchbase.com>
Well-Formed: buildbot <build@couchbase.com>
Reviewed-by: Dave Rigby <daver@couchbase.com>
src/dcp-consumer.cc
src/dcp-consumer.h

index b856629..b009f36 100644 (file)
@@ -74,7 +74,7 @@ DcpConsumer::DcpConsumer(EventuallyPersistentEngine &engine, const void *cookie,
       opaqueCounter(0),
       processTaskId(0),
       itemsToProcess(false),
-      lastNoopTime(ep_current_time()),
+      lastMessageTime(ep_current_time()),
       backoffs(0),
       processBufferedMessagesYieldThreshold(engine.getConfiguration().
                                                 getDcpConsumerProcessBufferedMessagesYieldLimit()),
@@ -155,6 +155,7 @@ DcpConsumer::~DcpConsumer() {
 
 ENGINE_ERROR_CODE DcpConsumer::addStream(uint32_t opaque, uint16_t vbucket,
                                          uint32_t flags) {
+    lastMessageTime = ep_current_time();
     LockHolder lh(readyMutex);
     if (doDisconnect()) {
         return ENGINE_DISCONNECT;
@@ -206,6 +207,7 @@ ENGINE_ERROR_CODE DcpConsumer::addStream(uint32_t opaque, uint16_t vbucket,
 }
 
 ENGINE_ERROR_CODE DcpConsumer::closeStream(uint32_t opaque, uint16_t vbucket) {
+    lastMessageTime = ep_current_time();
     if (doDisconnect()) {
         streams[vbucket].reset();
         return ENGINE_DISCONNECT;
@@ -231,6 +233,7 @@ ENGINE_ERROR_CODE DcpConsumer::closeStream(uint32_t opaque, uint16_t vbucket) {
 
 ENGINE_ERROR_CODE DcpConsumer::streamEnd(uint32_t opaque, uint16_t vbucket,
                                          uint32_t flags) {
+    lastMessageTime = ep_current_time();
     if (doDisconnect()) {
         return ENGINE_DISCONNECT;
     }
@@ -274,6 +277,7 @@ ENGINE_ERROR_CODE DcpConsumer::mutation(uint32_t opaque, const void* key,
                                         uint64_t bySeqno, uint64_t revSeqno,
                                         uint32_t exptime, uint8_t nru,
                                         const void* meta, uint16_t nmeta) {
+    lastMessageTime = ep_current_time();
     if (doDisconnect()) {
         return ENGINE_DISCONNECT;
     }
@@ -334,6 +338,7 @@ ENGINE_ERROR_CODE DcpConsumer::deletion(uint32_t opaque, const void* key,
                                         uint16_t vbucket, uint64_t bySeqno,
                                         uint64_t revSeqno, const void* meta,
                                         uint16_t nmeta) {
+    lastMessageTime = ep_current_time();
     if (doDisconnect()) {
         return ENGINE_DISCONNECT;
     }
@@ -393,6 +398,7 @@ ENGINE_ERROR_CODE DcpConsumer::expiration(uint32_t opaque, const void* key,
                                           uint16_t vbucket, uint64_t bySeqno,
                                           uint64_t revSeqno, const void* meta,
                                           uint16_t nmeta) {
+    // lastMessageTime is set in deletion function
     return deletion(opaque, key, nkey, cas, vbucket, bySeqno, revSeqno, meta,
                     nmeta);
 }
@@ -402,6 +408,7 @@ ENGINE_ERROR_CODE DcpConsumer::snapshotMarker(uint32_t opaque,
                                               uint64_t start_seqno,
                                               uint64_t end_seqno,
                                               uint32_t flags) {
+    lastMessageTime = ep_current_time();
     if (doDisconnect()) {
         return ENGINE_DISCONNECT;
     }
@@ -441,11 +448,12 @@ ENGINE_ERROR_CODE DcpConsumer::snapshotMarker(uint32_t opaque,
 }
 
 ENGINE_ERROR_CODE DcpConsumer::noop(uint32_t opaque) {
-    lastNoopTime = ep_current_time();
+    lastMessageTime = ep_current_time();
     return ENGINE_SUCCESS;
 }
 
 ENGINE_ERROR_CODE DcpConsumer::flush(uint32_t opaque, uint16_t vbucket) {
+    lastMessageTime = ep_current_time();
     if (doDisconnect()) {
         return ENGINE_DISCONNECT;
     }
@@ -456,6 +464,7 @@ ENGINE_ERROR_CODE DcpConsumer::flush(uint32_t opaque, uint16_t vbucket) {
 ENGINE_ERROR_CODE DcpConsumer::setVBucketState(uint32_t opaque,
                                                uint16_t vbucket,
                                                vbucket_state_t state) {
+    lastMessageTime = ep_current_time();
     if (doDisconnect()) {
         return ENGINE_DISCONNECT;
     }
@@ -903,9 +912,10 @@ ENGINE_ERROR_CODE DcpConsumer::handleNoop(struct dcp_message_producers* producer
         return ret;
     }
 
-    if ((ep_current_time() - lastNoopTime) > (noopInterval * 2)) {
-        LOG(EXTENSION_LOG_WARNING, "%s Disconnecting because noop message has "
-            "not been received for %u seconds", logHeader(), (noopInterval * 2));
+    if ((ep_current_time() - lastMessageTime) > (noopInterval * 2)) {
+        LOG(EXTENSION_LOG_WARNING, "%s Disconnecting because a message has "
+            "not been received for %u seconds. lastMessageTime was %u seconds ago.",
+            logHeader(), (noopInterval * 2), (ep_current_time() - lastMessageTime));
         return ENGINE_DISCONNECT;
     }
 
index 5710781..e0f27ac 100644 (file)
@@ -153,7 +153,7 @@ private:
     std::vector<passive_stream_t> streams;
     opaque_map opaqueMap_;
 
-    rel_time_t lastNoopTime;
+    rel_time_t lastMessageTime;
     uint32_t backoffs;
     uint32_t noopInterval;
     bool enableNoop;