#include "mock/mock_dcp.h"
#include "programs/engine_testapp/mock_server.h"
+#include <condition_variable>
#include <platform/cb_malloc.h>
#include <thread>
ENGINE_HANDLE *h;
ENGINE_HANDLE_V1 *h1;
int items;
+ std::mutex mutex;
+ std::condition_variable cv;
+ bool compactor_waiting{false};
+ bool compaction_start{false};
};
struct writer_thread_ctx {
uint32_t flags = 0;
std::string name = "unittest";
- while (get_int_stat(ctx->h, ctx->h1, "ep_pending_compactions") == 0);
+ // Wait for compaction thread to to ready (and waiting on cv) - as
+ // we don't want the nofify_one() to be lost.
+ for (;;) {
+ std::lock_guard<std::mutex> lh(ctx->mutex);
+ if (ctx->compactor_waiting) {
+ break;
+ }
+ };
+ // Now compactor is waiting to run (and we are just about to start DCP
+ // stream, activate compaction.
+ {
+ std::lock_guard<std::mutex> lh(ctx->mutex);
+ ctx->compaction_start = true;
+ }
+ ctx->cv.notify_one();
// Switch to replica
check(set_vbucket_state(ctx->h, ctx->h1, 0, vbucket_state_replica),
static void compact_thread_func(void *args) {
struct mb16357_ctx *ctx = static_cast<mb16357_ctx *>(args);
+ std::unique_lock<std::mutex> lk(ctx->mutex);
+ ctx->compactor_waiting = true;
+ ctx->cv.wait(lk, [ctx]{return ctx->compaction_start;});
compact_db(ctx->h, ctx->h1, 0, 99, ctx->items, 1);
}
struct mb16357_ctx ctx(h, h1, num_items);
cb_thread_t cp_thread, dcp_thread;
+ // First thread used to start a background compaction; note this waits
+ // on the DCP thread to start before initiating compaction.
cb_assert(cb_create_thread(&cp_thread,
compact_thread_func,
&ctx, 0) == 0);
+ // Second thread flips vbucket to replica, notifies compaction to start,
+ // and then performs DCP on the vbucket.
cb_assert(cb_create_thread(&dcp_thread,
dcp_thread_func,
&ctx, 0) == 0);