From: Jim Walker Date: Fri, 18 Nov 2016 15:37:12 +0000 (+0000) Subject: MB-21724: Reduce iterations in CheckpointTest basic_chk_test X-Git-Tag: v4.6.0~15 X-Git-Url: http://review.couchbase.org/gitweb?p=ep-engine.git;a=commitdiff_plain;h=5350f9157bd2f44e3f08a127f6364e0abf2872b6 MB-21724: Reduce iterations in CheckpointTest basic_chk_test Reduce number of items and reduce threads and items further when running under valgrind. Also removed a sleep(1) and fixed the thread start/wait code that relied on the sleep(1). Change-Id: Ie6d71bf0972e30225343c12c51e36d5adadec794 Reviewed-on: http://review.couchbase.org/70086 Tested-by: buildbot Reviewed-by: Dave Rigby --- diff --git a/tests/module_tests/checkpoint_test.cc b/tests/module_tests/checkpoint_test.cc index 7b76c4c..38f1ca9 100644 --- a/tests/module_tests/checkpoint_test.cc +++ b/tests/module_tests/checkpoint_test.cc @@ -28,24 +28,19 @@ #include #include +#include #define NUM_TAP_THREADS 3 +#define NUM_TAP_THREADS_VG 2 #define NUM_SET_THREADS 4 +#define NUM_SET_THREADS_VG 2 -#define NUM_ITEMS 10000 +#define NUM_ITEMS 500 +#define NUM_ITEMS_VG 10 #define DCP_CURSOR_PREFIX "dcp-client-" #define TAP_CURSOR_PREFIX "tap-client-" -struct thread_args { - SyncObject *mutex; - SyncObject *gate; - RCPtr vbucket; - CheckpointManager *checkpoint_manager; - int *counter; - std::string name; -}; - extern "C" { /** @@ -101,15 +96,34 @@ protected: std::unique_ptr manager; }; +struct thread_args { + RCPtr vbucket; + CheckpointManager *checkpoint_manager; + int n_threads; + std::string name; +}; + +/* + * atomically increment a threadCount + * if the calling thread is the last one up, notify_all + * if the calling thread is not the last one up, wait (in the function) + */ +static void thread_up(struct thread_args* args) { + static int threadCount = 0; + static std::mutex m; + static std::condition_variable cv; + std::unique_lock lh(m); + if (++threadCount != args->n_threads) { + cv.wait(lh, [args](){return threadCount == args->n_threads;}); + } else { + cv.notify_all(); // all threads accounted for, begin + } + lh.unlock(); +} + static void launch_persistence_thread(void *arg) { struct thread_args *args = static_cast(arg); - std::unique_lock lh(*(args->mutex)); - std::unique_lock lhg(*(args->gate)); - ++(*(args->counter)); - lhg.unlock(); - args->gate->notify_all(); - args->mutex->wait(lh); - lh.unlock(); + thread_up(args); bool flush = false; while(true) { @@ -142,13 +156,7 @@ static void launch_persistence_thread(void *arg) { static void launch_tap_client_thread(void *arg) { struct thread_args *args = static_cast(arg); - std::unique_lock lh(*(args->mutex)); - std::unique_lock lhg(*(args->gate)); - ++(*(args->counter)); - lhg.unlock(); - args->gate->notify_all(); - args->mutex->wait(lh); - lh.unlock(); + thread_up(args); bool flush = false; bool isLastItem = false; @@ -165,13 +173,7 @@ static void launch_tap_client_thread(void *arg) { static void launch_checkpoint_cleanup_thread(void *arg) { struct thread_args *args = static_cast(arg); - std::unique_lock lh(*(args->mutex)); - std::unique_lock lhg(*(args->gate)); - ++(*(args->counter)); - lhg.unlock(); - args->gate->notify_all(); - args->mutex->wait(lh); - lh.unlock(); + thread_up(args); while (args->checkpoint_manager->getNumOfCursors() > 1) { bool newCheckpointCreated; @@ -182,16 +184,11 @@ static void launch_checkpoint_cleanup_thread(void *arg) { static void launch_set_thread(void *arg) { struct thread_args *args = static_cast(arg); - std::unique_lock lh(*(args->mutex)); - std::unique_lock lhg(*(args->gate)); - ++(*(args->counter)); - lhg.unlock(); - args->gate->notify_all(); - args->mutex->wait(lh); - lh.unlock(); + thread_up(args); int i(0); - for (i = 0; i < NUM_ITEMS; ++i) { + const int item_count = RUNNING_ON_VALGRIND ? NUM_ITEMS_VG : NUM_ITEMS; + for (i = 0; i < item_count; ++i) { std::stringstream key; key << "key-" << i; queued_item qi(new Item(key.str(), args->vbucket->getId(), @@ -212,13 +209,15 @@ TEST_F(CheckpointTest, basic_chk_test) { CheckpointManager *checkpoint_manager = new CheckpointManager(global_stats, 0, checkpoint_config, 1, 0, 0, cb); - SyncObject *mutex = new SyncObject(); - SyncObject *gate = new SyncObject(); - int *counter = new int; - *counter = 0; - cb_thread_t tap_threads[NUM_TAP_THREADS]; - cb_thread_t set_threads[NUM_SET_THREADS]; + const int n_set_threads = RUNNING_ON_VALGRIND ? NUM_SET_THREADS_VG : + NUM_SET_THREADS; + + const int n_tap_threads = RUNNING_ON_VALGRIND ? NUM_TAP_THREADS_VG : + NUM_TAP_THREADS; + + std::vector tap_threads(n_tap_threads); + std::vector set_threads(n_set_threads); cb_thread_t persistence_thread; cb_thread_t checkpoint_cleanup_thread; int i(0), rc(0); @@ -226,19 +225,15 @@ TEST_F(CheckpointTest, basic_chk_test) { struct thread_args t_args; t_args.checkpoint_manager = checkpoint_manager; t_args.vbucket = vbucket; - t_args.mutex = mutex; - t_args.gate = gate; - t_args.counter = counter; + t_args.n_threads = n_set_threads + n_tap_threads + 2; - struct thread_args tap_t_args[NUM_TAP_THREADS]; - for (i = 0; i < NUM_TAP_THREADS; ++i) { + std::vector tap_t_args(n_tap_threads); + for (i = 0; i < n_tap_threads; ++i) { std::string name(TAP_CURSOR_PREFIX + std::to_string(i)); tap_t_args[i].checkpoint_manager = checkpoint_manager; tap_t_args[i].vbucket = vbucket; - tap_t_args[i].mutex = mutex; - tap_t_args[i].gate = gate; - tap_t_args[i].counter = counter; tap_t_args[i].name = name; + tap_t_args[i].n_threads = n_set_threads + n_tap_threads + 2; checkpoint_manager->registerCursor(name, 1, false, MustSendCheckpointEnd::YES); } @@ -250,28 +245,17 @@ TEST_F(CheckpointTest, basic_chk_test) { launch_checkpoint_cleanup_thread, &t_args, 0); EXPECT_EQ(0, rc); - for (i = 0; i < NUM_TAP_THREADS; ++i) { + for (i = 0; i < n_tap_threads; ++i) { rc = cb_create_thread(&tap_threads[i], launch_tap_client_thread, &tap_t_args[i], 0); EXPECT_EQ(0, rc); } - for (i = 0; i < NUM_SET_THREADS; ++i) { + for (i = 0; i < n_set_threads; ++i) { rc = cb_create_thread(&set_threads[i], launch_set_thread, &t_args, 0); EXPECT_EQ(0, rc); } - // Wait for all threads to reach the starting gate - while (true) { - std::unique_lock lh(*gate); - if (*counter == (NUM_TAP_THREADS + NUM_SET_THREADS + 2)) { - break; - } - gate->wait(lh); - } - sleep(1); - mutex->notify_all(); - - for (i = 0; i < NUM_SET_THREADS; ++i) { + for (i = 0; i < n_set_threads; ++i) { rc = cb_join_thread(set_threads[i]); EXPECT_EQ(0, rc); } @@ -285,7 +269,7 @@ TEST_F(CheckpointTest, basic_chk_test) { rc = cb_join_thread(persistence_thread); EXPECT_EQ(0, rc); - for (i = 0; i < NUM_TAP_THREADS; ++i) { + for (i = 0; i < n_tap_threads; ++i) { rc = cb_join_thread(tap_threads[i]); EXPECT_EQ(0, rc); std::stringstream name; @@ -297,9 +281,6 @@ TEST_F(CheckpointTest, basic_chk_test) { EXPECT_EQ(0, rc); delete checkpoint_manager; - delete gate; - delete mutex; - delete counter; } TEST_F(CheckpointTest, reset_checkpoint_id) {