MB-16181: Interface update, allow DCP open to accept a value 38/78038/4
authorJim Walker <jim@couchbase.com>
Tue, 14 Mar 2017 12:05:25 +0000 (12:05 +0000)
committerTrond Norbye <trond.norbye@gmail.com>
Mon, 15 May 2017 06:25:53 +0000 (06:25 +0000)
DCP consumers will be able to open a DCP stream and specify a filter,
a list of collections they are interested in. This filter will be
passed as a JSON 'value' to DCP open.

Change-Id: Id33306f0d663b263840f70c44a16deff6db0a89e
Reviewed-on: http://review.couchbase.org/78038
Tested-by: Build Bot <build@couchbase.com>
Reviewed-by: Trond Norbye <trond.norbye@gmail.com>
src/ep_engine.cc
src/ep_engine.h
tests/ep_perfsuite.cc
tests/ep_testsuite_dcp.cc
tests/module_tests/evp_store_single_threaded_test.cc

index 3b23521..5266733 100644 (file)
@@ -1364,16 +1364,15 @@ static ENGINE_ERROR_CODE EvpDcpStep(ENGINE_HANDLE* handle,
     return ENGINE_DISCONNECT;
 }
 
-
 static ENGINE_ERROR_CODE EvpDcpOpen(ENGINE_HANDLE* handle,
                                     const void* cookie,
                                     uint32_t opaque,
                                     uint32_t seqno,
                                     uint32_t flags,
-                                    void* name,
-                                    uint16_t nname) {
+                                    cb::const_char_buffer name,
+                                    cb::const_byte_buffer jsonExtra) {
     return acquireEngine(handle)->dcpOpen(
-            cookie, opaque, seqno, flags, name, nname);
+            cookie, opaque, seqno, flags, name, jsonExtra);
 }
 
 static ENGINE_ERROR_CODE EvpDcpAddStream(ENGINE_HANDLE* handle,
@@ -6012,16 +6011,16 @@ ENGINE_ERROR_CODE EventuallyPersistentEngine::getRandomKey(const void *cookie,
     return ret;
 }
 
-ENGINE_ERROR_CODE EventuallyPersistentEngine::dcpOpen(const void* cookie,
-                                                       uint32_t opaque,
-                                                       uint32_t seqno,
-                                                       uint32_t flags,
-                                                       const void *stream_name,
-                                                       uint16_t nname)
-{
+ENGINE_ERROR_CODE EventuallyPersistentEngine::dcpOpen(
+        const void* cookie,
+        uint32_t opaque,
+        uint32_t seqno,
+        uint32_t flags,
+        cb::const_char_buffer stream_name,
+        cb::const_byte_buffer jsonExtra) {
     (void) opaque;
     (void) seqno;
-    std::string connName(static_cast<const char*>(stream_name), nname);
+    std::string connName = cb::to_string(stream_name);
 
     if (reserveCookie(cookie) != ENGINE_SUCCESS) {
         LOG(EXTENSION_LOG_WARNING, "Cannot create DCP connection because cookie"
index 125a1f8..3d6eaae 100644 (file)
@@ -268,8 +268,8 @@ public:
                               uint32_t opaque,
                               uint32_t seqno,
                               uint32_t flags,
-                              const void *stream_name,
-                              uint16_t nname);
+                              cb::const_char_buffer stream_name,
+                              cb::const_byte_buffer jsonExtra);
 
     ENGINE_ERROR_CODE dcpAddStream(const void* cookie,
                                    uint32_t opaque,
index c1784bd..3d6c9af 100644 (file)
@@ -862,8 +862,8 @@ static void perf_dcp_client(ENGINE_HANDLE* h, ENGINE_HANDLE_V1* h1,
     uint64_t vb_uuid = get_ull_stat(h, h1, uuid.c_str(), "failovers");
     uint32_t streamOpaque = opaque;
 
-    checkeq(h1->dcp.open(h, cookie, ++streamOpaque, 0, DCP_OPEN_PRODUCER,
-                             (void*)name.c_str(), name.length()),
+    checkeq(h1->dcp.open(
+                    h, cookie, ++streamOpaque, 0, DCP_OPEN_PRODUCER, name, {}),
             ENGINE_SUCCESS,
             "Failed dcp producer open connection");
 
index 77c9688..50d8e93 100644 (file)
@@ -499,8 +499,7 @@ void TestDcpConsumer::openConnection(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1)
 
     /* Set up Producer at server */
     checkeq(ENGINE_SUCCESS,
-            h1->dcp.open(h, cookie, ++opaque, 0, DCP_OPEN_PRODUCER,
-                         (void*)name.c_str(), name.length()),
+            h1->dcp.open(h, cookie, ++opaque, 0, DCP_OPEN_PRODUCER, name, {}),
             "Failed dcp producer open connection.");
 
     /* Set flow control buffer size */
@@ -858,9 +857,9 @@ extern "C" {
               "Failed to set vbucket state.");
 
         // Open consumer connection
-        checkeq(ctx->h1->dcp.open(ctx->h, cookie, opaque, 0, flags,
-                                  (void*)name.c_str(), name.length()),
-                ENGINE_SUCCESS, "Failed dcp Consumer open connection.");
+        checkeq(ctx->h1->dcp.open(ctx->h, cookie, opaque, 0, flags, name, {}),
+                ENGINE_SUCCESS,
+                "Failed dcp Consumer open connection.");
 
         add_stream_for_consumer(ctx->h, ctx->h1, cookie, opaque++, 0, 0,
                                 PROTOCOL_BINARY_RESPONSE_SUCCESS);
@@ -1056,8 +1055,8 @@ static enum test_result test_dcp_notifier_open(ENGINE_HANDLE *h, ENGINE_HANDLE_V
     const uint32_t seqno = 0;
     uint32_t opaque = 0;
     checkeq(ENGINE_SUCCESS,
-            h1->dcp.open(h, cookie1, opaque, seqno, DCP_OPEN_NOTIFIER,
-                         (void*)name.c_str(), name.size()),
+            h1->dcp.open(
+                    h, cookie1, opaque, seqno, DCP_OPEN_NOTIFIER, name, {}),
             "Failed dcp consumer open connection.");
 
     const std::string stat_type("eq_dcpq:" + name + ":type");
@@ -1071,8 +1070,8 @@ static enum test_result test_dcp_notifier_open(ENGINE_HANDLE *h, ENGINE_HANDLE_V
 
     const auto *cookie2 = testHarness.create_cookie();
     checkeq(ENGINE_SUCCESS,
-            h1->dcp.open(h, cookie2, opaque, seqno, DCP_OPEN_NOTIFIER,
-                         (void*)name.c_str(), name.size()),
+            h1->dcp.open(
+                    h, cookie2, opaque, seqno, DCP_OPEN_NOTIFIER, name, {}),
             "Failed dcp consumer open connection.");
 
     type = get_str_stat(h, h1, stat_type.c_str(), "dcp");
@@ -1095,8 +1094,7 @@ static enum test_result test_dcp_notifier(ENGINE_HANDLE *h,
     uint64_t start = 0;
 
     checkeq(ENGINE_SUCCESS,
-            h1->dcp.open(h, cookie, opaque, seqno, DCP_OPEN_NOTIFIER,
-                         (void*)name.c_str(), name.size()),
+            h1->dcp.open(h, cookie, opaque, seqno, DCP_OPEN_NOTIFIER, name, {}),
             "Failed dcp notifier open connection.");
     // Get notification for an old item
     notifier_request(h, h1, cookie, ++opaque, vbucket, start, true);
@@ -1172,8 +1170,7 @@ static enum test_result test_dcp_notifier_equal_to_number_of_items(
     const uint64_t start = 1;
     uint32_t opaque = 0;
     checkeq(ENGINE_SUCCESS,
-            h1->dcp.open(h, cookie, opaque, seqno, DCP_OPEN_NOTIFIER,
-                         (void*)name.c_str(), name.size()),
+            h1->dcp.open(h, cookie, opaque, seqno, DCP_OPEN_NOTIFIER, name, {}),
             "Failed dcp notifier open connection.");
     // Should not get a stream end
     notifier_request(h, h1, cookie, ++opaque, vbucket, start, true);
@@ -1199,8 +1196,7 @@ static enum test_result test_dcp_consumer_open(ENGINE_HANDLE *h, ENGINE_HANDLE_V
     const uint32_t seqno = 0;
     const uint32_t flags = 0;
     checkeq(ENGINE_SUCCESS,
-            h1->dcp.open(h, cookie1, opaque, seqno, flags,
-                         (void*)name.c_str(), name.size()),
+            h1->dcp.open(h, cookie1, opaque, seqno, flags, name, {}),
             "Failed dcp consumer open connection.");
 
     const auto stat_type("eq_dcpq:" + name + ":type");
@@ -1214,8 +1210,7 @@ static enum test_result test_dcp_consumer_open(ENGINE_HANDLE *h, ENGINE_HANDLE_V
 
     const auto *cookie2 = testHarness.create_cookie();
     checkeq(ENGINE_SUCCESS,
-            h1->dcp.open(h, cookie2, opaque, seqno, flags,
-                         (void*)name.c_str(), name.size()),
+            h1->dcp.open(h, cookie2, opaque, seqno, flags, name, {}),
             "Failed dcp consumer open connection.");
 
     type = get_str_stat(h, h1, stat_type.c_str(), "dcp");
@@ -1235,8 +1230,7 @@ static enum test_result test_dcp_consumer_flow_control_none(ENGINE_HANDLE *h,
     const uint32_t seqno = 0;
     const uint32_t flags = 0;
     checkeq(ENGINE_SUCCESS,
-            h1->dcp.open(h, cookie1, opaque, seqno, flags,
-                         (void*)name.c_str(), name.size()),
+            h1->dcp.open(h, cookie1, opaque, seqno, flags, name, {}),
             "Failed dcp consumer open connection.");
 
     const auto stat_name("eq_dcpq:" + name + ":max_buffer_bytes");
@@ -1256,8 +1250,7 @@ static enum test_result test_dcp_consumer_flow_control_static(ENGINE_HANDLE *h,
     const uint32_t flags = 0;
     const auto flow_ctl_buf_def_size = 10485760;
     checkeq(ENGINE_SUCCESS,
-            h1->dcp.open(h, cookie1, opaque, seqno, flags,
-                         (void*)name.c_str(), name.size()),
+            h1->dcp.open(h, cookie1, opaque, seqno, flags, name, {}),
             "Failed dcp consumer open connection.");
 
     const auto stat_name("eq_dcpq:" + name + ":max_buffer_bytes");
@@ -1283,8 +1276,7 @@ static enum test_result test_dcp_consumer_flow_control_dynamic(ENGINE_HANDLE *h,
             "Incorrect new size.");
 
     checkeq(ENGINE_SUCCESS,
-            h1->dcp.open(h, cookie1, opaque, seqno, flags,
-                         (void*)name.c_str(), name.size()),
+            h1->dcp.open(h, cookie1, opaque, seqno, flags, name, {}),
             "Failed dcp consumer open connection.");
 
     const auto stat_name("eq_dcpq:" + name + ":max_buffer_bytes");
@@ -1301,8 +1293,7 @@ static enum test_result test_dcp_consumer_flow_control_dynamic(ENGINE_HANDLE *h,
             "Incorrect new size.");
 
     checkeq(ENGINE_SUCCESS,
-            h1->dcp.open(h, cookie2, opaque, seqno, flags,
-                         (void*)name.c_str(), name.size()),
+            h1->dcp.open(h, cookie2, opaque, seqno, flags, name, {}),
             "Failed dcp consumer open connection.");
 
     checkeq(20000000,
@@ -1316,16 +1307,14 @@ static enum test_result test_dcp_consumer_flow_control_dynamic(ENGINE_HANDLE *h,
     for (auto count = 0; count < 10; count++) {
         const auto *cookie = testHarness.create_cookie();
         checkeq(ENGINE_SUCCESS,
-                h1->dcp.open(h, cookie, opaque, seqno, flags,
-                             (void*)name.c_str(), name.size()),
+                h1->dcp.open(h, cookie, opaque, seqno, flags, name, {}),
                 "Failed dcp consumer open connection.");
         testHarness.destroy_cookie(cookie);
     }
     /* By now mem used by flow control bufs would have crossed the threshold */
     const auto *cookie3 = testHarness.create_cookie();
     checkeq(ENGINE_SUCCESS,
-            h1->dcp.open(h, cookie3, opaque, seqno, flags,
-                         (void*)name.c_str(), name.size()),
+            h1->dcp.open(h, cookie3, opaque, seqno, flags, name, {}),
             "Failed dcp consumer open connection.");
 
     checkeq(10485760,
@@ -1341,8 +1330,7 @@ static enum test_result test_dcp_consumer_flow_control_dynamic(ENGINE_HANDLE *h,
             get_ull_stat(h, h1, "ep_max_size"), "Incorrect new size.");
 
     checkeq(ENGINE_SUCCESS,
-            h1->dcp.open(h, cookie4, opaque, seqno, flags,
-                         (void*)name.c_str(), name.size()),
+            h1->dcp.open(h, cookie4, opaque, seqno, flags, name, {}),
             "Failed dcp consumer open connection.");
 
     checkeq(52428800,
@@ -1374,8 +1362,7 @@ static enum test_result test_dcp_consumer_flow_control_aggressive(
     const uint32_t flags = 0;
     cookie[0] = testHarness.create_cookie();
     checkeq(ENGINE_SUCCESS,
-            h1->dcp.open(h, cookie[0], opaque, seqno, flags,
-                         (void*)name1.c_str(), name1.size()),
+            h1->dcp.open(h, cookie[0], opaque, seqno, flags, name1, {}),
             "Failed dcp consumer open connection.");
 
     /* Check the max limit */
@@ -1388,8 +1375,7 @@ static enum test_result test_dcp_consumer_flow_control_aggressive(
         cookie[count] = testHarness.create_cookie();
         const auto name2(name + std::to_string(count));
         checkeq(ENGINE_SUCCESS,
-                h1->dcp.open(h, cookie[count], opaque, seqno, flags,
-                             (void*)name2.c_str(), name2.length()),
+                h1->dcp.open(h, cookie[count], opaque, seqno, flags, name2, {}),
                 "Failed dcp consumer open connection.");
 
         for (auto i = 0; i <= count; i++) {
@@ -1407,8 +1393,8 @@ static enum test_result test_dcp_consumer_flow_control_aggressive(
     const auto name3(name + std::to_string(max_conns - 1));
     const auto stat_name2("eq_dcpq:" + name3 + ":max_buffer_bytes");
     checkeq(ENGINE_SUCCESS,
-            h1->dcp.open(h, cookie[max_conns - 1], opaque, seqno, flags,
-                         (void*)name3.c_str(), name3.size()),
+            h1->dcp.open(
+                    h, cookie[max_conns - 1], opaque, seqno, flags, name3, {}),
             "Failed dcp consumer open connection.");
     checkeq(flow_ctl_buf_min, get_int_stat(h, h1, stat_name2.c_str(), "dcp"),
             "Flow Control Buffer Size not equal to min");
@@ -1459,8 +1445,8 @@ static enum test_result test_dcp_producer_open(ENGINE_HANDLE *h, ENGINE_HANDLE_V
     const uint32_t opaque = 0;
     const uint32_t seqno = 0;
     checkeq(ENGINE_SUCCESS,
-            h1->dcp.open(h, cookie1, opaque, seqno, DCP_OPEN_PRODUCER,
-                         (void*)name.c_str(), name.size()),
+            h1->dcp.open(
+                    h, cookie1, opaque, seqno, DCP_OPEN_PRODUCER, name, {}),
             "Failed dcp producer open connection.");
     const auto stat_type("eq_dcpq:" + name + ":type");
     auto type = get_str_stat(h, h1, stat_type.c_str(), "dcp");
@@ -1473,8 +1459,8 @@ static enum test_result test_dcp_producer_open(ENGINE_HANDLE *h, ENGINE_HANDLE_V
 
     const auto *cookie2 = testHarness.create_cookie();
     checkeq(ENGINE_SUCCESS,
-            h1->dcp.open(h, cookie2, opaque, seqno, DCP_OPEN_PRODUCER,
-                         (void*)name.c_str(), name.size()),
+            h1->dcp.open(
+                    h, cookie2, opaque, seqno, DCP_OPEN_PRODUCER, name, {}),
             "Failed dcp producer open connection.");
     type = get_str_stat(h, h1, stat_type.c_str(), "dcp");
     checkeq(0, type.compare("producer"), "Producer not found");
@@ -1492,8 +1478,7 @@ static enum test_result test_dcp_producer_open_same_cookie(ENGINE_HANDLE *h,
     uint32_t opaque = 0;
     const uint32_t seqno = 0;
     checkeq(ENGINE_SUCCESS,
-            h1->dcp.open(h, cookie, opaque, seqno, DCP_OPEN_PRODUCER,
-                         (void*)name.c_str(), name.size()),
+            h1->dcp.open(h, cookie, opaque, seqno, DCP_OPEN_PRODUCER, name, {}),
             "Failed dcp producer open connection.");
 
     const auto stat_type("eq_dcpq:" + name + ":type");
@@ -1513,8 +1498,8 @@ static enum test_result test_dcp_producer_open_same_cookie(ENGINE_HANDLE *h,
     testHarness.store_engine_specific(cookie, nullptr);
 
     checkeq(ENGINE_DISCONNECT,
-            h1->dcp.open(h, cookie, opaque++, seqno, DCP_OPEN_PRODUCER,
-                         (void*)name.c_str(), name.size()),
+            h1->dcp.open(
+                    h, cookie, opaque++, seqno, DCP_OPEN_PRODUCER, name, {}),
             "Failed to return ENGINE_DISCONNECT");
 
     checkeq(2, testHarness.get_number_of_mock_cookie_references(cookie),
@@ -1534,8 +1519,7 @@ static enum test_result test_dcp_noop(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1) {
     const uint32_t seqno = 0;
     uint32_t opaque = 0;
     checkeq(ENGINE_SUCCESS,
-            h1->dcp.open(h, cookie, opaque, seqno, DCP_OPEN_PRODUCER,
-                         (void*)name.c_str(), name.size()),
+            h1->dcp.open(h, cookie, opaque, seqno, DCP_OPEN_PRODUCER, name, {}),
             "Failed dcp producer open connection.");
     const std::string param1_name("connection_buffer_size");
     const std::string param1_value("1024");
@@ -1588,9 +1572,8 @@ static enum test_result test_dcp_noop_fail(ENGINE_HANDLE *h,
     const uint32_t seqno = 0;
     uint32_t opaque = 0;
     checkeq(ENGINE_SUCCESS,
-            h1->dcp.open(h, cookie, opaque, seqno, DCP_OPEN_PRODUCER,
-                         (void*)name.c_str(), name.size()),
-                         "Failed dcp producer open connection.");
+            h1->dcp.open(h, cookie, opaque, seqno, DCP_OPEN_PRODUCER, name, {}),
+            "Failed dcp producer open connection.");
     const std::string param1_name("connection_buffer_size");
     const std::string param1_value("1024");
     checkeq(ENGINE_SUCCESS,
@@ -1639,8 +1622,7 @@ static enum test_result test_dcp_consumer_noop(ENGINE_HANDLE *h,
     uint32_t opaque = 0;
     // Open consumer connection
     checkeq(ENGINE_SUCCESS,
-            h1->dcp.open(h, cookie, opaque, seqno, flags,
-                         (void*)name.c_str(), name.size()),
+            h1->dcp.open(h, cookie, opaque, seqno, flags, name, {}),
             "Failed dcp Consumer open connection.");
     add_stream_for_consumer(h, h1, cookie, opaque, vbucket, flags,
                             PROTOCOL_BINARY_RESPONSE_SUCCESS);
@@ -2233,8 +2215,7 @@ static enum test_result test_dcp_producer_keep_stream_open_replica(
 
     /* Open an DCP consumer connection */
     checkeq(ENGINE_SUCCESS,
-            h1->dcp.open(h, cookie, opaque, seqno, flags,
-                         (void*)conn_name.c_str(), conn_name.length()),
+            h1->dcp.open(h, cookie, opaque, seqno, flags, conn_name, {}),
             "Failed dcp producer open connection.");
 
     std::string type = get_str_stat(h, h1, "eq_dcpq:unittest:type", "dcp");
@@ -2402,11 +2383,10 @@ static test_result test_dcp_producer_stream_req_nmvb(ENGINE_HANDLE *h,
     uint32_t seqno = 0;
     uint32_t flags = DCP_OPEN_PRODUCER;
     const char *name = "unittest";
-    uint16_t nname = strlen(name);
 
     checkeq(ENGINE_SUCCESS,
-            h1->dcp.open(h, cookie1, opaque, seqno, flags, (void*)name, nname),
-          "Failed dcp producer open connection.");
+            h1->dcp.open(h, cookie1, opaque, seqno, flags, name, {}),
+            "Failed dcp producer open connection.");
 
     uint32_t req_vbucket = 1;
     uint64_t rollback = 0;
@@ -2636,11 +2616,9 @@ static test_result test_dcp_value_compression(ENGINE_HANDLE *h,
     uint64_t vb_uuid = get_ull_stat(h, h1, "vb_0:0:id", "failovers");
     const void *cookie = testHarness.create_cookie();
     const char *name = "unittest";
-    uint16_t nname = strlen(name);
     uint32_t opaque = 1;
 
-    checkeq(h1->dcp.open(h, cookie, ++opaque, 0, DCP_OPEN_PRODUCER,
-                         (void*)name, nname),
+    checkeq(h1->dcp.open(h, cookie, ++opaque, 0, DCP_OPEN_PRODUCER, name, {}),
             ENGINE_SUCCESS,
             "Failed dcp producer open connection.");
 
@@ -2765,8 +2743,7 @@ static test_result test_dcp_takeover_no_items(ENGINE_HANDLE *h,
     uint32_t opaque = 1;
 
     checkeq(ENGINE_SUCCESS,
-            h1->dcp.open(h, cookie, ++opaque, 0, DCP_OPEN_PRODUCER, (void*)name,
-                         strlen(name)),
+            h1->dcp.open(h, cookie, ++opaque, 0, DCP_OPEN_PRODUCER, name, {}),
             "Failed dcp producer open connection.");
 
     uint16_t vbucket = 0;
@@ -2995,12 +2972,11 @@ static enum test_result test_dcp_reconnect(ENGINE_HANDLE *h,
     uint32_t opaque = 0xFFFF0000;
     uint32_t flags = 0;
     const char *name = "unittest";
-    uint16_t nname = strlen(name);
     int items = full ? 10 : 5;
 
     // Open consumer connection
     checkeq(ENGINE_SUCCESS,
-            h1->dcp.open(h, cookie, opaque, 0, flags, (void*)name, nname),
+            h1->dcp.open(h, cookie, opaque, 0, flags, name, {}),
             "Failed dcp Consumer open connection.");
 
     add_stream_for_consumer(h, h1, cookie, opaque++, 0, 0,
@@ -3047,7 +3023,7 @@ static enum test_result test_dcp_reconnect(ENGINE_HANDLE *h,
     cookie = testHarness.create_cookie();
 
     checkeq(ENGINE_SUCCESS,
-            h1->dcp.open(h, cookie, opaque, 0, flags, (void*)name, nname),
+            h1->dcp.open(h, cookie, opaque, 0, flags, name, {}),
             "Failed dcp Consumer open connection.");
 
     uint64_t snap_start = full ? 10 : 0;
@@ -3098,14 +3074,13 @@ static enum test_result test_dcp_consumer_takeover(ENGINE_HANDLE *h,
     uint32_t opaque = 0xFFFF0000;
     uint32_t flags = 0;
     const char *name = "unittest";
-    uint16_t nname = strlen(name);
 
     check(set_vbucket_state(h, h1, 0, vbucket_state_replica),
           "Failed to set vbucket state.");
 
     // Open consumer connection
     checkeq(ENGINE_SUCCESS,
-            h1->dcp.open(h, cookie, opaque, 0, flags, (void*)name, nname),
+            h1->dcp.open(h, cookie, opaque, 0, flags, name, {}),
             "Failed dcp Consumer open connection.");
 
     add_stream_for_consumer(h, h1, cookie, opaque++, 0,
@@ -3193,14 +3168,13 @@ static enum test_result test_failover_scenario_one_with_dcp(
     uint32_t opaque = 0xFFFF0000;
     uint32_t flags = 0;
     const char *name = "unittest";
-    uint16_t nname = strlen(name);
 
     check(set_vbucket_state(h, h1, 0, vbucket_state_replica),
           "Failed to set vbucket state.");
 
     // Open consumer connection
     checkeq(ENGINE_SUCCESS,
-            h1->dcp.open(h, cookie, opaque, 0, flags, (void*)name, nname),
+            h1->dcp.open(h, cookie, opaque, 0, flags, name, {}),
             "Failed dcp Consumer open connection.");
 
     add_stream_for_consumer(h, h1, cookie, opaque++, 0,
@@ -3250,11 +3224,10 @@ static enum test_result test_failover_scenario_two_with_dcp(ENGINE_HANDLE *h,
     const void *cookie = testHarness.create_cookie();
     uint32_t opaque = 0xFFFF0000;
     const char *name = "unittest";
-    uint16_t nname = strlen(name);
 
     // Open consumer connection
     checkeq(ENGINE_SUCCESS,
-            h1->dcp.open(h, cookie, opaque, 0, 0, (void*)name, nname),
+            h1->dcp.open(h, cookie, opaque, 0, 0, name, {}),
             "Failed dcp Consumer open connection.");
 
     // Set up a passive stream
@@ -3341,8 +3314,7 @@ static enum test_result test_dcp_add_stream(ENGINE_HANDLE *h,
 
     // Open consumer connection
     checkeq(ENGINE_SUCCESS,
-            h1->dcp.open(h, cookie, opaque, 0, flags, (void*)(name.c_str()),
-                         name.length()),
+            h1->dcp.open(h, cookie, opaque, 0, flags, name, {}),
             "Failed dcp Consumer open connection.");
 
     std::string flow_ctl_stat_buf("eq_dcpq:" + name + ":unacked_bytes");
@@ -3370,14 +3342,13 @@ static enum test_result test_consumer_backoff_stat(ENGINE_HANDLE *h,
     uint32_t opaque = 0xFFFF0000;
     uint32_t flags = 0;
     const char *name = "unittest";
-    uint16_t nname = strlen(name);
 
     check(set_vbucket_state(h, h1, 0, vbucket_state_replica),
           "Failed to set vbucket state.");
 
     // Open consumer connection
     checkeq(ENGINE_SUCCESS,
-            h1->dcp.open(h, cookie, opaque, 0, flags, (void*)name, nname),
+            h1->dcp.open(h, cookie, opaque, 0, flags, name, {}),
             "Failed dcp Consumer open connection.");
 
     add_stream_for_consumer(h, h1, cookie, opaque++, 0, 0,
@@ -3433,11 +3404,10 @@ static enum test_result test_rollback_to_zero(ENGINE_HANDLE *h,
     uint32_t opaque = 0xFFFF0000;
     uint32_t flags = 0;
     const char *name = "unittest";
-    uint16_t nname = strlen(name);
 
     // Open consumer connection
     checkeq(ENGINE_SUCCESS,
-            h1->dcp.open(h, cookie, opaque, 0, flags, (void*)name, nname),
+            h1->dcp.open(h, cookie, opaque, 0, flags, name, {}),
             "Failed dcp Consumer open connection.");
 
     add_stream_for_consumer(h, h1, cookie, opaque++, 0, 0,
@@ -3501,10 +3471,9 @@ static enum test_result test_chk_manager_rollback(ENGINE_HANDLE *h,
     uint32_t opaque = 0xFFFF0000;
     uint32_t flags = 0;
     const char *name = "unittest";
-    uint16_t nname = strlen(name);
 
     checkeq(ENGINE_SUCCESS,
-            h1->dcp.open(h, cookie, opaque, 0, flags, (void*)name, nname),
+            h1->dcp.open(h, cookie, opaque, 0, flags, name, {}),
             "Failed dcp Consumer open connection.");
 
     do {
@@ -3592,14 +3561,13 @@ static enum test_result test_fullrollback_for_consumer(ENGINE_HANDLE *h,
     uint32_t opaque = 0xFFFF0000;
     uint32_t flags = 0;
     const char *name = "unittest";
-    uint16_t nname = strlen(name);
 
     check(set_vbucket_state(h, h1, 0, vbucket_state_replica),
           "Failed to set vbucket state.");
 
     // Open consumer connection
     checkeq(ENGINE_SUCCESS,
-            h1->dcp.open(h, cookie, opaque, 0, flags, (void*)name, nname),
+            h1->dcp.open(h, cookie, opaque, 0, flags, name, {}),
             "Failed dcp Consumer open connection.");
 
     do {
@@ -3711,11 +3679,10 @@ static enum test_result test_partialrollback_for_consumer(ENGINE_HANDLE *h,
     uint32_t opaque = 0xFFFF0000;
     uint32_t flags = 0;
     const char *name = "unittest";
-    uint16_t nname = strlen(name);
 
     // Open consumer connection
     checkeq(ENGINE_SUCCESS,
-            h1->dcp.open(h, cookie, opaque, 0, flags, (void*)name, nname),
+            h1->dcp.open(h, cookie, opaque, 0, flags, name, {}),
             "Failed dcp Consumer open connection.");
 
     do {
@@ -3813,13 +3780,12 @@ static enum test_result test_dcp_buffer_log_size(ENGINE_HANDLE *h,
     uint32_t opaque = 0xFFFF0000;
     uint32_t flags = DCP_OPEN_PRODUCER;
     const char *name = "unittest";
-    uint16_t nname = strlen(name);
     char stats_buffer[50];
     char status_buffer[50];
 
     // Open consumer connection
     checkeq(ENGINE_SUCCESS,
-            h1->dcp.open(h, cookie, opaque, 0, flags, (void*)name, nname),
+            h1->dcp.open(h, cookie, opaque, 0, flags, name, {}),
             "Failed dcp Consumer open connection.");
 
     checkeq(ENGINE_SUCCESS,
@@ -3918,11 +3884,10 @@ static enum test_result test_dcp_get_failover_log(ENGINE_HANDLE *h,
     uint32_t opaque = 0xFFFF0000;
     uint32_t flags = DCP_OPEN_PRODUCER;
     const char *name = "unittest";
-    uint16_t nname = strlen(name);
 
     // Open consumer connection
     checkeq(ENGINE_SUCCESS,
-            h1->dcp.open(h, cookie, opaque, 0, flags, (void*)name, nname),
+            h1->dcp.open(h, cookie, opaque, 0, flags, name, {}),
             "Failed dcp Consumer open connection.");
 
     checkeq(ENGINE_SUCCESS,
@@ -3962,7 +3927,6 @@ static enum test_result test_dcp_add_stream_exists(ENGINE_HANDLE *h,
     uint32_t opaque = 0xFFFF0000;
     uint32_t flags = 0;
     const char *name = "unittest";
-    uint16_t nname = strlen(name);
     uint16_t vbucket = 0;
 
     check(set_vbucket_state(h, h1, vbucket, vbucket_state_replica),
@@ -3970,7 +3934,7 @@ static enum test_result test_dcp_add_stream_exists(ENGINE_HANDLE *h,
 
     /* Open consumer connection */
     checkeq(ENGINE_SUCCESS,
-            h1->dcp.open(h, cookie, opaque, 0, flags, (void*)name, nname),
+            h1->dcp.open(h, cookie, opaque, 0, flags, name, {}),
             "Failed dcp consumer open connection.");
 
     /* Send add stream to consumer */
@@ -3989,8 +3953,7 @@ static enum test_result test_dcp_add_stream_exists(ENGINE_HANDLE *h,
     uint32_t opaque1 = 0xFFFF0000;
     std::string name1("unittest1");
     checkeq(ENGINE_SUCCESS,
-            h1->dcp.open(h, cookie1, opaque1, 0, flags, (void*)name1.c_str(),
-                         name1.length()),
+            h1->dcp.open(h, cookie1, opaque1, 0, flags, name1, {}),
             "Failed dcp consumer open connection.");
 
     /* Send add stream */
@@ -4016,14 +3979,13 @@ static enum test_result test_dcp_add_stream_nmvb(ENGINE_HANDLE *h,
     uint32_t opaque = 0xFFFF0000;
     uint32_t flags = 0;
     const char *name = "unittest";
-    uint16_t nname = strlen(name);
 
     check(set_vbucket_state(h, h1, 0, vbucket_state_replica),
           "Failed to set vbucket state.");
 
     // Open consumer connection
     checkeq(ENGINE_SUCCESS,
-            h1->dcp.open(h, cookie, opaque, 0, flags, (void*)name, nname),
+            h1->dcp.open(h, cookie, opaque, 0, flags, name, {}),
             "Failed dcp consumer open connection.");
 
     // Send add stream to consumer for vbucket that doesn't exist
@@ -4042,14 +4004,13 @@ static enum test_result test_dcp_add_stream_prod_exists(ENGINE_HANDLE*h,
     uint32_t opaque = 0xFFFF0000;
     uint32_t flags = 0;
     const char *name = "unittest";
-    uint16_t nname = strlen(name);
 
     check(set_vbucket_state(h, h1, 0, vbucket_state_replica),
           "Failed to set vbucket state.");
 
     // Open consumer connection
     checkeq(ENGINE_SUCCESS,
-            h1->dcp.open(h, cookie, opaque, 0, flags, (void*)name, nname),
+            h1->dcp.open(h, cookie, opaque, 0, flags, name, {}),
             "Failed dcp consumer open connection.");
 
     add_stream_for_consumer(h, h1, cookie, opaque++, 0, 0,
@@ -4064,13 +4025,12 @@ static enum test_result test_dcp_add_stream_prod_nmvb(ENGINE_HANDLE*h,
     uint32_t opaque = 0xFFFF0000;
     uint32_t flags = 0;
     const char *name = "unittest";
-    uint16_t nname = strlen(name);
 
     check(set_vbucket_state(h, h1, 0, vbucket_state_replica),
           "Failed to set vbucket state.");
 
     checkeq(ENGINE_SUCCESS,
-            h1->dcp.open(h, cookie, opaque, 0, flags, (void*)name, nname),
+            h1->dcp.open(h, cookie, opaque, 0, flags, name, {}),
             "Failed dcp producer open connection.");
 
     add_stream_for_consumer(h, h1, cookie, opaque++, 0, 0,
@@ -4085,10 +4045,9 @@ static enum test_result test_dcp_close_stream_no_stream(ENGINE_HANDLE *h,
     uint32_t opaque = 0xFFFF0000;
     uint32_t flags = 0;
     const char *name = "unittest";
-    uint16_t nname = strlen(name);
 
     checkeq(ENGINE_SUCCESS,
-            h1->dcp.open(h, cookie, opaque, 0, flags, (void*)name, nname),
+            h1->dcp.open(h, cookie, opaque, 0, flags, name, {}),
             "Failed dcp producer open connection.");
 
     checkeq(ENGINE_KEY_ENOENT,
@@ -4105,13 +4064,12 @@ static enum test_result test_dcp_close_stream(ENGINE_HANDLE *h,
     uint32_t opaque = 0xFFFF0000;
     uint32_t flags = 0;
     const char *name = "unittest";
-    uint16_t nname = strlen(name);
 
     check(set_vbucket_state(h, h1, 0, vbucket_state_replica),
           "Failed to set vbucket state.");
 
     checkeq(ENGINE_SUCCESS,
-            h1->dcp.open(h, cookie, opaque, 0, flags, (void*)name, nname),
+            h1->dcp.open(h, cookie, opaque, 0, flags, name, {}),
             "Failed dcp producer open connection.");
 
     add_stream_for_consumer(h, h1, cookie, opaque++, 0, 0,
@@ -4139,13 +4097,12 @@ static enum test_result test_dcp_consumer_end_stream(ENGINE_HANDLE *h,
     uint16_t vbucket = 0;
     uint32_t end_flag = 0;
     const char *name = "unittest";
-    uint16_t nname = strlen(name);
 
     check(set_vbucket_state(h, h1, 0, vbucket_state_replica),
           "Failed to set vbucket state.");
 
     checkeq(ENGINE_SUCCESS,
-            h1->dcp.open(h, cookie, opaque, 0, flags, (void*)name, nname),
+            h1->dcp.open(h, cookie, opaque, 0, flags, name, {}),
             "Failed dcp producer open connection.");
 
     add_stream_for_consumer(h, h1, cookie, opaque++, vbucket, 0,
@@ -4181,8 +4138,7 @@ static enum test_result test_dcp_consumer_mutate(ENGINE_HANDLE *h,
 
     // Open an DCP connection
     checkeq(ENGINE_SUCCESS,
-            h1->dcp.open(h, cookie, opaque, seqno, flags, (void*)(name.c_str()),
-                         name.length()),
+            h1->dcp.open(h, cookie, opaque, seqno, flags, name, {}),
             "Failed dcp producer open connection.");
 
     std::string type = get_str_stat(h, h1, "eq_dcpq:unittest:type", "dcp");
@@ -4296,12 +4252,11 @@ static enum test_result test_dcp_consumer_delete(ENGINE_HANDLE *h, ENGINE_HANDLE
     uint64_t bySeqno = 10;
     uint64_t revSeqno = 0;
     const char *name = "unittest";
-    uint16_t nname = strlen(name);
     uint32_t seqno = 0;
 
     // Open an DCP connection
     checkeq(ENGINE_SUCCESS,
-            h1->dcp.open(h, cookie, opaque, seqno, flags, (void*)name, nname),
+            h1->dcp.open(h, cookie, opaque, seqno, flags, name, {}),
             "Failed dcp producer open connection.");
 
     std::string type = get_str_stat(h, h1, "eq_dcpq:unittest:type", "dcp");
@@ -4352,11 +4307,10 @@ static enum test_result test_dcp_replica_stream_backfill(ENGINE_HANDLE *h,
     uint32_t flags = 0;
     const int num_items = 100;
     const char *name = "unittest";
-    uint16_t nname = strlen(name);
 
     /* Open an DCP consumer connection */
     checkeq(ENGINE_SUCCESS,
-            h1->dcp.open(h, cookie, opaque, seqno, flags, (void*)name, nname),
+            h1->dcp.open(h, cookie, opaque, seqno, flags, name, {}),
             "Failed dcp producer open connection.");
 
     std::string type = get_str_stat(h, h1, "eq_dcpq:unittest:type", "dcp");
@@ -4402,11 +4356,10 @@ static enum test_result test_dcp_replica_stream_in_memory(ENGINE_HANDLE *h,
     uint32_t flags = 0;
     const int num_items = 100;
     const char *name = "unittest";
-    uint16_t nname = strlen(name);
 
     /* Open an DCP consumer connection */
     checkeq(ENGINE_SUCCESS,
-            h1->dcp.open(h, cookie, opaque, seqno, flags, (void*)name, nname),
+            h1->dcp.open(h, cookie, opaque, seqno, flags, name, {}),
             "Failed dcp producer open connection.");
 
     std::string type = get_str_stat(h, h1, "eq_dcpq:unittest:type", "dcp");
@@ -4451,11 +4404,10 @@ static enum test_result test_dcp_replica_stream_all(ENGINE_HANDLE *h,
     uint32_t flags = 0;
     const int num_items = 100;
     const char *name = "unittest";
-    uint16_t nname = strlen(name);
 
     /* Open an DCP consumer connection */
     checkeq(ENGINE_SUCCESS,
-            h1->dcp.open(h, cookie, opaque, seqno, flags, (void*)name, nname),
+            h1->dcp.open(h, cookie, opaque, seqno, flags, name, {}),
             "Failed dcp producer open connection.");
 
     std::string type = get_str_stat(h, h1, "eq_dcpq:unittest:type", "dcp");
@@ -4576,7 +4528,6 @@ static enum test_result test_dcp_persistence_seqno_backfillItems(
     const void* consumerCookie = testHarness.create_cookie();
     uint32_t opaque = 0xFFFF0000;
     const char* name = "unittest";
-    uint16_t nname = strlen(name);
 
     /* Open an DCP consumer connection */
     checkeq(ENGINE_SUCCESS,
@@ -4585,8 +4536,8 @@ static enum test_result test_dcp_persistence_seqno_backfillItems(
                          opaque,
                          /*start_seqno*/ 0,
                          /*flags*/ 0,
-                         (void*)name,
-                         nname),
+                         name,
+                         {}),
             "Failed dcp consumer open connection.");
 
     std::string type = get_str_stat(h, h1, "eq_dcpq:unittest:type", "dcp");
@@ -4835,8 +4786,7 @@ static enum test_result test_dcp_erroneous_mutations(ENGINE_HANDLE *h,
     uint32_t flags = 0;
     std::string name("err_mutations");
 
-    checkeq(h1->dcp.open(h, cookie, opaque, 0, flags, (void*)(name.c_str()),
-                         name.size()),
+    checkeq(h1->dcp.open(h, cookie, opaque, 0, flags, name, {}),
             ENGINE_SUCCESS,
             "Failed to open DCP consumer connection!");
     add_stream_for_consumer(h, h1, cookie, opaque++, 0, 0,
@@ -4924,8 +4874,7 @@ static enum test_result test_dcp_erroneous_marker(ENGINE_HANDLE *h,
     uint32_t flags = 0;
     std::string name("first_marker");
 
-    checkeq(h1->dcp.open(h, cookie1, opaque, 0, flags, (void*)(name.c_str()),
-                         name.size()),
+    checkeq(h1->dcp.open(h, cookie1, opaque, 0, flags, name, {}),
             ENGINE_SUCCESS,
             "Failed to open DCP consumer connection!");
     add_stream_for_consumer(h, h1, cookie1, opaque++, 0, 0,
@@ -4960,8 +4909,7 @@ static enum test_result test_dcp_erroneous_marker(ENGINE_HANDLE *h,
     opaque = 0xFFFFF000;
     name.assign("second_marker");
 
-    checkeq(h1->dcp.open(h, cookie2, opaque, 0 ,flags, (void*)(name.c_str()),
-                         name.size()),
+    checkeq(h1->dcp.open(h, cookie2, opaque, 0, flags, name, {}),
             ENGINE_SUCCESS,
             "Failed to open DCP consumer connection!");
     add_stream_for_consumer(h, h1, cookie2, opaque++, 0, 0,
@@ -5017,8 +4965,7 @@ static enum test_result test_dcp_invalid_mutation_deletion(ENGINE_HANDLE* h,
     uint32_t flags = 0;
     std::string name("err_mutations");
 
-    checkeq(h1->dcp.open(h, cookie, opaque, 0, flags, (void*)(name.c_str()),
-                         name.size()),
+    checkeq(h1->dcp.open(h, cookie, opaque, 0, flags, name, {}),
             ENGINE_SUCCESS,
             "Failed to open DCP consumer connection!");
     add_stream_for_consumer(h, h1, cookie, opaque++, 0, 0,
@@ -5060,8 +5007,7 @@ static enum test_result test_dcp_invalid_snapshot_marker(ENGINE_HANDLE* h,
     uint32_t flags = 0;
     std::string name("unittest");
 
-    checkeq(h1->dcp.open(h, cookie, opaque, 0, flags, (void*)(name.c_str()),
-                         name.size()),
+    checkeq(h1->dcp.open(h, cookie, opaque, 0, flags, name, {}),
             ENGINE_SUCCESS,
             "Failed to open DCP consumer connection!");
     add_stream_for_consumer(h, h1, cookie, opaque++, 0, 0,
@@ -5136,8 +5082,9 @@ static enum test_result test_dcp_early_termination(ENGINE_HANDLE* h,
 
     const void *cookie = testHarness.create_cookie();
     uint32_t opaque = 1;
-    check(h1->dcp.open(h, cookie, ++opaque, 0, DCP_OPEN_PRODUCER,
-                       (void*)"unittest", strlen("unittest")) == ENGINE_SUCCESS,
+    check(h1->dcp.open(
+                  h, cookie, ++opaque, 0, DCP_OPEN_PRODUCER, "unittest", {}) ==
+                  ENGINE_SUCCESS,
           "Failed dcp producer open connection.");
 
     check(h1->dcp.control(h, cookie, ++opaque, "connection_buffer_size",
@@ -5317,9 +5264,9 @@ static enum test_result test_mb17517_cas_minus_1_dcp(ENGINE_HANDLE *h,
           "Failed to set vbucket state to replica.");
 
     // Open consumer connection
-    checkeq(h1->dcp.open(h, cookie, opaque, 0, flags, (void*)name.c_str(),
-                         name.size()),
-            ENGINE_SUCCESS, "Failed DCP Consumer open connection.");
+    checkeq(h1->dcp.open(h, cookie, opaque, 0, flags, name, {}),
+            ENGINE_SUCCESS,
+            "Failed DCP Consumer open connection.");
 
     add_stream_for_consumer(h, h1, cookie, opaque++, 0, 0,
                             PROTOCOL_BINARY_RESPONSE_SUCCESS);
@@ -5563,11 +5510,10 @@ static enum test_result test_dcp_consumer_processer_behavior(ENGINE_HANDLE *h,
     uint32_t opaque = 0xFFFF0000;
     uint32_t flags = 0;
     const char *name = "unittest";
-    uint16_t nname = strlen(name);
 
     // Open consumer connection
     checkeq(ENGINE_SUCCESS,
-            h1->dcp.open(h, cookie, opaque, 0, flags, (void*)name, nname),
+            h1->dcp.open(h, cookie, opaque, 0, flags, name, {}),
             "Failed dcp Consumer open connection.");
 
     add_stream_for_consumer(h, h1, cookie, opaque++, 0, 0,
@@ -5650,8 +5596,7 @@ static enum test_result test_get_all_vb_seqnos(ENGINE_HANDLE *h,
     uint32_t lockTime = 0;
 
     checkeq(ENGINE_SUCCESS,
-            h1->dcp.open(h, cookie, opaque, 0, flags, (void*)(name.c_str()),
-                         name.size()),
+            h1->dcp.open(h, cookie, opaque, 0, flags, name, {}),
             "Failed to open DCP consumer connection!");
     add_stream_for_consumer(h, h1, cookie, opaque++, rep_vb_num, 0,
                             PROTOCOL_BINARY_RESPONSE_SUCCESS);
@@ -5755,8 +5700,7 @@ static enum test_result test_mb19153(ENGINE_HANDLE *h,
 
     // Setup a producer connection
     checkeq(ENGINE_SUCCESS,
-            h1->dcp.open(h, cookie, ++opaque, 0, flags,
-                         (void*)name, strlen(name)),
+            h1->dcp.open(h, cookie, ++opaque, 0, flags, name, {}),
             "Failed dcp Consumer open connection.");
 
     // Initiate a stream request
@@ -5811,8 +5755,7 @@ static enum test_result test_mb19982(ENGINE_HANDLE *h,
           "Failed to set vbucket state.");
 
     // Open consumer connection
-    checkeq(h1->dcp.open(h, cookie, opaque, 0, flags,
-                         (void*)name.c_str(), name.length()),
+    checkeq(h1->dcp.open(h, cookie, opaque, 0, flags, name, {}),
             ENGINE_SUCCESS,
             "Failed dcp Consumer open connection.");
 
index 0f6dc3f..62114c3 100644 (file)
@@ -636,8 +636,12 @@ TEST_F(SingleThreadedEPBucketTest, MB19892_BackfillNotDeleted) {
     // Create a DCP producer, and start a stream request.
     std::string name{"test_producer"};
     EXPECT_EQ(ENGINE_SUCCESS,
-              engine->dcpOpen(cookie, /*opaque:unused*/{}, /*seqno:unused*/{},
-                              DCP_OPEN_PRODUCER, name.data(), name.size()));
+              engine->dcpOpen(cookie,
+                              /*opaque:unused*/ {},
+                              /*seqno:unused*/ {},
+                              DCP_OPEN_PRODUCER,
+                              name,
+                              {}));
 
     uint64_t rollbackSeqno;
     auto dummy_dcp_add_failover_cb = [](vbucket_failover_t* entry,
@@ -833,8 +837,12 @@ TEST_F(MB20054_SingleThreadedEPStoreTest, MB20054_onDeleteItem_during_bucket_del
     // Create a DCP producer, and start a stream request.
     std::string name("test_producer");
     EXPECT_EQ(ENGINE_SUCCESS,
-              engine->dcpOpen(cookie, /*opaque:unused*/{}, /*seqno:unused*/{},
-                              DCP_OPEN_PRODUCER, name.data(), name.size()));
+              engine->dcpOpen(cookie,
+                              /*opaque:unused*/ {},
+                              /*seqno:unused*/ {},
+                              DCP_OPEN_PRODUCER,
+                              name,
+                              {}));
 
     // Expect to have an ActiveStreamCheckpointProcessorTask, which is
     // initially snoozed (so we can't run it).