MB-16181: Add DCP system-event engine hooks 35/77435/10
authorJim Walker <jim@couchbase.com>
Fri, 10 Mar 2017 10:19:50 +0000 (10:19 +0000)
committerDave Rigby <daver@couchbase.com>
Thu, 18 May 2017 09:15:29 +0000 (09:15 +0000)
Plug methods into the engine callbacks so that we can now have
system events pushed in from memcached.

Change-Id: I064b86542b2ab98f80e33097afe8a60242fd7147
Reviewed-on: http://review.couchbase.org/77435
Reviewed-by: Dave Rigby <daver@couchbase.com>
Tested-by: Build Bot <build@couchbase.com>
src/dcp/consumer.h
src/ep_engine.cc
src/tapconnection.cc
src/tapconnection.h

index d2db61e..230a94a 100644 (file)
@@ -125,7 +125,7 @@ public:
                                   uint32_t event,
                                   uint64_t bySeqno,
                                   cb::const_byte_buffer key,
-                                  cb::const_byte_buffer eventData);
+                                  cb::const_byte_buffer eventData) override;
 
     bool doRollback(uint32_t opaque, uint16_t vbid, uint64_t rollbackSeqno);
 
index 5266733..adc936b 100644 (file)
@@ -1619,6 +1619,23 @@ static ENGINE_ERROR_CODE EvpDcpResponseHandler(ENGINE_HANDLE* handle,
     return ENGINE_DISCONNECT;
 }
 
+static ENGINE_ERROR_CODE EvpDcpSystemEvent(ENGINE_HANDLE* handle,
+                                           const void* cookie,
+                                           uint32_t opaque,
+                                           uint16_t vbucket,
+                                           uint32_t event,
+                                           uint64_t bySeqno,
+                                           cb::const_byte_buffer key,
+                                           cb::const_byte_buffer eventData) {
+    auto engine = acquireEngine(handle);
+    ConnHandler* conn = engine->getConnHandler(cookie);
+    if (conn) {
+        return conn->systemEvent(
+                opaque, vbucket, event, bySeqno, key, eventData);
+    }
+    return ENGINE_DISCONNECT;
+}
+
 static void EvpHandleDisconnect(const void* cookie,
                                 ENGINE_EVENT_TYPE type,
                                 const void* event_data,
@@ -1813,6 +1830,7 @@ EventuallyPersistentEngine::EventuallyPersistentEngine(
     ENGINE_HANDLE_V1::dcp.buffer_acknowledgement = EvpDcpBufferAcknowledgement;
     ENGINE_HANDLE_V1::dcp.control = EvpDcpControl;
     ENGINE_HANDLE_V1::dcp.response_handler = EvpDcpResponseHandler;
+    ENGINE_HANDLE_V1::dcp.system_event = EvpDcpSystemEvent;
     ENGINE_HANDLE_V1::set_log_level = EvpSetLogLevel;
 
     serverApi = getServerApiFunc();
index e4d69c5..3beb6fd 100644 (file)
@@ -322,6 +322,18 @@ bool ConnHandler::handleResponse(protocol_binary_response_header* resp) {
     return false;
 }
 
+ENGINE_ERROR_CODE ConnHandler::systemEvent(uint32_t opaque,
+                                           uint16_t vbucket,
+                                           uint32_t event,
+                                           uint64_t bySeqno,
+                                           cb::const_byte_buffer key,
+                                           cb::const_byte_buffer eventData) {
+    logger.log(EXTENSION_LOG_WARNING,
+               "Disconnecting - This connection doesn't "
+               "support the dcp system_event API");
+    return ENGINE_DISCONNECT;
+}
+
 const Logger& ConnHandler::getLogger() const {
     return logger;
 }
index 2d1cb30..77e07e8 100644 (file)
@@ -278,6 +278,13 @@ public:
      */
     virtual bool handleResponse(protocol_binary_response_header* resp);
 
+    virtual ENGINE_ERROR_CODE systemEvent(uint32_t opaque,
+                                          uint16_t vbucket,
+                                          uint32_t event,
+                                          uint64_t bySeqno,
+                                          cb::const_byte_buffer key,
+                                          cb::const_byte_buffer eventData);
+
     EventuallyPersistentEngine& engine() {
         return engine_;
     }