MB-21320: Notify memcached to visit readyQ and get any ready items
author      Manu Dhundi <manu@couchbase.com>
            Mon, 31 Oct 2016 16:53:18 +0000 (09:53 -0700)
committer   Manu Dhundi <manu@couchbase.com>
            Tue, 1 Nov 2016 21:51:59 +0000 (21:51 +0000)
We should notify memcached to visit the readyQ and fetch any items that
were pushed there during stream creation. We must also notify memcached
about cursor dropping, so that it can visit ep-engine, stream any pending
items and make the subsequent stream state change.

This is not a functional fix, but it does improve performance.
Notifying immediately is not absolutely necessary, as the conn manager
will notify eventually.
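
The pattern this change applies in several places is: flip the stream's
itemsReady flag from false to true and, only on that winning transition,
ask the producer to notify memcached. A minimal standalone C++ sketch of
the idea (illustrative names, not the actual ActiveStream/DcpProducer
classes):

    #include <atomic>
    #include <cstdint>
    #include <iostream>

    /* Hypothetical stand-in for DcpProducer::notifyStreamReady(vbid),
       which ultimately wakes memcached to step the connection. */
    static void notifyStreamReady(uint16_t vbid) {
        std::cout << "notify memcached: vb " << vbid << " is ready\n";
    }

    struct StreamSketch {
        uint16_t vb = 0;
        std::atomic<bool> itemsReady{false};

        /* Notify only on the false -> true transition, so concurrent
           callers do not flood memcached with duplicate wake-ups. */
        void markReadyAndNotify() {
            bool inverse = false;
            if (itemsReady.compare_exchange_strong(inverse, true)) {
                notifyStreamReady(vb);
            }
        }
    };

    int main() {
        StreamSketch s;
        s.markReadyAndNotify(); // notifies
        s.markReadyAndNotify(); // already ready; no duplicate notification
    }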

Change-Id: Id06fc450a20f6d0258fa7c687520dff5f4899a28
Reviewed-on: http://review.couchbase.org/69171
Reviewed-by: Dave Rigby <daver@couchbase.com>
Reviewed-by: Jim Walker <jim@couchbase.com>
Tested-by: buildbot <build@couchbase.com>
configuration.json
src/connmap.cc
src/dcp/producer.cc
src/dcp/stream.cc
tests/ep_testsuite.cc
tests/ep_testsuite_dcp.cc

index c284010..dda405e 100644 (file)
             "descr": "True if we want to keep the closed checkpoints for each vbucket unless the memory usage is above high water mark",
             "type": "bool"
         },
+        "connection_manager_interval": {
+            "default": "2",
+            "descr": "How often connection manager task should be run (in seconds).",
+            "type": "size_t",
+            "dynamic": false,
+            "validator": {
+                "range": {
+                    "min": 2
+                }
+            }
+        },
         "max_checkpoints": {
             "default": "2",
             "type": "size_t"
index 5420efc..8422219 100644 (file)
@@ -140,12 +140,15 @@ bool ConnNotifier::notifyConnections() {
 class ConnManager : public GlobalTask {
 public:
     ConnManager(EventuallyPersistentEngine *e, ConnMap *cmap)
-        : GlobalTask(e, TaskId::ConnManager, MIN_SLEEP_TIME, true),
-          engine(e), connmap(cmap) { }
+        : GlobalTask(e, TaskId::ConnManager,
+                     e->getConfiguration().getConnectionManagerInterval(),
+                     true),
+          engine(e), connmap(cmap),
+          snoozeTime(e->getConfiguration().getConnectionManagerInterval()) { }
 
     bool run(void) {
         connmap->manageConnections();
-        snooze(MIN_SLEEP_TIME);
+        snooze(snoozeTime);
         return !engine->getEpStats().isShutdown ||
                !connmap->isAllEmpty() ||
                !connmap->isDeadConnectionsEmpty();
@@ -158,6 +161,7 @@ public:
 private:
     EventuallyPersistentEngine *engine;
     ConnMap *connmap;
+    size_t snoozeTime;
 };
 
 class ConnMapValueChangeListener : public ValueChangedListener {
@@ -229,9 +233,11 @@ void ConnMap::notifyPausedConnection(connection_t conn, bool schedule) {
         if (tp && tp->isPaused() && conn->isReserved() &&
             tp->setNotificationScheduled(true)) {
             pendingNotifications.push(conn);
-            connNotifier_->notifyMutationEvent(); // Wake up the connection notifier so that
-                                                  // it can notify the event to a given
-                                                  // paused connection.
+            if (connNotifier_) {
+                // Wake up the connection notifier so that
+                // it can notify the event to a given paused connection.
+                connNotifier_->notifyMutationEvent();
+            }
         }
     } else {
         LockHolder rlh(releaseLock);
index 13ba0a9..af8a853 100644 (file)
@@ -328,7 +328,7 @@ ENGINE_ERROR_CODE DcpProducer::streamRequest(uint32_t flags,
         streams[vbucket] = s;
     }
 
-    ready.pushUnique(vbucket);
+    notifyStreamReady(vbucket);
 
     if (add_vb_conn_map) {
         connection_t conn(this);
index c1bce99..6550e5a 100644 (file)
@@ -1023,7 +1023,23 @@ void ActiveStream::scheduleBackfill_UNLOCKED(bool reschedule) {
         } else {
             transitionState(STREAM_IN_MEMORY);
         }
-        itemsReady.store(true);
+        if (reschedule) {
+            /* Cursor was dropped, but we will not do a backfill.
+               This may happen in a corner case where the memory usage is
+               high due to other vbuckets, and the persistence cursor moves
+               ahead of the replication cursor into a newly opened checkpoint
+               but has not yet persisted the items.
+               Note: (1) We must not notify when we schedule the backfill for
+                         the first time, because the stream is not yet in the
+                         producer's list of streams.
+                     (2) It is not absolutely necessary to notify immediately,
+                         as the conn manager or incoming items will trigger a
+                         notification eventually, but it doesn't hurt to do
+                         so. */
+            bool inverse = false;
+            if (itemsReady.compare_exchange_strong(inverse, true)) {
+                producer->notifyStreamReady(vb_);
+            }
+        }
     }
 }
 
@@ -1234,6 +1250,10 @@ void ActiveStream::dropCheckpointCursor_UNLOCKED()
     RCPtr<VBucket> vbucket = engine->getVBucket(vb_);
     if (!vbucket) {
         endStream(END_STREAM_STATE);
+        bool inverse = false;
+        if (itemsReady.compare_exchange_strong(inverse, true)) {
+            producer->notifyStreamReady(vb_);
+        }
     }
     /* Drop the existing cursor */
     vbucket->checkpointManager.removeCursor(name_);
index 069890c..c073d74 100644 (file)
@@ -6153,6 +6153,7 @@ static enum test_result test_mb19687_fixed(ENGINE_HANDLE* h,
                 "ep_compaction_write_queue_cap",
                 "ep_config_file",
                 "ep_conflict_resolution_type",
+                "ep_connection_manager_interval",
                 "ep_couch_bucket",
                 "ep_cursor_dropping_lower_mark",
                 "ep_cursor_dropping_upper_mark",
@@ -6333,6 +6334,7 @@ static enum test_result test_mb19687_fixed(ENGINE_HANDLE* h,
                 "ep_compaction_write_queue_cap",
                 "ep_config_file",
                 "ep_conflict_resolution_type",
+                "ep_connection_manager_interval",
                 "ep_couch_bucket",
                 "ep_cursor_dropping_lower_mark",
                 "ep_cursor_dropping_lower_threshold",
index ad34a05..cb259dc 100644 (file)
@@ -48,6 +48,8 @@ void dcp_step(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1, const void* cookie) {
    (This approach seems safer than calling pthread_cancel()) */
 static std::atomic<bool> stop_continuous_dcp_thread(false);
 
+static bool wait_started(false);
+
 struct SeqnoRange {
     uint64_t start;
     uint64_t end;
@@ -898,6 +900,85 @@ extern "C" {
     }
 }
 
+/* DCP step thread that keeps running until it has read 'exp_mutations'
+   mutations.
+   Note: exp_mutations is cumulative across all streams in the DCP
+         connection */
+static void dcp_waiting_step(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1,
+                             const void *cookie, uint32_t opaque,
+                             uint64_t exp_mutations)
+{
+    bool done = false;
+    size_t bytes_read = 0;
+    bool pending_marker_ack = false;
+    uint64_t marker_end = 0;
+    uint64_t num_mutations = 0;
+    std::unique_ptr<dcp_message_producers> producers(get_dcp_producers(h, h1));
+
+    do {
+        if (bytes_read > 512) {
+            checkeq(ENGINE_SUCCESS,
+                    h1->dcp.buffer_acknowledgement(h, cookie, ++opaque, 0,
+                                                   bytes_read),
+                    "Failed to get dcp buffer ack");
+            bytes_read = 0;
+        }
+        ENGINE_ERROR_CODE err = h1->dcp.step(h, cookie, producers.get());
+        if (err == ENGINE_DISCONNECT) {
+            done = true;
+        } else {
+            switch (dcp_last_op) {
+                case PROTOCOL_BINARY_CMD_DCP_MUTATION:
+                    bytes_read += dcp_last_packet_size;
+                    if (pending_marker_ack && dcp_last_byseqno == marker_end) {
+                        sendDcpAck(h, h1, cookie,
+                                   PROTOCOL_BINARY_CMD_DCP_SNAPSHOT_MARKER,
+                                   PROTOCOL_BINARY_RESPONSE_SUCCESS,
+                                   dcp_last_opaque);
+                    }
+                    ++num_mutations;
+                    break;
+                case PROTOCOL_BINARY_CMD_DCP_STREAM_END:
+                    done = true;
+                    bytes_read += dcp_last_packet_size;
+                    break;
+                case PROTOCOL_BINARY_CMD_DCP_SNAPSHOT_MARKER:
+                    if (dcp_last_flags & 8) {
+                        pending_marker_ack = true;
+                        marker_end = dcp_last_snap_end_seqno;
+                    }
+                    bytes_read += dcp_last_packet_size;
+                    break;
+                case 0:
+                    /* No messages were ready on the last step call, so we
+                     * wait until the conn is notified of a new item.
+                     * Note that we check for 0 because we clear the
+                     * dcp_last_op value below.
+                     */
+                    testHarness.lock_cookie(cookie);
+                    /* waitfor_cookie() waits on a condition variable, but
+                       the API expects the cookie to be locked before
+                       calling it */
+                    wait_started = true;
+                    testHarness.waitfor_cookie(cookie);
+                    testHarness.unlock_cookie(cookie);
+                    break;
+                default:
+                    // Aborting ...
+                    std::string err_string("Unexpected DCP operation: " +
+                                           std::to_string(dcp_last_op));
+                    check(false, err_string.c_str());
+            }
+            if (num_mutations >= exp_mutations) {
+                done = true;
+            }
+            dcp_last_op = 0;
+        }
+    } while (!done);
+
+    /* Do buffer ack of the outstanding bytes */
+    h1->dcp.buffer_acknowledgement(h, cookie, ++opaque, 0, bytes_read);
+}
+
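
This helper is meant to be driven from its own thread while the test thread
opens the stream and writes items, roughly as follows (a sketch mirroring the
test added below; h, h1 and cookie come from the test harness and num_items
is the expected mutation count):

    /* run the waiting stepper concurrently and join it once it has read
       the expected number of mutations (0 is the starting opaque) */
    std::thread stepper(dcp_waiting_step, h, h1, cookie, 0, num_items);
    /* ... open the stream and write num_items items ... */
    stepper.join();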
 // Testcases //////////////////////////////////////////////////////////////////
 
 static enum test_result test_dcp_vbtakeover_no_stream(ENGINE_HANDLE *h,
@@ -1531,6 +1612,55 @@ static enum test_result test_dcp_consumer_noop(ENGINE_HANDLE *h,
     return SUCCESS;
 }
 
+static enum test_result test_dcp_producer_stream_req_open(ENGINE_HANDLE *h,
+                                                          ENGINE_HANDLE_V1 *h1)
+{
+    const void *cookie = testHarness.create_cookie();
+    const int num_items = 3;
+
+    DcpStreamCtx ctx;
+    ctx.vb_uuid = get_ull_stat(h, h1, "vb_0:0:id", "failovers");
+    ctx.seqno = {0, static_cast<uint64_t>(-1)};
+
+    std::string name("unittest");
+    TestDcpConsumer tdc(name.c_str(), cookie);
+    tdc.addStreamCtx(ctx);
+
+    tdc.openConnection(h, h1);
+
+    /* Create a separate thread that tries to read any DCP items */
+    std::thread dcp_step_thread(dcp_waiting_step, h, h1, cookie, 0, num_items);
+
+    /* We need to wait until the 'dcp_waiting_step' thread begins its wait */
+    while (1) {
+        /* A busy wait is ok here; a non-busy wait would need another
+           condition variable, which is overkill here */
+        testHarness.lock_cookie(cookie);
+        if (wait_started) {
+            testHarness.unlock_cookie(cookie);
+            break;
+        }
+        testHarness.unlock_cookie(cookie);
+    }
+
+    /* Now create a stream */
+    tdc.openStreams(h, h1);
+
+    /* Write items */
+    write_items(h, h1, num_items, 0);
+    wait_for_flusher_to_settle(h, h1);
+    verify_curr_items(h, h1, num_items, "Wrong amount of items");
+
+    /* If the notification mechanism (which wakes the 'dcp_waiting_step'
+       thread when an item is written) works as intended, we must see
+       'dcp_waiting_step' finish before the test times out */
+    dcp_step_thread.join();
+
+    testHarness.destroy_cookie(cookie);
+
+    return SUCCESS;
+}
+
 static enum test_result test_dcp_producer_stream_req_partial(ENGINE_HANDLE *h,
                                                              ENGINE_HANDLE_V1 *h1) {
 
@@ -5460,6 +5590,7 @@ static enum test_result test_set_dcp_param(ENGINE_HANDLE *h,
 const char *default_dbname = "./ep_testsuite_dcp";
 
 BaseTestCase testsuite_testcases[] = {
+
         TestCase("test dcp vbtakeover stat no stream",
                  test_dcp_vbtakeover_no_stream, test_setup, teardown, nullptr,
                  prepare, cleanup),
@@ -5507,6 +5638,16 @@ BaseTestCase testsuite_testcases[] = {
         TestCase("test dcp replica stream all", test_dcp_replica_stream_all,
                  test_setup, teardown, "chk_remover_stime=1;max_checkpoints=2",
                  prepare, cleanup),
+        TestCase("test dcp producer stream open",
+                 test_dcp_producer_stream_req_open, test_setup, teardown,
+                 /* Relying on the connection manager background thread to
+                    notify the connection at its default interval is not very
+                    efficient when there are items to be sent on a DCP stream.
+                    Hence increase the interval to a very high value, so that
+                    the test fails if we do not do the notification correctly
+                 */
+                 "connection_manager_interval=200000000",
+                 prepare, cleanup),
         TestCase("test producer stream request (partial)",
                  test_dcp_producer_stream_req_partial, test_setup, teardown,
                  /* set chk_period to essentially infinity so it won't run