class ConnManager : public GlobalTask {
public:
ConnManager(EventuallyPersistentEngine *e, ConnMap *cmap)
- : GlobalTask(e, TaskId::ConnManager, MIN_SLEEP_TIME, true),
- engine(e), connmap(cmap) { }
+ : GlobalTask(e, TaskId::ConnManager,
+ e->getConfiguration().getConnectionManagerInterval(),
+ true),
+ engine(e), connmap(cmap),
+ snoozeTime(e->getConfiguration().getConnectionManagerInterval()) { }
bool run(void) {
connmap->manageConnections();
- snooze(MIN_SLEEP_TIME);
+ snooze(snoozeTime);
        return !engine->getEpStats().isShutdown ||
               !connmap->isAllEmpty() ||
               !connmap->isDeadConnectionsEmpty();
    }
private:
EventuallyPersistentEngine *engine;
ConnMap *connmap;
+ size_t snoozeTime;
};
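The ConnMapValueChangeListener hunk below is truncated in this excerpt. For orientation, a minimal sketch of the member such a listener would implement, assuming the usual ValueChangedListener::sizeValueChanged() hook; the setSnoozeInterval() setter is hypothetical:

    /* Inside the listener declared below, which would hold a ConnMap &connmap_: */
    void sizeValueChanged(const std::string &key, size_t value) override {
        // React to a runtime change of the ConnManager task's snooze interval
        if (key == "connection_manager_interval") {
            connmap_.setSnoozeInterval(value); // hypothetical setter
        }
    }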
class ConnMapValueChangeListener : public ValueChangedListener {
if (tp && tp->isPaused() && conn->isReserved() &&
tp->setNotificationScheduled(true)) {
pendingNotifications.push(conn);
- connNotifier_->notifyMutationEvent(); // Wake up the connection notifier so that
- // it can notify the event to a given
- // paused connection.
+ if (connNotifier_) {
+ // Wake up the connection notifier so that
+ // it can notify the event to a given paused connection.
+ connNotifier_->notifyMutationEvent();
+ }
}
} else {
LockHolder rlh(releaseLock);
(This approach seems safer than calling pthread_cancel()) */
static std::atomic<bool> stop_continuous_dcp_thread(false);
+/* Flag used by 'dcp_waiting_step' to signal that it has started waiting on
+   the cookie; reads and writes are guarded by the harness cookie lock */
+static bool wait_started(false);
+
struct SeqnoRange {
uint64_t start;
uint64_t end;
};
}
+/* DCP step thread that keeps stepping until it has read 'exp_mutations'
+   mutations. Note: exp_mutations is cumulative across all streams on the
+   DCP connection */
+static void dcp_waiting_step(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1,
+ const void *cookie, uint32_t opaque,
+ uint64_t exp_mutations)
+{
+ bool done = false;
+ size_t bytes_read = 0;
+ bool pending_marker_ack = false;
+ uint64_t marker_end = 0;
+ uint64_t num_mutations = 0;
+ std::unique_ptr<dcp_message_producers> producers(get_dcp_producers(h, h1));
+
+ do {
+        /* Return the consumed bytes to the producer's flow-control window
+           before the unacked count grows too large */
+        if (bytes_read > 512) {
+ checkeq(ENGINE_SUCCESS,
+ h1->dcp.buffer_acknowledgement(h, cookie, ++opaque, 0,
+ bytes_read),
+ "Failed to get dcp buffer ack");
+ bytes_read = 0;
+ }
+ ENGINE_ERROR_CODE err = h1->dcp.step(h, cookie, producers.get());
+ if (err == ENGINE_DISCONNECT) {
+ done = true;
+ } else {
+ switch (dcp_last_op) {
+ case PROTOCOL_BINARY_CMD_DCP_MUTATION:
+ bytes_read += dcp_last_packet_size;
+ if (pending_marker_ack && dcp_last_byseqno == marker_end) {
+ sendDcpAck(h, h1, cookie,
+ PROTOCOL_BINARY_CMD_DCP_SNAPSHOT_MARKER,
+ PROTOCOL_BINARY_RESPONSE_SUCCESS,
+ dcp_last_opaque);
+ }
+ ++num_mutations;
+ break;
+ case PROTOCOL_BINARY_CMD_DCP_STREAM_END:
+ done = true;
+ bytes_read += dcp_last_packet_size;
+ break;
+ case PROTOCOL_BINARY_CMD_DCP_SNAPSHOT_MARKER:
+            if (dcp_last_flags & 8) { /* 0x08: marker requests an ack */
+ pending_marker_ack = true;
+ marker_end = dcp_last_snap_end_seqno;
+ }
+ bytes_read += dcp_last_packet_size;
+ break;
+            case 0:
+                /* No message was ready on the last step() call, so wait
+                 * until the connection is notified of a new item.
+                 * We check for 0 because dcp_last_op is cleared below.
+                 */
+                testHarness.lock_cookie(cookie);
+                /* waitfor_cookie() blocks on a condition variable; the
+                   API requires the cookie to be locked before it is
+                   called */
+                wait_started = true;
+                testHarness.waitfor_cookie(cookie);
+                testHarness.unlock_cookie(cookie);
+                break;
+ default:
+ // Aborting ...
+ std::string err_string("Unexpected DCP operation: " +
+ std::to_string(dcp_last_op));
+ check(false, err_string.c_str());
+ }
+ if (num_mutations >= exp_mutations) {
+ done = true;
+ }
+ dcp_last_op = 0;
+ }
+ } while (!done);
+
+    /* Ack any bytes still outstanding */
+ h1->dcp.buffer_acknowledgement(h, cookie, ++opaque, 0, bytes_read);
+}
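As an aside, the ack pattern above (count the bytes of data messages, acknowledge once a threshold is crossed, flush the remainder at the end) keeps the producer's DCP flow-control buffer from filling up and pausing the stream. The same policy in isolation, as a self-contained sketch with hypothetical names (needs <cstddef>):

    struct FlowControlAcker {
        size_t unacked = 0;
        static constexpr size_t threshold = 512;

        /* Record n consumed bytes; return them to the producer's window
           once the threshold is crossed. AckFn is any callable taking a
           size_t. */
        template <class AckFn>
        void onBytes(size_t n, AckFn ack) {
            unacked += n;
            if (unacked > threshold) {
                ack(unacked);
                unacked = 0;
            }
        }

        /* Flush whatever is still outstanding (e.g. on stream end). */
        template <class AckFn>
        void flush(AckFn ack) {
            if (unacked) {
                ack(unacked);
                unacked = 0;
            }
        }
    };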
+
// Testcases //////////////////////////////////////////////////////////////////
static enum test_result test_dcp_vbtakeover_no_stream(ENGINE_HANDLE *h,
return SUCCESS;
}
+static enum test_result test_dcp_producer_stream_req_open(ENGINE_HANDLE *h,
+ ENGINE_HANDLE_V1 *h1)
+{
+ const void *cookie = testHarness.create_cookie();
+ const int num_items = 3;
+
+ DcpStreamCtx ctx;
+ ctx.vb_uuid = get_ull_stat(h, h1, "vb_0:0:id", "failovers");
+ ctx.seqno = {0, static_cast<uint64_t>(-1)};
+
+ std::string name("unittest");
+ TestDcpConsumer tdc(name.c_str(), cookie);
+ tdc.addStreamCtx(ctx);
+
+ tdc.openConnection(h, h1);
+
+    /* Create a separate thread that tries to read the DCP items */
+ std::thread dcp_step_thread(dcp_waiting_step, h, h1, cookie, 0, num_items);
+
+    /* Wait until the 'dcp_waiting_step' thread has begun its wait */
+    while (1) {
+        /* A busy wait is fine here; avoiding it would need yet another
+           condition variable, which is overkill for a test */
+ testHarness.lock_cookie(cookie);
+ if (wait_started) {
+ testHarness.unlock_cookie(cookie);
+ break;
+ }
+ testHarness.unlock_cookie(cookie);
+ }
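The comment in the loop above rules out a condition variable as overkill; for reference, a minimal sketch of that alternative, assuming a hypothetical wait_mutex/wait_cv pair shared with 'dcp_waiting_step' (needs <mutex> and <condition_variable>):

    static std::mutex wait_mutex;
    static std::condition_variable wait_cv;

    /* In dcp_waiting_step, instead of setting the flag under the cookie lock:
           { std::lock_guard<std::mutex> lg(wait_mutex); wait_started = true; }
           wait_cv.notify_one();
       and here, instead of the busy loop: */
    std::unique_lock<std::mutex> lk(wait_mutex);
    wait_cv.wait(lk, [] { return wait_started; });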
+
+ /* Now create a stream */
+ tdc.openStreams(h, h1);
+
+ /* Write items */
+ write_items(h, h1, num_items, 0);
+ wait_for_flusher_to_settle(h, h1);
+ verify_curr_items(h, h1, num_items, "Wrong amount of items");
+
+    /* If the notification (sent to the 'dcp_waiting_step' thread when an
+       item is written) works correctly, 'dcp_waiting_step' must finish
+       before the test times out */
+ dcp_step_thread.join();
+
+ testHarness.destroy_cookie(cookie);
+
+ return SUCCESS;
+}
+
static enum test_result test_dcp_producer_stream_req_partial(ENGINE_HANDLE *h,
ENGINE_HANDLE_V1 *h1) {
const char *default_dbname = "./ep_testsuite_dcp";
BaseTestCase testsuite_testcases[] = {
TestCase("test dcp vbtakeover stat no stream",
test_dcp_vbtakeover_no_stream, test_setup, teardown, nullptr,
prepare, cleanup),
TestCase("test dcp replica stream all", test_dcp_replica_stream_all,
test_setup, teardown, "chk_remover_stime=1;max_checkpoints=2",
prepare, cleanup),
+ TestCase("test dcp producer stream open",
+ test_dcp_producer_stream_req_open, test_setup, teardown,
+             /* Waiting for the connection manager background task to notify
+                the connection at its default interval is inefficient when
+                there are items to be sent on a DCP stream. Hence raise the
+                interval to a very high value (200000000 s, roughly 6 years)
+                so that the test fails unless the notification is done
+                correctly */
+ "connection_manager_interval=200000000",
+ prepare, cleanup),
TestCase("test producer stream request (partial)",
test_dcp_producer_stream_req_partial, test_setup, teardown,
/* set chk_period to essentially infinity so it won't run