MB-21724: Reduce iterations in CheckpointTest basic_chk_test 86/70086/6
authorJim Walker <jim@couchbase.com>
Fri, 18 Nov 2016 15:37:12 +0000 (15:37 +0000)
committerDave Rigby <daver@couchbase.com>
Mon, 21 Nov 2016 08:54:45 +0000 (08:54 +0000)
Reduce number of items and reduce threads and items further when
running under valgrind.

Also removed a sleep(1) and fixed the thread start/wait code
that relied on the sleep(1).

Change-Id: Ie6d71bf0972e30225343c12c51e36d5adadec794
Reviewed-on: http://review.couchbase.org/70086
Tested-by: buildbot <build@couchbase.com>
Reviewed-by: Dave Rigby <daver@couchbase.com>
tests/module_tests/checkpoint_test.cc

index 7b76c4c..38f1ca9 100644 (file)
 
 #include <gtest/gtest.h>
 #include <gmock/gmock.h>
 
 #include <gtest/gtest.h>
 #include <gmock/gmock.h>
+#include <valgrind/valgrind.h>
 
 #define NUM_TAP_THREADS 3
 
 #define NUM_TAP_THREADS 3
+#define NUM_TAP_THREADS_VG 2
 #define NUM_SET_THREADS 4
 #define NUM_SET_THREADS 4
+#define NUM_SET_THREADS_VG 2
 
 
-#define NUM_ITEMS 10000
+#define NUM_ITEMS 500
+#define NUM_ITEMS_VG 10
 
 #define DCP_CURSOR_PREFIX "dcp-client-"
 #define TAP_CURSOR_PREFIX "tap-client-"
 
 
 #define DCP_CURSOR_PREFIX "dcp-client-"
 #define TAP_CURSOR_PREFIX "tap-client-"
 
-struct thread_args {
-    SyncObject *mutex;
-    SyncObject *gate;
-    RCPtr<VBucket> vbucket;
-    CheckpointManager *checkpoint_manager;
-    int *counter;
-    std::string name;
-};
-
 extern "C" {
 
 /**
 extern "C" {
 
 /**
@@ -101,15 +96,34 @@ protected:
     std::unique_ptr<CheckpointManager> manager;
 };
 
     std::unique_ptr<CheckpointManager> manager;
 };
 
+struct thread_args {
+    RCPtr<VBucket> vbucket;
+    CheckpointManager *checkpoint_manager;
+    int n_threads;
+    std::string name;
+};
+
+/*
+ * atomically increment a threadCount
+ * if the calling thread is the last one up, notify_all
+ * if the calling thread is not the last one up, wait (in the function)
+ */
+static void thread_up(struct thread_args* args) {
+    static int threadCount = 0;
+    static std::mutex m;
+    static std::condition_variable cv;
+    std::unique_lock<std::mutex> lh(m);
+    if (++threadCount != args->n_threads) {
+        cv.wait(lh, [args](){return threadCount == args->n_threads;});
+    } else {
+        cv.notify_all(); // all threads accounted for, begin
+    }
+    lh.unlock();
+}
+
 static void launch_persistence_thread(void *arg) {
     struct thread_args *args = static_cast<struct thread_args *>(arg);
 static void launch_persistence_thread(void *arg) {
     struct thread_args *args = static_cast<struct thread_args *>(arg);
-    std::unique_lock<std::mutex> lh(*(args->mutex));
-    std::unique_lock<std::mutex> lhg(*(args->gate));
-    ++(*(args->counter));
-    lhg.unlock();
-    args->gate->notify_all();
-    args->mutex->wait(lh);
-    lh.unlock();
+    thread_up(args);
 
     bool flush = false;
     while(true) {
 
     bool flush = false;
     while(true) {
@@ -142,13 +156,7 @@ static void launch_persistence_thread(void *arg) {
 
 static void launch_tap_client_thread(void *arg) {
     struct thread_args *args = static_cast<struct thread_args *>(arg);
 
 static void launch_tap_client_thread(void *arg) {
     struct thread_args *args = static_cast<struct thread_args *>(arg);
-    std::unique_lock<std::mutex> lh(*(args->mutex));
-    std::unique_lock<std::mutex> lhg(*(args->gate));
-    ++(*(args->counter));
-    lhg.unlock();
-    args->gate->notify_all();
-    args->mutex->wait(lh);
-    lh.unlock();
+    thread_up(args);
 
     bool flush = false;
     bool isLastItem = false;
 
     bool flush = false;
     bool isLastItem = false;
@@ -165,13 +173,7 @@ static void launch_tap_client_thread(void *arg) {
 
 static void launch_checkpoint_cleanup_thread(void *arg) {
     struct thread_args *args = static_cast<struct thread_args *>(arg);
 
 static void launch_checkpoint_cleanup_thread(void *arg) {
     struct thread_args *args = static_cast<struct thread_args *>(arg);
-    std::unique_lock<std::mutex> lh(*(args->mutex));
-    std::unique_lock<std::mutex> lhg(*(args->gate));
-    ++(*(args->counter));
-    lhg.unlock();
-    args->gate->notify_all();
-    args->mutex->wait(lh);
-    lh.unlock();
+    thread_up(args);
 
     while (args->checkpoint_manager->getNumOfCursors() > 1) {
         bool newCheckpointCreated;
 
     while (args->checkpoint_manager->getNumOfCursors() > 1) {
         bool newCheckpointCreated;
@@ -182,16 +184,11 @@ static void launch_checkpoint_cleanup_thread(void *arg) {
 
 static void launch_set_thread(void *arg) {
     struct thread_args *args = static_cast<struct thread_args *>(arg);
 
 static void launch_set_thread(void *arg) {
     struct thread_args *args = static_cast<struct thread_args *>(arg);
-    std::unique_lock<std::mutex> lh(*(args->mutex));
-    std::unique_lock<std::mutex> lhg(*(args->gate));
-    ++(*(args->counter));
-    lhg.unlock();
-    args->gate->notify_all();
-    args->mutex->wait(lh);
-    lh.unlock();
+    thread_up(args);
 
     int i(0);
 
     int i(0);
-    for (i = 0; i < NUM_ITEMS; ++i) {
+    const int item_count = RUNNING_ON_VALGRIND ? NUM_ITEMS_VG : NUM_ITEMS;
+    for (i = 0; i < item_count; ++i) {
         std::stringstream key;
         key << "key-" << i;
         queued_item qi(new Item(key.str(), args->vbucket->getId(),
         std::stringstream key;
         key << "key-" << i;
         queued_item qi(new Item(key.str(), args->vbucket->getId(),
@@ -212,13 +209,15 @@ TEST_F(CheckpointTest, basic_chk_test) {
     CheckpointManager *checkpoint_manager = new CheckpointManager(global_stats, 0,
                                                                   checkpoint_config,
                                                                   1, 0, 0, cb);
     CheckpointManager *checkpoint_manager = new CheckpointManager(global_stats, 0,
                                                                   checkpoint_config,
                                                                   1, 0, 0, cb);
-    SyncObject *mutex = new SyncObject();
-    SyncObject *gate = new SyncObject();
-    int *counter = new int;
-    *counter = 0;
 
 
-    cb_thread_t tap_threads[NUM_TAP_THREADS];
-    cb_thread_t set_threads[NUM_SET_THREADS];
+    const int n_set_threads = RUNNING_ON_VALGRIND ? NUM_SET_THREADS_VG :
+                                                       NUM_SET_THREADS;
+
+    const int n_tap_threads = RUNNING_ON_VALGRIND ? NUM_TAP_THREADS_VG :
+                                                       NUM_TAP_THREADS;
+
+    std::vector<cb_thread_t> tap_threads(n_tap_threads);
+    std::vector<cb_thread_t> set_threads(n_set_threads);
     cb_thread_t persistence_thread;
     cb_thread_t checkpoint_cleanup_thread;
     int i(0), rc(0);
     cb_thread_t persistence_thread;
     cb_thread_t checkpoint_cleanup_thread;
     int i(0), rc(0);
@@ -226,19 +225,15 @@ TEST_F(CheckpointTest, basic_chk_test) {
     struct thread_args t_args;
     t_args.checkpoint_manager = checkpoint_manager;
     t_args.vbucket = vbucket;
     struct thread_args t_args;
     t_args.checkpoint_manager = checkpoint_manager;
     t_args.vbucket = vbucket;
-    t_args.mutex = mutex;
-    t_args.gate = gate;
-    t_args.counter = counter;
+    t_args.n_threads = n_set_threads + n_tap_threads + 2;
 
 
-    struct thread_args tap_t_args[NUM_TAP_THREADS];
-    for (i = 0; i < NUM_TAP_THREADS; ++i) {
+    std::vector<struct thread_args> tap_t_args(n_tap_threads);
+    for (i = 0; i < n_tap_threads; ++i) {
         std::string name(TAP_CURSOR_PREFIX + std::to_string(i));
         tap_t_args[i].checkpoint_manager = checkpoint_manager;
         tap_t_args[i].vbucket = vbucket;
         std::string name(TAP_CURSOR_PREFIX + std::to_string(i));
         tap_t_args[i].checkpoint_manager = checkpoint_manager;
         tap_t_args[i].vbucket = vbucket;
-        tap_t_args[i].mutex = mutex;
-        tap_t_args[i].gate = gate;
-        tap_t_args[i].counter = counter;
         tap_t_args[i].name = name;
         tap_t_args[i].name = name;
+        tap_t_args[i].n_threads = n_set_threads + n_tap_threads + 2;
         checkpoint_manager->registerCursor(name, 1, false,
                                            MustSendCheckpointEnd::YES);
     }
         checkpoint_manager->registerCursor(name, 1, false,
                                            MustSendCheckpointEnd::YES);
     }
@@ -250,28 +245,17 @@ TEST_F(CheckpointTest, basic_chk_test) {
                         launch_checkpoint_cleanup_thread, &t_args, 0);
     EXPECT_EQ(0, rc);
 
                         launch_checkpoint_cleanup_thread, &t_args, 0);
     EXPECT_EQ(0, rc);
 
-    for (i = 0; i < NUM_TAP_THREADS; ++i) {
+    for (i = 0; i < n_tap_threads; ++i) {
         rc = cb_create_thread(&tap_threads[i], launch_tap_client_thread, &tap_t_args[i], 0);
         EXPECT_EQ(0, rc);
     }
 
         rc = cb_create_thread(&tap_threads[i], launch_tap_client_thread, &tap_t_args[i], 0);
         EXPECT_EQ(0, rc);
     }
 
-    for (i = 0; i < NUM_SET_THREADS; ++i) {
+    for (i = 0; i < n_set_threads; ++i) {
         rc = cb_create_thread(&set_threads[i], launch_set_thread, &t_args, 0);
         EXPECT_EQ(0, rc);
     }
 
         rc = cb_create_thread(&set_threads[i], launch_set_thread, &t_args, 0);
         EXPECT_EQ(0, rc);
     }
 
-    // Wait for all threads to reach the starting gate
-    while (true) {
-        std::unique_lock<std::mutex> lh(*gate);
-        if (*counter == (NUM_TAP_THREADS + NUM_SET_THREADS + 2)) {
-            break;
-        }
-        gate->wait(lh);
-    }
-    sleep(1);
-    mutex->notify_all();
-
-    for (i = 0; i < NUM_SET_THREADS; ++i) {
+    for (i = 0; i < n_set_threads; ++i) {
         rc = cb_join_thread(set_threads[i]);
         EXPECT_EQ(0, rc);
     }
         rc = cb_join_thread(set_threads[i]);
         EXPECT_EQ(0, rc);
     }
@@ -285,7 +269,7 @@ TEST_F(CheckpointTest, basic_chk_test) {
     rc = cb_join_thread(persistence_thread);
     EXPECT_EQ(0, rc);
 
     rc = cb_join_thread(persistence_thread);
     EXPECT_EQ(0, rc);
 
-    for (i = 0; i < NUM_TAP_THREADS; ++i) {
+    for (i = 0; i < n_tap_threads; ++i) {
         rc = cb_join_thread(tap_threads[i]);
         EXPECT_EQ(0, rc);
         std::stringstream name;
         rc = cb_join_thread(tap_threads[i]);
         EXPECT_EQ(0, rc);
         std::stringstream name;
@@ -297,9 +281,6 @@ TEST_F(CheckpointTest, basic_chk_test) {
     EXPECT_EQ(0, rc);
 
     delete checkpoint_manager;
     EXPECT_EQ(0, rc);
 
     delete checkpoint_manager;
-    delete gate;
-    delete mutex;
-    delete counter;
 }
 
 TEST_F(CheckpointTest, reset_checkpoint_id) {
 }
 
 TEST_F(CheckpointTest, reset_checkpoint_id) {