MB-17885: Update flow control bytesSent correctly on DCP producer 75/59575/4
authorManu Dhundi <manu@couchbase.com>
Mon, 8 Feb 2016 17:42:25 +0000 (09:42 -0800)
committerChiyoung Seo <chiyoung@couchbase.com>
Tue, 9 Feb 2016 19:07:50 +0000 (19:07 +0000)
This is a fix for a regression introduced recently. Also this adds
a DCP test case to test flow control behavior of DCP producer.

Change-Id: Ia56858cb9e687a0a045b582c18e4b68948cb460c
Reviewed-on: http://review.couchbase.org/59575
Reviewed-by: Jim Walker <jim@couchbase.com>
Reviewed-by: Chiyoung Seo <chiyoung@couchbase.com>
Tested-by: Chiyoung Seo <chiyoung@couchbase.com>
src/dcp-producer.cc
tests/ep_testsuite.cc
tests/mock/mock_dcp.cc

index 3d53924..7f0adee 100644 (file)
@@ -52,7 +52,7 @@ bool DcpProducer::BufferLog::insert(size_t bytes) {
     // If the log is not enabled
     // or there is space, allow the insert
     if (!isEnabled_UNLOCKED() || !isFull_UNLOCKED()) {
-        bytesSent += bytesSent;
+        bytesSent += bytes;
         inserted = true;
     }
     return inserted;
index 7238b75..5484765 100644 (file)
@@ -3396,7 +3396,9 @@ static void dcp_stream(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1, const char *name,
                        int exp_mutations, int exp_deletions, int exp_markers,
                        int extra_takeover_ops, int exp_nru_value,
                        bool exp_disk_snapshot = false,
-                       bool skipEstimateCheck = false) {
+                       bool skipEstimateCheck = false,
+                       uint64_t flow_control_buf_size = 1024,
+                       bool disable_ack = false) {
     uint32_t opaque = 1;
     uint16_t nname = strlen(name);
 
@@ -3404,9 +3406,25 @@ static void dcp_stream(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1, const char *name,
                        nname) == ENGINE_SUCCESS,
           "Failed dcp producer open connection.");
 
-    check(h1->dcp.control(h, cookie, ++opaque, "connection_buffer_size", 22,
-                          "1024", 4) == ENGINE_SUCCESS,
-          "Failed to establish connection buffer");
+    std::string flow_control_buf_sz = std::to_string(flow_control_buf_size);
+    checkeq(ENGINE_SUCCESS,
+            h1->dcp.control(h, cookie, ++opaque, "connection_buffer_size", 22,
+                            flow_control_buf_sz.c_str(),
+                            flow_control_buf_sz.length()),
+            "Failed to establish connection buffer");
+    char stats_buffer[50] = {0};
+    if (flow_control_buf_size) {
+        snprintf(stats_buffer, sizeof(stats_buffer),
+                 "eq_dcpq:%s:max_buffer_bytes", name);
+        checkeq(static_cast<int>(flow_control_buf_size),
+                get_int_stat(h, h1, stats_buffer, "dcp"),
+                "Buffer Size did not get set correctly");
+    } else {
+        snprintf(stats_buffer, sizeof(stats_buffer),
+                 "eq_dcpq:%s:flow_control", name);
+        std::string status = get_str_stat(h, h1, stats_buffer, "dcp");
+        checkeq(0, status.compare("disabled"), "Flow control enabled!");
+    }
 
     uint64_t rollback = 0;
     check(h1->dcp.stream_req(h, cookie, flags, opaque, vbucket, start, end,
@@ -3465,6 +3483,7 @@ static void dcp_stream(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1, const char *name,
     }
 
     bool done = false;
+    bool exp_all_items_streamed = true;
     int num_mutations = 0;
     int num_deletions = 0;
     int num_snapshot_marker = 0;
@@ -3476,9 +3495,13 @@ static void dcp_stream(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1, const char *name,
 
     uint64_t last_by_seqno = 0;
     uint32_t bytes_read = 0;
+    uint64_t all_bytes = 0;
+    uint64_t total_acked_bytes = 0;
+    uint64_t ack_limit = flow_control_buf_size/2;
     do {
-        if (bytes_read > 512) {
+        if (!disable_ack && (bytes_read > ack_limit)) {
             h1->dcp.buffer_acknowledgement(h, cookie, ++opaque, 0, bytes_read);
+            total_acked_bytes += bytes_read;
             bytes_read = 0;
         }
         ENGINE_ERROR_CODE err = h1->dcp.step(h, cookie, producers);
@@ -3492,6 +3515,7 @@ static void dcp_stream(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1, const char *name,
                     last_by_seqno = dcp_last_byseqno;
                     num_mutations++;
                     bytes_read += dcp_last_packet_size;
+                    all_bytes += dcp_last_packet_size;
                     if (pending_marker_ack && dcp_last_byseqno == marker_end) {
                         sendDcpAck(h, h1, cookie, PROTOCOL_BINARY_CMD_DCP_SNAPSHOT_MARKER,
                                PROTOCOL_BINARY_RESPONSE_SUCCESS, dcp_last_opaque);
@@ -3502,6 +3526,7 @@ static void dcp_stream(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1, const char *name,
                     last_by_seqno = dcp_last_byseqno;
                     num_deletions++;
                     bytes_read += dcp_last_packet_size;
+                    all_bytes += dcp_last_packet_size;
                     if (pending_marker_ack && dcp_last_byseqno == marker_end) {
                         sendDcpAck(h, h1, cookie, PROTOCOL_BINARY_CMD_DCP_SNAPSHOT_MARKER,
                                PROTOCOL_BINARY_RESPONSE_SUCCESS, dcp_last_opaque);
@@ -3510,6 +3535,7 @@ static void dcp_stream(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1, const char *name,
                 case PROTOCOL_BINARY_CMD_DCP_STREAM_END:
                     done = true;
                     bytes_read += dcp_last_packet_size;
+                    all_bytes += dcp_last_packet_size;
                     break;
                 case PROTOCOL_BINARY_CMD_DCP_SNAPSHOT_MARKER:
                     if (exp_disk_snapshot && num_snapshot_marker == 0) {
@@ -3523,6 +3549,7 @@ static void dcp_stream(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1, const char *name,
 
                     num_snapshot_marker++;
                     bytes_read += dcp_last_packet_size;
+                    all_bytes += dcp_last_packet_size;
                     break;
                 case PROTOCOL_BINARY_CMD_DCP_SET_VBUCKET_STATE:
                     if (dcp_last_vbucket_state == vbucket_state_pending) {
@@ -3540,6 +3567,7 @@ static void dcp_stream(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1, const char *name,
                         num_set_vbucket_active++;
                     }
                     bytes_read += dcp_last_packet_size;
+                    all_bytes += dcp_last_packet_size;
                     sendDcpAck(h, h1, cookie, PROTOCOL_BINARY_CMD_DCP_SET_VBUCKET_STATE,
                                PROTOCOL_BINARY_RESPONSE_SUCCESS, dcp_last_opaque);
                     break;
@@ -3548,6 +3576,13 @@ static void dcp_stream(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1, const char *name,
                      * should just ignore this case. Note that we check for 0
                      * because we clear the dcp_last_op value below.
                      */
+                     if (disable_ack && flow_control_buf_size) {
+                         /* If there is no acking and if flow control is enabled
+                            we are done because producer should not send us any
+                            more items. */
+                         done = true;
+                         exp_all_items_streamed = false;
+                     }
                      break;
                 default:
                     break;
@@ -3569,12 +3604,23 @@ static void dcp_stream(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1, const char *name,
     }
 
     /* Check if the readyQ size goes to zero after all items are streamed */
-    char stats_ready_queue_memory[50];
-    snprintf(stats_ready_queue_memory, sizeof(stats_ready_queue_memory),
-             "eq_dcpq:%s:stream_0_ready_queue_memory", name);
-    check((uint64_t)get_ull_stat(h, h1, stats_ready_queue_memory, "dcp")
-          == 0, "readyQ size did not go to zero");
-
+    if (exp_all_items_streamed) {
+        char stats_ready_queue_memory[50];
+        snprintf(stats_ready_queue_memory, sizeof(stats_ready_queue_memory),
+                 "eq_dcpq:%s:stream_0_ready_queue_memory", name);
+        check((uint64_t)get_ull_stat(h, h1, stats_ready_queue_memory, "dcp")
+              == 0, "readyQ size did not go to zero");
+    }
+
+    /* Check if the producer has updated flow control stat correctly */
+    if (flow_control_buf_size) {
+        memset(stats_buffer, 0, 50);
+        snprintf(stats_buffer, sizeof(stats_buffer), "eq_dcpq:%s:unacked_bytes",
+                 name);
+        checkeq(static_cast<int>(all_bytes - total_acked_bytes),
+                get_int_stat(h, h1, stats_buffer, "dcp"),
+                "Buffer Size did not get set correctly");
+    }
     free(producers);
 }
 
@@ -4750,6 +4796,46 @@ static enum test_result test_dcp_buffer_log_size(ENGINE_HANDLE *h,
     return SUCCESS;
 }
 
+static enum test_result test_dcp_producer_flow_control(ENGINE_HANDLE *h,
+                                                       ENGINE_HANDLE_V1 *h1) {
+    /* Write 10 items */
+    const int num_items = 10;
+    for (int j = 0; j < num_items; ++j) {
+        item *i = NULL;
+        std::string key("key" + std::to_string(j));
+        checkeq(ENGINE_SUCCESS,
+                store(h, h1, NULL, OPERATION_SET, key.c_str(), "123456789", &i),
+                "Failed to store a value");
+        h1->release(h, NULL, i);
+    }
+    wait_for_flusher_to_settle(h, h1);
+    verify_curr_items(h, h1, num_items, "Wrong amount of items");
+    uint64_t vb_uuid = get_ull_stat(h, h1, "vb_0:0:id", "failovers");
+
+    /* Disable flow control and stream all items. The producer should stream all
+     items even when we do not send acks */
+    std::string name("unittest");
+    const void *cookie = testHarness.create_cookie();
+    dcp_stream(h, h1, name.c_str(), cookie, 0, 0, 0, num_items, vb_uuid, 0,
+               0, num_items, 0, 1, 0, 2, false, false,
+               0 /* do not enable flow control */,
+               true /* do not ack */);
+
+    /* Set flow control buffer to a very low value such that producer is not
+     expected to send more than 1 item when we do not send acks */
+    std::string name1("unittest1");
+    const void *cookie1 = testHarness.create_cookie();
+    dcp_stream(h, h1, name1.c_str(), cookie1, 0, 0, 0, num_items, vb_uuid,
+               0, 0, 1, 0, 1, 0, 2, false, false,
+               100 /* flow control buf to low value */,
+               true /* do not ack */);
+
+    testHarness.destroy_cookie(cookie);
+    testHarness.destroy_cookie(cookie1);
+
+    return SUCCESS;
+}
+
 static enum test_result test_dcp_get_failover_log(ENGINE_HANDLE *h,
                                                   ENGINE_HANDLE_V1 *h1) {
     const void *cookie = testHarness.create_cookie();
@@ -12479,6 +12565,9 @@ engine_test_t* get_tests(void) {
                 cleanup),
         TestCase("test change dcp buffer log size", test_dcp_buffer_log_size,
                 test_setup, teardown, NULL, prepare, cleanup),
+        TestCase("test dcp producer flow control",
+                 test_dcp_producer_flow_control, test_setup, teardown, NULL,
+                 prepare, cleanup),
         TestCase("test get failover log", test_dcp_get_failover_log,
                 test_setup, teardown, NULL, prepare, cleanup),
         TestCase("test add stream exists", test_dcp_add_stream_exists,
index 9b2487e..25ba0bc 100644 (file)
@@ -183,7 +183,7 @@ static ENGINE_ERROR_CODE mock_mutation(const void* cookie,
     dcp_last_meta = meta;
     dcp_last_nmeta = nmeta;
     dcp_last_nru = nru;
-    dcp_last_packet_size = 55 + dcp_last_key.length() + item->getValMemSize();
+    dcp_last_packet_size = 55 + dcp_last_key.length() + item->getNBytes();
     return ENGINE_SUCCESS;
 }