Merge branch 'couchbase/3.0.x' into 'couchbase/sherlock' 65/59765/11
authorabhinavdangeti <abhinav@couchbase.com>
Fri, 12 Feb 2016 16:54:56 +0000 (08:54 -0800)
committerabhinavdangeti <abhinav@couchbase.com>
Fri, 12 Feb 2016 16:58:13 +0000 (08:58 -0800)
couchbase/3.0.x:
|\
| * 0da7d42 MB-17885: Address compilation errors in ep_testsuite.cc
| * b7ee24c MB-17885: Update flow control bytesSent correctly on DCP producer

Change-Id: I70cda64395781a433a8e40720bdc5c75f5d0e3c2

1  2 
src/dcp-producer.cc
tests/ep_testsuite.cc

Simple merge
@@@ -3893,12 -3394,11 +3893,14 @@@ static void dcp_stream(ENGINE_HANDLE *h
                         uint64_t start, uint64_t end, uint64_t vb_uuid,
                         uint64_t snap_start_seqno, uint64_t snap_end_seqno,
                         int exp_mutations, int exp_deletions, int exp_markers,
 -                       int extra_takeover_ops, int exp_nru_value,
 +                       int extra_takeover_ops,
                         bool exp_disk_snapshot = false,
 +                       bool time_sync_enabled = false,
 +                       uint8_t exp_conflict_res = 0,
                         bool skipEstimateCheck = false,
-                        uint64_t *total_bytes = NULL) {
++                       uint64_t *total_bytes = NULL,
+                        uint64_t flow_control_buf_size = 1024,
+                        bool disable_ack = false) {
      uint32_t opaque = 1;
      uint16_t nname = strlen(name);
  
                         nname) == ENGINE_SUCCESS,
            "Failed dcp producer open connection.");
  
-     check(h1->dcp.control(h, cookie, ++opaque, "connection_buffer_size", 22,
-                           "1024", 4) == ENGINE_SUCCESS,
-           "Failed to establish connection buffer");
+     std::stringstream flow_control_buf_sz;
+     flow_control_buf_sz << flow_control_buf_size;
+     checkeq(ENGINE_SUCCESS,
+             h1->dcp.control(h, cookie, ++opaque, "connection_buffer_size", 22,
+                             flow_control_buf_sz.str().c_str(),
+                             flow_control_buf_sz.str().length()),
+             "Failed to establish connection buffer");
+     char stats_buffer[50] = {0};
+     if (flow_control_buf_size) {
+         snprintf(stats_buffer, sizeof(stats_buffer),
+                  "eq_dcpq:%s:max_buffer_bytes", name);
+         checkeq(static_cast<int>(flow_control_buf_size),
+                 get_int_stat(h, h1, stats_buffer, "dcp"),
+                 "Buffer Size did not get set correctly");
+     } else {
+         snprintf(stats_buffer, sizeof(stats_buffer),
+                  "eq_dcpq:%s:flow_control", name);
+         std::string status = get_str_stat(h, h1, stats_buffer, "dcp");
+         checkeq(0, status.compare("disabled"), "Flow control enabled!");
+     }
  
 +    check(h1->dcp.control(h, cookie, ++opaque, "enable_ext_metadata", 19,
 +                          "true", 4) == ENGINE_SUCCESS,
 +          "Failed to enable xdcr extras");
 +
      uint64_t rollback = 0;
      check(h1->dcp.stream_req(h, cookie, flags, opaque, vbucket, start, end,
                               vb_uuid, snap_start_seqno, snap_end_seqno, &rollback,
      uint64_t last_by_seqno = 0;
      uint32_t bytes_read = 0;
      uint64_t all_bytes = 0;
-     if (total_bytes) {
-         all_bytes = *total_bytes;
-     }
+     uint64_t total_acked_bytes = 0;
+     uint64_t ack_limit = flow_control_buf_size/2;
++
      do {
-         if (bytes_read > 512) {
+         if (!disable_ack && (bytes_read > ack_limit)) {
              h1->dcp.buffer_acknowledgement(h, cookie, ++opaque, 0, bytes_read);
+             total_acked_bytes += bytes_read;
              bytes_read = 0;
          }
          ENGINE_ERROR_CODE err = h1->dcp.step(h, cookie, producers);
          }
      } while (!done);
  
-         *total_bytes = all_bytes;
 +    if (total_bytes) {
++        *total_bytes = *total_bytes + all_bytes;
 +    }
      check(num_mutations == exp_mutations, "Invalid number of mutations");
      check(num_deletions == exp_deletions, "Invalid number of deletes");
      check(num_snapshot_marker == exp_markers,
@@@ -5466,6 -4797,48 +5505,48 @@@ static enum test_result test_dcp_buffer
      return SUCCESS;
  }
  
 -               0, num_items, 0, 1, 0, 2, false, false,
+ static enum test_result test_dcp_producer_flow_control(ENGINE_HANDLE *h,
+                                                        ENGINE_HANDLE_V1 *h1) {
+     /* Write 10 items */
+     const int num_items = 10;
+     for (int j = 0; j < num_items; ++j) {
+         item *i = NULL;
+         std::stringstream key;
+         key << "key" << j;
+         checkeq(ENGINE_SUCCESS,
+                 store(h, h1, NULL, OPERATION_SET, key.str().c_str(),
+                       "123456789", &i),
+                 "Failed to store a value");
+         h1->release(h, NULL, i);
+     }
+     wait_for_flusher_to_settle(h, h1);
+     verify_curr_items(h, h1, num_items, "Wrong amount of items");
+     uint64_t vb_uuid = get_ull_stat(h, h1, "vb_0:0:id", "failovers");
+     /* Disable flow control and stream all items. The producer should stream all
+      items even when we do not send acks */
+     std::string name("unittest");
+     const void *cookie = testHarness.create_cookie();
+     dcp_stream(h, h1, name.c_str(), cookie, 0, 0, 0, num_items, vb_uuid, 0,
 -               0, 0, 1, 0, 1, 0, 2, false, false,
++               0, num_items, 0, 1, 0, false, false, 0, false, NULL,
+                0 /* do not enable flow control */,
+                true /* do not ack */);
+     /* Set flow control buffer to a very low value such that producer is not
+      expected to send more than 1 item when we do not send acks */
+     std::string name1("unittest1");
+     const void *cookie1 = testHarness.create_cookie();
+     dcp_stream(h, h1, name1.c_str(), cookie1, 0, 0, 0, num_items, vb_uuid,
++               0, 0, 1, 0, 1, 0, false, false, 0, false, NULL,
+                100 /* flow control buf to low value */,
+                true /* do not ack */);
+     testHarness.destroy_cookie(cookie);
+     testHarness.destroy_cookie(cookie1);
+     return SUCCESS;
+ }
  static enum test_result test_dcp_get_failover_log(ENGINE_HANDLE *h,
                                                    ENGINE_HANDLE_V1 *h1) {
      const void *cookie = testHarness.create_cookie();