taskAlreadyCancelled.compare_exchange_strong(inverse, true);
}
+SingleThreadedRCPtr<PassiveStream> DcpConsumer::makePassiveStream(
+ EventuallyPersistentEngine& e,
+ dcp_consumer_t consumer,
+ const std::string& name,
+ uint32_t flags,
+ uint32_t opaque,
+ uint16_t vb,
+ uint64_t start_seqno,
+ uint64_t end_seqno,
+ uint64_t vb_uuid,
+ uint64_t snap_start_seqno,
+ uint64_t snap_end_seqno,
+ uint64_t vb_high_seqno) {
+ return SingleThreadedRCPtr<PassiveStream>(
+ new PassiveStream(&e,
+ consumer,
+ name,
+ flags,
+ opaque,
+ vb,
+ start_seqno,
+ end_seqno,
+ vb_uuid,
+ snap_start_seqno,
+ snap_end_seqno,
+ vb_high_seqno));
+}
+
ENGINE_ERROR_CODE DcpConsumer::addStream(uint32_t opaque, uint16_t vbucket,
uint32_t flags) {
lastMessageTime = ep_current_time();
}
streams.insert({vbucket,
- SingleThreadedRCPtr<PassiveStream>(
- new PassiveStream(&engine_, this, getName(), flags,
- new_opaque, vbucket, start_seqno,
- end_seqno, vbucket_uuid,
- snap_start_seqno, snap_end_seqno,
- high_seqno))});
+ makePassiveStream(engine_,
+ this,
+ getName(),
+ flags,
+ new_opaque,
+ vbucket,
+ start_seqno,
+ end_seqno,
+ vbucket_uuid,
+ snap_start_seqno,
+ snap_end_seqno,
+ high_seqno)});
ready.push_back(vbucket);
opaqueMap_[new_opaque] = std::make_pair(opaque, vbucket);
try {
err = stream->messageReceived(
- std::make_unique<MutationResponse>(item, opaque, emd));
+ std::make_unique<MutationResponse>(item,
+ opaque,
+ /*isKeyOnly*/false,
+ emd));
} catch (const std::bad_alloc&) {
delete emd;
return ENGINE_ENOMEM;
try {
err = stream->messageReceived(
- std::make_unique<MutationResponse>(item, opaque, emd));
+ std::make_unique<MutationResponse>(item,
+ opaque,
+ /*isKeyOnly*/false,
+ emd));
} catch (const std::bad_alloc&) {
delete emd;
return ENGINE_ENOMEM;
virtual ~DcpConsumer();
+ /*
+ * Creates a PassiveStream.
+ *
+ * @param e Reference to the engine
+ * @param consumer The consumer the new stream will belong to
+ * @param name The name of the new stream
+ * @param flags The DCP flags
+ * @param opaque The stream opaque
+ * @param vb The vbucket the stream belongs to
+ * @param start_seqno The start sequence number of the stream
+ * @param end_seqno The end sequence number of the stream
+ * @param vb_uuid The uuid of the vbucket the stream belongs to
+ * @param snap_start_seqno The snapshot start sequence number
+ * @param snap_end_seqno The snapshot end sequence number
+ * @param vb_high_seqno The last received sequence number
+ *
+ * @return a SingleThreadedRCPtr to the newly created PassiveStream.
+ */
+ virtual SingleThreadedRCPtr<PassiveStream> makePassiveStream(
+ EventuallyPersistentEngine& e,
+ dcp_consumer_t consumer,
+ const std::string& name,
+ uint32_t flags,
+ uint32_t opaque,
+ uint16_t vb,
+ uint64_t start_seqno,
+ uint64_t end_seqno,
+ uint64_t vb_uuid,
+ uint64_t snap_start_seqno,
+ uint64_t snap_end_seqno,
+ uint64_t vb_high_seqno);
+
ENGINE_ERROR_CODE addStream(uint32_t opaque, uint16_t vbucket,
uint32_t flags) override;
void reconnectStream(VBucketPtr &vb, uint32_t new_opaque,
uint64_t start_seqno);
- ENGINE_ERROR_CODE messageReceived(std::unique_ptr<DcpResponse> response);
+ /*
+ * Calls the appropriate function to process the message.
+ *
+ * @params response The dcp message that needs to be processed.
+ * @returns the error code from processing the message.
+ */
+ virtual ENGINE_ERROR_CODE messageReceived(
+ std::unique_ptr<DcpResponse> response);
void addStats(ADD_STAT add_stat, const void *c);
#pragma once
#include "dcp/consumer.h"
+#include "mock_stream.h"
/*
* Mock of the DcpConsumer class. Wraps the real DcpConsumer class
void public_notifyVbucketReady(uint16_t vbid) {
notifyVbucketReady(vbid);
}
-};
\ No newline at end of file
+
+ /*
+ * Creates a PassiveStream.
+ * @return a SingleThreadedRCPtr to the newly created MockPassiveStream.
+ */
+ SingleThreadedRCPtr<PassiveStream> makePassiveStream(
+ EventuallyPersistentEngine& e,
+ dcp_consumer_t consumer,
+ const std::string& name,
+ uint32_t flags,
+ uint32_t opaque,
+ uint16_t vb,
+ uint64_t start_seqno,
+ uint64_t end_seqno,
+ uint64_t vb_uuid,
+ uint64_t snap_start_seqno,
+ uint64_t snap_end_seqno,
+ uint64_t vb_high_seqno) override {
+ return SingleThreadedRCPtr<PassiveStream>(
+ new MockPassiveStream(e,
+ consumer,
+ name,
+ flags,
+ opaque,
+ vb,
+ start_seqno,
+ end_seqno,
+ vb_uuid,
+ snap_start_seqno,
+ snap_end_seqno,
+ vb_high_seqno));
+ }
+
+};
*/
class MockPassiveStream : public PassiveStream {
public:
- MockPassiveStream(EventuallyPersistentEngine* e, dcp_consumer_t consumer,
- const std::string &name, uint32_t flags, uint32_t opaque,
- uint16_t vb, uint64_t start_seqno, uint64_t end_seqno,
- uint64_t vb_uuid, uint64_t snap_start_seqno,
- uint64_t snap_end_seqno, uint64_t vb_high_seqno)
- : PassiveStream(e, consumer, name, flags, opaque, vb, start_seqno,
- end_seqno, vb_uuid, snap_start_seqno, snap_end_seqno,
+ MockPassiveStream(EventuallyPersistentEngine& e,
+ dcp_consumer_t consumer,
+ const std::string& name,
+ uint32_t flags,
+ uint32_t opaque,
+ uint16_t vb,
+ uint64_t start_seqno,
+ uint64_t end_seqno,
+ uint64_t vb_uuid,
+ uint64_t snap_start_seqno,
+ uint64_t snap_end_seqno,
+ uint64_t vb_high_seqno)
+ : PassiveStream(&e,
+ consumer,
+ name,
+ flags,
+ opaque,
+ vb,
+ start_seqno,
+ end_seqno,
+ vb_uuid,
+ snap_start_seqno,
+ snap_end_seqno,
vb_high_seqno) {}
void transitionStateToDead() {
transitionState(StreamState::Dead);
}
+
+ ENGINE_ERROR_CODE messageReceived(
+ std::unique_ptr<DcpResponse> dcpResponse) override {
+ responseMessageSize = dcpResponse->getMessageSize();
+ return PassiveStream::messageReceived(std::move(dcpResponse));
+ }
+
+ uint32_t responseMessageSize;
};
EXPECT_EQ(1, notifyTest.getCallbacks());
}
+// Tests that the MutationResponse created for the deletion response is of the
+// correct size.
+TEST_F(ConnectionTest, test_mb24424_deleteResponse) {
+ const void* cookie = create_mock_cookie();
+ uint16_t vbid = 0;
+
+ connection_t conn = new MockDcpConsumer(*engine, cookie, "test_consumer");
+ MockDcpConsumer* consumer = dynamic_cast<MockDcpConsumer*>(conn.get());
+
+ ASSERT_EQ(ENGINE_SUCCESS, set_vb_state(vbid, vbucket_state_replica));
+ ASSERT_EQ(ENGINE_SUCCESS, consumer->addStream(/*opaque*/0, vbid,
+ /*flags*/0));
+
+ MockPassiveStream *stream = static_cast<MockPassiveStream*>
+ ((consumer->
+ getVbucketStream(vbid)).get());
+ ASSERT_TRUE(stream->isActive());
+
+ std::string key = "key";
+ std::string data = R"({"json":"yes"})";
+ const DocKey docKey{ reinterpret_cast<const uint8_t*>(key.data()),
+ key.size(),
+ DocNamespace::DefaultCollection};
+ cb::const_byte_buffer value{reinterpret_cast<const uint8_t*>(data.data()),
+ data.size()};
+ uint8_t extMeta[1] = {uint8_t(PROTOCOL_BINARY_DATATYPE_JSON)};
+ cb::const_byte_buffer meta{extMeta, sizeof(uint8_t)};
+
+ consumer->deletion(/*opaque*/1,
+ /*key*/docKey,
+ /*values*/value,
+ /*priv_bytes*/0,
+ /*datatype*/PROTOCOL_BINARY_DATATYPE_JSON,
+ /*cas*/0,
+ /*vbucket*/vbid,
+ /*bySeqno*/1,
+ /*revSeqno*/0,
+ /*meta*/meta);
+
+ auto messageSize = MutationResponse::deletionBaseMsgBytes +
+ key.size() + data.size() + sizeof(extMeta);
+
+ EXPECT_EQ(messageSize, stream->responseMessageSize);
+
+ /* Close stream before deleting the connection */
+ ASSERT_EQ(ENGINE_SUCCESS, consumer->closeStream(/*opaque*/0, vbid));
+
+ destroy_mock_cookie(cookie);
+}
+
+// Tests that the MutationResponse created for the mutation response is of the
+// correct size.
+TEST_F(ConnectionTest, test_mb24424_mutationResponse) {
+ const void* cookie = create_mock_cookie();
+ uint16_t vbid = 0;
+
+ connection_t conn = new MockDcpConsumer(*engine, cookie, "test_consumer");
+ MockDcpConsumer* consumer = dynamic_cast<MockDcpConsumer*>(conn.get());
+
+ ASSERT_EQ(ENGINE_SUCCESS, set_vb_state(vbid, vbucket_state_replica));
+ ASSERT_EQ(ENGINE_SUCCESS, consumer->addStream(/*opaque*/0, vbid,
+ /*flags*/0));
+
+ MockPassiveStream *stream = static_cast<MockPassiveStream*>
+ ((consumer->
+ getVbucketStream(vbid)).get());
+ ASSERT_TRUE(stream->isActive());
+
+ std::string key = "key";
+ std::string data = R"({"json":"yes"})";
+ const DocKey docKey{ reinterpret_cast<const uint8_t*>(key.data()),
+ key.size(),
+ DocNamespace::DefaultCollection};
+ cb::const_byte_buffer value{reinterpret_cast<const uint8_t*>(data.data()),
+ data.size()};
+ uint8_t extMeta[1] = {uint8_t(PROTOCOL_BINARY_DATATYPE_JSON)};
+ cb::const_byte_buffer meta{extMeta, sizeof(uint8_t)};
+
+ consumer->mutation(/*opaque*/1,
+ /*key*/docKey,
+ /*values*/value,
+ /*priv_bytes*/0,
+ /*datatype*/PROTOCOL_BINARY_DATATYPE_JSON,
+ /*cas*/0,
+ /*vbucket*/vbid,
+ /*flags*/0,
+ /*bySeqno*/1,
+ /*revSeqno*/0,
+ /*exptime*/0,
+ /*lock_time*/0,
+ /*meta*/meta,
+ /*nru*/0);
+
+ auto messageSize = MutationResponse::mutationBaseMsgBytes +
+ key.size() + data.size() + sizeof(extMeta);
+
+ EXPECT_EQ(messageSize, stream->responseMessageSize);
+
+ /* Close stream before deleting the connection */
+ ASSERT_EQ(ENGINE_SUCCESS, consumer->closeStream(/*opaque*/0, vbid));
+
+ destroy_mock_cookie(cookie);
+}
+
+
// Test cases which run in both Full and Value eviction
INSTANTIATE_TEST_CASE_P(PersistentAndEphemeral,
StreamTest,