MB-24424: Pass correct parameters when making MutationResponse 02/78302/16
authorDaniel Owen <owend@couchbase.com>
Thu, 18 May 2017 11:50:22 +0000 (12:50 +0100)
committerDave Rigby <daver@couchbase.com>
Wed, 24 May 2017 15:04:59 +0000 (15:04 +0000)
The isKeyOnly parameter was missing when passed into make_unique. This
patch fixes this issue.  In addition it moves the creation of a
PassiveStream into a separate function.  This enables the function that
creates a PassiveStream to be overridden with one that creates a
MockPassiveStream, which is used in testing.

Change-Id: I58e2e8ca06acba24573d1b4a53aeed85dbcecffa
Reviewed-on: http://review.couchbase.org/78302
Reviewed-by: Dave Rigby <daver@couchbase.com>
Tested-by: Build Bot <build@couchbase.com>
src/dcp/consumer.cc
src/dcp/consumer.h
src/dcp/stream.h
tests/mock/mock_dcp_consumer.h
tests/mock/mock_stream.h
tests/module_tests/dcp_test.cc

index 611bf11..4de6c60 100644 (file)
@@ -149,6 +149,34 @@ void DcpConsumer::taskCancelled() {
     taskAlreadyCancelled.compare_exchange_strong(inverse, true);
 }
 
+SingleThreadedRCPtr<PassiveStream> DcpConsumer::makePassiveStream(
+        EventuallyPersistentEngine& e,
+        dcp_consumer_t consumer,
+        const std::string& name,
+        uint32_t flags,
+        uint32_t opaque,
+        uint16_t vb,
+        uint64_t start_seqno,
+        uint64_t end_seqno,
+        uint64_t vb_uuid,
+        uint64_t snap_start_seqno,
+        uint64_t snap_end_seqno,
+        uint64_t vb_high_seqno) {
+    return SingleThreadedRCPtr<PassiveStream>(
+            new PassiveStream(&e,
+                              consumer,
+                              name,
+                              flags,
+                              opaque,
+                              vb,
+                              start_seqno,
+                              end_seqno,
+                              vb_uuid,
+                              snap_start_seqno,
+                              snap_end_seqno,
+                              vb_high_seqno));
+}
+
 ENGINE_ERROR_CODE DcpConsumer::addStream(uint32_t opaque, uint16_t vbucket,
                                          uint32_t flags) {
     lastMessageTime = ep_current_time();
@@ -198,12 +226,18 @@ ENGINE_ERROR_CODE DcpConsumer::addStream(uint32_t opaque, uint16_t vbucket,
     }
 
     streams.insert({vbucket,
-                    SingleThreadedRCPtr<PassiveStream>(
-                           new PassiveStream(&engine_, this, getName(), flags,
-                                             new_opaque, vbucket, start_seqno,
-                                             end_seqno, vbucket_uuid,
-                                             snap_start_seqno, snap_end_seqno,
-                                             high_seqno))});
+                    makePassiveStream(engine_,
+                                      this,
+                                      getName(),
+                                      flags,
+                                      new_opaque,
+                                      vbucket,
+                                      start_seqno,
+                                      end_seqno,
+                                      vbucket_uuid,
+                                      snap_start_seqno,
+                                      snap_end_seqno,
+                                      high_seqno)});
     ready.push_back(vbucket);
     opaqueMap_[new_opaque] = std::make_pair(opaque, vbucket);
 
@@ -321,7 +355,10 @@ ENGINE_ERROR_CODE DcpConsumer::mutation(uint32_t opaque,
 
         try {
             err = stream->messageReceived(
-                    std::make_unique<MutationResponse>(item, opaque, emd));
+                    std::make_unique<MutationResponse>(item,
+                                                       opaque,
+                                                       /*isKeyOnly*/false,
+                                                       emd));
         } catch (const std::bad_alloc&) {
             delete emd;
             return ENGINE_ENOMEM;
@@ -386,7 +423,10 @@ ENGINE_ERROR_CODE DcpConsumer::deletion(uint32_t opaque,
 
         try {
             err = stream->messageReceived(
-                    std::make_unique<MutationResponse>(item, opaque, emd));
+                    std::make_unique<MutationResponse>(item,
+                                                       opaque,
+                                                       /*isKeyOnly*/false,
+                                                       emd));
         } catch (const std::bad_alloc&) {
             delete emd;
             return ENGINE_ENOMEM;
index 5683fef..9c7513b 100644 (file)
@@ -38,6 +38,38 @@ public:
 
     virtual ~DcpConsumer();
 
+    /*
+     * Creates a PassiveStream.
+     *
+     * @param e Reference to the engine
+     * @param consumer The consumer the new stream will belong to
+     * @param name The name of the new stream
+     * @param flags The DCP flags
+     * @param opaque The stream opaque
+     * @param vb The vbucket the stream belongs to
+     * @param start_seqno The start sequence number of the stream
+     * @param end_seqno The end sequence number of the stream
+     * @param vb_uuid The uuid of the vbucket the stream belongs to
+     * @param snap_start_seqno The snapshot start sequence number
+     * @param snap_end_seqno The snapshot end sequence number
+     * @param vb_high_seqno The last received sequence number
+     *
+     * @return a SingleThreadedRCPtr to the newly created PassiveStream.
+     */
+    virtual SingleThreadedRCPtr<PassiveStream> makePassiveStream(
+            EventuallyPersistentEngine& e,
+            dcp_consumer_t consumer,
+            const std::string& name,
+            uint32_t flags,
+            uint32_t opaque,
+            uint16_t vb,
+            uint64_t start_seqno,
+            uint64_t end_seqno,
+            uint64_t vb_uuid,
+            uint64_t snap_start_seqno,
+            uint64_t snap_end_seqno,
+            uint64_t vb_high_seqno);
+
     ENGINE_ERROR_CODE addStream(uint32_t opaque, uint16_t vbucket,
                                 uint32_t flags) override;
 
index 9b71109..602124a 100644 (file)
@@ -551,7 +551,14 @@ public:
     void reconnectStream(VBucketPtr &vb, uint32_t new_opaque,
                          uint64_t start_seqno);
 
-    ENGINE_ERROR_CODE messageReceived(std::unique_ptr<DcpResponse> response);
+    /*
+     * Calls the appropriate function to process the message.
+     *
+     * @params response The dcp message that needs to be processed.
+     * @returns the error code from processing the message.
+     */
+    virtual ENGINE_ERROR_CODE messageReceived(
+            std::unique_ptr<DcpResponse> response);
 
     void addStats(ADD_STAT add_stat, const void *c);
 
index 24bc438..292146f 100644 (file)
@@ -18,6 +18,7 @@
 #pragma once
 
 #include "dcp/consumer.h"
+#include "mock_stream.h"
 
 /*
  * Mock of the DcpConsumer class.  Wraps the real DcpConsumer class
@@ -45,4 +46,37 @@ public:
     void public_notifyVbucketReady(uint16_t vbid) {
         notifyVbucketReady(vbid);
     }
-};
\ No newline at end of file
+
+    /*
+     * Creates a PassiveStream.
+     * @return a SingleThreadedRCPtr to the newly created MockPassiveStream.
+     */
+    SingleThreadedRCPtr<PassiveStream> makePassiveStream(
+            EventuallyPersistentEngine& e,
+            dcp_consumer_t consumer,
+            const std::string& name,
+            uint32_t flags,
+            uint32_t opaque,
+            uint16_t vb,
+            uint64_t start_seqno,
+            uint64_t end_seqno,
+            uint64_t vb_uuid,
+            uint64_t snap_start_seqno,
+            uint64_t snap_end_seqno,
+            uint64_t vb_high_seqno) override {
+        return SingleThreadedRCPtr<PassiveStream>(
+                new MockPassiveStream(e,
+                                      consumer,
+                                      name,
+                                      flags,
+                                      opaque,
+                                      vb,
+                                      start_seqno,
+                                      end_seqno,
+                                      vb_uuid,
+                                      snap_start_seqno,
+                                      snap_end_seqno,
+                                      vb_high_seqno));
+    }
+
+};
index 45d8519..8880872 100644 (file)
@@ -126,16 +126,40 @@ public:
  */
 class MockPassiveStream : public PassiveStream {
 public:
-    MockPassiveStream(EventuallyPersistentEngine* e, dcp_consumer_t consumer,
-                      const std::string &name, uint32_t flags, uint32_t opaque,
-                      uint16_t vb, uint64_t start_seqno, uint64_t end_seqno,
-                      uint64_t vb_uuid, uint64_t snap_start_seqno,
-                      uint64_t snap_end_seqno, uint64_t vb_high_seqno)
-    : PassiveStream(e, consumer, name, flags, opaque, vb, start_seqno,
-                    end_seqno, vb_uuid, snap_start_seqno, snap_end_seqno,
+    MockPassiveStream(EventuallyPersistentEngine& e,
+                      dcp_consumer_t consumer,
+                      const std::string& name,
+                      uint32_t flags,
+                      uint32_t opaque,
+                      uint16_t vb,
+                      uint64_t start_seqno,
+                      uint64_t end_seqno,
+                      uint64_t vb_uuid,
+                      uint64_t snap_start_seqno,
+                      uint64_t snap_end_seqno,
+                      uint64_t vb_high_seqno)
+    : PassiveStream(&e,
+                    consumer,
+                    name,
+                    flags,
+                    opaque,
+                    vb,
+                    start_seqno,
+                    end_seqno,
+                    vb_uuid,
+                    snap_start_seqno,
+                    snap_end_seqno,
                     vb_high_seqno) {}
 
     void transitionStateToDead() {
         transitionState(StreamState::Dead);
     }
+
+    ENGINE_ERROR_CODE messageReceived(
+            std::unique_ptr<DcpResponse> dcpResponse) override {
+        responseMessageSize = dcpResponse->getMessageSize();
+        return PassiveStream::messageReceived(std::move(dcpResponse));
+    }
+
+    uint32_t responseMessageSize;
 };
index 3d9bfa9..bb7a930 100644 (file)
@@ -1337,6 +1337,111 @@ TEST_F(NotifyTest, test_mb19503_connmap_notify_paused) {
     EXPECT_EQ(1, notifyTest.getCallbacks());
 }
 
+// Tests that the MutationResponse created for the deletion response is of the
+// correct size.
+TEST_F(ConnectionTest, test_mb24424_deleteResponse) {
+    const void* cookie = create_mock_cookie();
+    uint16_t vbid = 0;
+
+    connection_t conn = new MockDcpConsumer(*engine, cookie, "test_consumer");
+    MockDcpConsumer* consumer = dynamic_cast<MockDcpConsumer*>(conn.get());
+
+    ASSERT_EQ(ENGINE_SUCCESS, set_vb_state(vbid, vbucket_state_replica));
+    ASSERT_EQ(ENGINE_SUCCESS, consumer->addStream(/*opaque*/0, vbid,
+                                                  /*flags*/0));
+
+    MockPassiveStream *stream = static_cast<MockPassiveStream*>
+                                       ((consumer->
+                                               getVbucketStream(vbid)).get());
+    ASSERT_TRUE(stream->isActive());
+
+    std::string key = "key";
+    std::string data = R"({"json":"yes"})";
+    const DocKey docKey{ reinterpret_cast<const uint8_t*>(key.data()),
+        key.size(),
+        DocNamespace::DefaultCollection};
+    cb::const_byte_buffer value{reinterpret_cast<const uint8_t*>(data.data()),
+        data.size()};
+    uint8_t extMeta[1] = {uint8_t(PROTOCOL_BINARY_DATATYPE_JSON)};
+    cb::const_byte_buffer meta{extMeta, sizeof(uint8_t)};
+
+    consumer->deletion(/*opaque*/1,
+                       /*key*/docKey,
+                       /*values*/value,
+                       /*priv_bytes*/0,
+                       /*datatype*/PROTOCOL_BINARY_DATATYPE_JSON,
+                       /*cas*/0,
+                       /*vbucket*/vbid,
+                       /*bySeqno*/1,
+                       /*revSeqno*/0,
+                       /*meta*/meta);
+
+    auto messageSize = MutationResponse::deletionBaseMsgBytes +
+            key.size() + data.size() + sizeof(extMeta);
+
+    EXPECT_EQ(messageSize, stream->responseMessageSize);
+
+    /* Close stream before deleting the connection */
+    ASSERT_EQ(ENGINE_SUCCESS, consumer->closeStream(/*opaque*/0, vbid));
+
+    destroy_mock_cookie(cookie);
+}
+
+// Tests that the MutationResponse created for the mutation response is of the
+// correct size.
+TEST_F(ConnectionTest, test_mb24424_mutationResponse) {
+    const void* cookie = create_mock_cookie();
+    uint16_t vbid = 0;
+
+    connection_t conn = new MockDcpConsumer(*engine, cookie, "test_consumer");
+    MockDcpConsumer* consumer = dynamic_cast<MockDcpConsumer*>(conn.get());
+
+    ASSERT_EQ(ENGINE_SUCCESS, set_vb_state(vbid, vbucket_state_replica));
+    ASSERT_EQ(ENGINE_SUCCESS, consumer->addStream(/*opaque*/0, vbid,
+                                                  /*flags*/0));
+
+    MockPassiveStream *stream = static_cast<MockPassiveStream*>
+                                       ((consumer->
+                                               getVbucketStream(vbid)).get());
+    ASSERT_TRUE(stream->isActive());
+
+    std::string key = "key";
+    std::string data = R"({"json":"yes"})";
+    const DocKey docKey{ reinterpret_cast<const uint8_t*>(key.data()),
+        key.size(),
+        DocNamespace::DefaultCollection};
+    cb::const_byte_buffer value{reinterpret_cast<const uint8_t*>(data.data()),
+        data.size()};
+    uint8_t extMeta[1] = {uint8_t(PROTOCOL_BINARY_DATATYPE_JSON)};
+    cb::const_byte_buffer meta{extMeta, sizeof(uint8_t)};
+
+    consumer->mutation(/*opaque*/1,
+                       /*key*/docKey,
+                       /*values*/value,
+                       /*priv_bytes*/0,
+                       /*datatype*/PROTOCOL_BINARY_DATATYPE_JSON,
+                       /*cas*/0,
+                       /*vbucket*/vbid,
+                       /*flags*/0,
+                       /*bySeqno*/1,
+                       /*revSeqno*/0,
+                       /*exptime*/0,
+                       /*lock_time*/0,
+                       /*meta*/meta,
+                       /*nru*/0);
+
+    auto messageSize = MutationResponse::mutationBaseMsgBytes +
+            key.size() + data.size() + sizeof(extMeta);
+
+    EXPECT_EQ(messageSize, stream->responseMessageSize);
+
+    /* Close stream before deleting the connection */
+    ASSERT_EQ(ENGINE_SUCCESS, consumer->closeStream(/*opaque*/0, vbid));
+
+    destroy_mock_cookie(cookie);
+}
+
+
 // Test cases which run in both Full and Value eviction
 INSTANTIATE_TEST_CASE_P(PersistentAndEphemeral,
                         StreamTest,