[BP] MB-18452: Single threaded test harness improvements 28/65928/3
author Jim Walker <jim@couchbase.com>
Thu, 2 Jun 2016 15:05:50 +0000 (16:05 +0100)
committer Dave Rigby <daver@couchbase.com>
Wed, 20 Jul 2016 08:17:47 +0000 (08:17 +0000)
Refactor parts of the recently added evp_store_single_threaded_test
so that it is simpler to drive tasks, making new tests easier to
write.

The main change is to provide helper methods for running any task
from a queue (with some checks) and a way to drive a clean
shutdown.
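
As an illustrative sketch (ExampleUsage is a hypothetical test; the
helper and task names are the ones introduced by this change), a test
built on SingleThreadedEPStoreTest can now drive tasks like this:

    TEST_F(SingleThreadedEPStoreTest, ExampleUsage) {
        // Set the vbucket to active and run the resulting
        // VBStatePersistTask from the WRITER queue; the helper checks
        // the task name and queue-size accounting as it runs.
        setVBucketStateAndRunPersistTask(vbid, vbucket_state_active);

        // Run one task from the AUXIO queue, asserting its name first
        // (assumes a DCP backfill has been scheduled beforehand).
        auto& lpAuxioQ = *task_executor->getLpTaskQ()[AUXIO_TASK_IDX];
        runNextTask(lpAuxioQ, "Backfilling items for a DCP Connection");

        // TearDown() calls shutdownAndPurgeTasks(), which cancels all
        // registered tasks and drains every queue for a clean shutdown.
    }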

Change-Id: I7add574f0768c642f3c6c7c64293e882337a1cdc
Reviewed-on: http://review.couchbase.org/65928
Well-Formed: buildbot <build@couchbase.com>
Reviewed-by: Dave Rigby <daver@couchbase.com>
Tested-by: buildbot <build@couchbase.com>
src/ep_engine.cc
src/fakes/fake_executorpool.h
tests/module_tests/evp_store_single_threaded_test.cc

index 99aa4ac..3b7328d 100644
@@ -2124,9 +2124,6 @@ ENGINE_ERROR_CODE EventuallyPersistentEngine::initialize(const char* config) {
     CheckpointConfig::addConfigChangeListener(*this);
 
     epstore = new EventuallyPersistentStore(*this);
-    if (epstore == NULL) {
-        return ENGINE_ENOMEM;
-    }
 
     initializeEngineCallbacks();
 
index a8cca39..33a17f7 100644
@@ -30,6 +30,7 @@
 
 #include "executorpool.h"
 #include "executorthread.h"
+#include "taskqueue.h"
 
 #include <gtest/gtest.h>
 
@@ -79,36 +80,121 @@ public:
     TaskQ& getLpTaskQ() {
         return lpTaskQ;
     }
+
+    /*
+     * Mark all tasks as cancelled and remove them from the locator.
+     */
+    void cancelAll() {
+        for (auto& it : taskLocator) {
+            it.second.first->cancel();
+            // And force awake so it is "runnable"
+            it.second.second->wake(it.second.first);
+        }
+        taskLocator.clear();
+    }
 };
 
-/* A fake execution 'thread', to be used with the FakeExecutorPool Allows
- * execution of tasks synchronously in the current thread.
+/*
+ * A container for a single task to 'execute', divorced from the logical thread.
+ * Performs checks of the taskQueue once execution is complete.
  */
-class FakeExecutorThread : public ExecutorThread {
+class CheckedExecutor : public ExecutorThread {
 public:
-    FakeExecutorThread(ExecutorPool* manager_, int startingQueue)
-        : ExecutorThread(manager_, startingQueue, "mock_executor") {
+
+    CheckedExecutor(ExecutorPool* manager_, TaskQueue& q)
+        : ExecutorThread(manager_, q.getQueueType(), "checked_executor"),
+          queue(q),
+          preFutureQueueSize(queue.getFutureQueueSize()),
+          preReadyQueueSize(queue.getReadyQueueSize()),
+          rescheduled(false) {
+        if (!queue.fetchNextTask(*this, false)) {
+            throw std::logic_error("CheckedExecutor failed fetchNextTask");
+        }
+
+        // Configure a checker to run; some tasks are subtly different.
+        if (getTaskName().compare("Snapshotting vbucket states") == 0 ||
+            getTaskName().compare("Removing closed unreferenced checkpoints from memory") == 0 ||
+            getTaskName().compare("Paging expired items.") == 0 ||
+            getTaskName().compare("Adjusting hash table sizes.") == 0) {
+            checker = [=](bool taskRescheduled) {
+                // These tasks all schedule one other task
+                this->oneExecutes(taskRescheduled, 1);
+            };
+        } else {
+            checker = [=](bool taskRescheduled) {
+                this->oneExecutes(taskRescheduled, 0);
+            };
+        }
+    }
+
+    void runCurrentTask(const std::string& expectedTask) {
+        EXPECT_EQ(expectedTask, getTaskName());
+        run();
     }
 
     void runCurrentTask() {
-        // Only supports one-shot tasks
-        EXPECT_FALSE(currentTask->run());
-        completeCurrentTask();
+        run();
     }
 
-    // 'completes' the current task; useful if the caller wants to seperately
-    // run() the current task and then tidy up afterwards.
     void completeCurrentTask() {
         manager->doneWork(curTaskType);
-        manager->cancel(currentTask->getId(), true);
-        currentTask.reset();
+        if (rescheduled && !currentTask->isdead()) {
+            queue.reschedule(currentTask, curTaskType);
+        } else {
+            manager->cancel(currentTask->getId(), true);
+        }
+
+        if (!currentTask->isdead()) {
+            checker(rescheduled);
+        }
+    }
+
+    void updateCurrentTime() {
+        now = gethrtime();
     }
 
     ExTask& getCurrentTask() {
         return currentTask;
     }
 
-    void updateCurrentTime() {
-        now = gethrtime();
+private:
+
+    /*
+     * Performs checks based on the assumption that one task executes and can
+     * as part of that execution
+     *   - request itself to be rescheduled
+     *   - schedule other tasks (expectedToBeScheduled)
+     */
+    void oneExecutes(bool rescheduled, int expectedToBeScheduled) {
+        if (rescheduled) {
+            // One task executed and was rescheduled, account for it.
+            expectedToBeScheduled++;
+        }
+
+        // Check that the new sizes of the future and ready queues tally,
+        // given that one executed and n were scheduled as a side effect.
+        EXPECT_EQ((preFutureQueueSize + preReadyQueueSize) - 1,
+                  (queue.getFutureQueueSize() + queue.getReadyQueueSize()) -
+                  expectedToBeScheduled);
+    }
+
+    /*
+     * Run the task and record if it was rescheduled.
+     */
+    void run() {
+        rescheduled = currentTask->run();
     }
+
+    TaskQueue& queue;
+    size_t preFutureQueueSize;
+    size_t preReadyQueueSize;
+    bool rescheduled;
+
+    /*
+     * A function object that runs after task execution for the purpose of
+     * running checks against state changes.
+     * The defined function accepts one boolean parameter that states if the
+     * task which just executed has been rescheduled.
+     */
+    std::function<void(bool)> checker;
 };
index bead4a3..edbf787 100644
@@ -30,7 +30,80 @@ class SingleThreadedEPStoreTest : public EventuallyPersistentStoreTest {
     void SetUp() {
         SingleThreadedExecutorPool::replaceExecutorPoolWithFake();
         EventuallyPersistentStoreTest::SetUp();
+
+        task_executor = reinterpret_cast<SingleThreadedExecutorPool*>
+            (ExecutorPool::get());
+    }
+
+    void TearDown() {
+        if (engine) {
+            shutdownAndPurgeTasks();
+        }
+        EventuallyPersistentStoreTest::TearDown();
+    }
+
+public:
+    /*
+     * Run the next task from the taskQ.
+     * The task must match the expectedTaskName parameter.
+     */
+    void runNextTask(TaskQueue& taskQ, const std::string& expectedTaskName) {
+        CheckedExecutor executor(task_executor, taskQ);
+
+        // Run the task
+        executor.runCurrentTask(expectedTaskName);
+        executor.completeCurrentTask();
+    }
+
+    /*
+     * Run the next task from the taskQ.
+     */
+    void runNextTask(TaskQueue& taskQ) {
+        CheckedExecutor executor(task_executor, taskQ);
+
+        // Run the task
+        executor.runCurrentTask();
+        executor.completeCurrentTask();
+    }
+
+protected:
+    /*
+     * Change the vbucket state and run the VBStatePersistTask.
+     * On return the state will be changed and the task completed.
+     */
+    void setVBucketStateAndRunPersistTask(uint16_t vbid, vbucket_state_t newState) {
+        auto& lpWriterQ = *task_executor->getLpTaskQ()[WRITER_TASK_IDX];
+
+        // Change state - this should add 1 VBStatePersistTask to the WRITER queue.
+        EXPECT_EQ(ENGINE_SUCCESS,
+                  store->setVBucketState(vbid, newState, /*transfer*/false));
+
+        runNextTask(lpWriterQ, "Persisting a vbucket state for vbucket: "
+                               + std::to_string(vbid));
+    }
+
+    /*
+     * Set the stats isShutdown flag and attempt to drive all tasks to cancel.
+     */
+    void shutdownAndPurgeTasks() {
+        engine->getEpStats().isShutdown = true;
+        task_executor->cancelAll();
+
+        for (task_type_t t :
+             {WRITER_TASK_IDX, READER_TASK_IDX, AUXIO_TASK_IDX, NONIO_TASK_IDX}) {
+
+            // Define a lambda to drive all tasks from the queue; if hpTaskQ
+            // is implemented, adding a second call to runTasks is trivial.
+            auto runTasks = [=](TaskQueue& queue) {
+                while (queue.getFutureQueueSize() > 0 || queue.getReadyQueueSize() > 0) {
+                    runNextTask(queue);
+                }
+            };
+            runTasks(*task_executor->getLpTaskQ()[t]);
+        }
     }
+
+    SingleThreadedExecutorPool* task_executor;
 };
 
 static ENGINE_ERROR_CODE dummy_dcp_add_failover_cb(vbucket_failover_t* entry,
@@ -45,7 +118,7 @@ static ENGINE_ERROR_CODE dummy_dcp_add_failover_cb(vbucket_failover_t* entry,
  */
 typedef struct {
     EventuallyPersistentEngine* engine;
-    FakeExecutorThread& fake_executor_thread;
+    CheckedExecutor& backfill;
     SyncObject& backfill_cv;
     SyncObject& destroy_cv;
     TaskQueue* taskQ;
@@ -54,7 +127,7 @@ typedef struct {
 static void MB20054_run_backfill_task(void* arg) {
     mb20054_backfill_thread_params* params = static_cast<mb20054_backfill_thread_params*>(arg);
     EventuallyPersistentEngine* engine = params->engine;
-    FakeExecutorThread& auxio_thread = params->fake_executor_thread;
+    CheckedExecutor& backfill = params->backfill;
     SyncObject& backfill_cv = params->backfill_cv;
     SyncObject& destroy_cv = params->destroy_cv;
 
@@ -64,7 +137,7 @@ static void MB20054_run_backfill_task(void* arg) {
 
     // Run the BackfillManagerTask task to push items to readyQ. In sherlock
     // upwards this runs multiple times - so should return true.
-    EXPECT_TRUE(auxio_thread.getCurrentTask()->run());
+    backfill.runCurrentTask("Backfilling items for a DCP Connection");
 
     // Notify the main thread that it can progress with destroying the
     // engine [A].
@@ -86,19 +159,17 @@ static void MB20054_run_backfill_task(void* arg) {
     // Best we can do is spin on waiting for the DCPBackfill task to be
     // set to 'dead' - and only then completeCurrentTask, which will
     // cancel the task.
-    while (!auxio_thread.getCurrentTask()->isdead()) {
+    while (!backfill.getCurrentTask()->isdead()) {
         // spin.
     }
-    auxio_thread.completeCurrentTask();
+    backfill.completeCurrentTask();
 
-    // Cleanup - fetch the next (final) task -
+    // Cleanup - run the next (final) task -
     // ActiveStreamCheckpointProcessorTask - so it can be cancelled
     // and executorpool shut down.
-    auxio_thread.updateCurrentTime();
-    EXPECT_TRUE(lpAuxioQ->fetchNextTask(auxio_thread, false));
-    EXPECT_EQ("Process checkpoint(s) for DCP producer",
-              auxio_thread.getTaskName());
-    auxio_thread.runCurrentTask();
+    CheckedExecutor executor(ExecutorPool::get(), *lpAuxioQ);
+    executor.runCurrentTask("Process checkpoint(s) for DCP producer");
+    executor.completeCurrentTask();
 }
 
 // Check that if onDeleteItem() is called during bucket deletion, we do not
@@ -107,9 +178,6 @@ static void MB20054_run_backfill_task(void* arg) {
 // bucket shutdown, which has a non-zero number of Items which are destructed
 // (and call onDeleteItem).
 TEST_F(SingleThreadedEPStoreTest, MB20054_onDeleteItem_during_bucket_deletion) {
-    SingleThreadedExecutorPool* task_executor =
-            reinterpret_cast<SingleThreadedExecutorPool*>(ExecutorPool::get());
-
     // Should start with no tasks registered on any queues.
     TaskQ& lp_task_q = task_executor->getLpTaskQ();
     for (auto& queue : lp_task_q) {
@@ -117,26 +185,8 @@ TEST_F(SingleThreadedEPStoreTest, MB20054_onDeleteItem_during_bucket_deletion) {
         ASSERT_EQ(0, queue->getReadyQueueSize());
     }
 
-    // [[1] Set our state to active. This should add a VBStatePersistTask to
-    // the WRITER queue.
-    EXPECT_EQ(ENGINE_SUCCESS,
-              store->setVBucketState(vbid, vbucket_state_active, false));
-
-    TaskQueue* lpWriterQ = task_executor->getLpTaskQ()[WRITER_TASK_IDX];
-    TaskQueue* lpAuxioQ = task_executor->getLpTaskQ()[AUXIO_TASK_IDX];
-
-    EXPECT_EQ(1, lpWriterQ->getFutureQueueSize());
-    EXPECT_EQ(0, lpWriterQ->getReadyQueueSize());
-
-    // Use a FakeExecutorThread to fetch and run the persistTask.
-    FakeExecutorThread writer_thread(task_executor, WRITER_TASK_IDX);
-    writer_thread.updateCurrentTime();
-    EXPECT_TRUE(lpWriterQ->fetchNextTask(writer_thread, false));
-    EXPECT_EQ("Persisting a vbucket state for vbucket: 0",
-              writer_thread.getTaskName());
-    EXPECT_EQ(0, lpWriterQ->getFutureQueueSize());
-    EXPECT_EQ(0, lpWriterQ->getReadyQueueSize());
-    writer_thread.runCurrentTask();
+    // [1] Set our state to active.
+    setVBucketStateAndRunPersistTask(vbid, vbucket_state_active);
 
     // Perform one SET, then close its checkpoint. This means that we no
     // longer have all sequence numbers in memory checkpoints, forcing the
@@ -147,10 +197,11 @@ TEST_F(SingleThreadedEPStoreTest, MB20054_onDeleteItem_during_bucket_deletion) {
     RCPtr<VBucket> vb = store->getVbMap().getBucket(vbid);
     CheckpointManager& ckpt_mgr = vb->checkpointManager;
     ckpt_mgr.createNewCheckpoint();
-
+    auto lpWriterQ = task_executor->getLpTaskQ()[WRITER_TASK_IDX];
     EXPECT_EQ(0, lpWriterQ->getFutureQueueSize());
     EXPECT_EQ(0, lpWriterQ->getReadyQueueSize());
 
+    auto lpAuxioQ = task_executor->getLpTaskQ()[AUXIO_TASK_IDX];
     EXPECT_EQ(0, lpAuxioQ->getFutureQueueSize());
     EXPECT_EQ(0, lpAuxioQ->getReadyQueueSize());
 
@@ -199,11 +250,7 @@ TEST_F(SingleThreadedEPStoreTest, MB20054_onDeleteItem_during_bucket_deletion) {
     // object has its currentTask set to BackfillManagerTask, the task
     // will not be deleted.
     // Essentially we are simulating a concurrent thread running this task.
-    FakeExecutorThread auxio_thread(task_executor, AUXIO_TASK_IDX);
-    auxio_thread.updateCurrentTime();
-    EXPECT_TRUE(lpAuxioQ->fetchNextTask(auxio_thread, false));
-    EXPECT_EQ("Backfilling items for a DCP Connection",
-              auxio_thread.getTaskName());
+    CheckedExecutor backfill(task_executor, *lpAuxioQ);
 
     // This is the one action we really need to perform 'concurrently' - delete
     // the engine while a DCPBackfill task is still running. We spin up a
@@ -223,8 +270,8 @@ TEST_F(SingleThreadedEPStoreTest, MB20054_onDeleteItem_during_bucket_deletion) {
     SyncObject destroy_cv;
 
     cb_thread_t concurrent_task_thread;
-    mb20054_backfill_thread_params params = {engine, auxio_thread, backfill_cv,
-                                     destroy_cv, lpAuxioQ};
+    mb20054_backfill_thread_params params = {engine, backfill, backfill_cv,
+                                             destroy_cv, lpAuxioQ};
 
     cb_create_thread(&concurrent_task_thread, MB20054_run_backfill_task, &params, 0);
 
@@ -232,6 +279,7 @@ TEST_F(SingleThreadedEPStoreTest, MB20054_onDeleteItem_during_bucket_deletion) {
     LockHolder lh(backfill_cv);
     backfill_cv.wait();
 
+    ObjectRegistry::onSwitchThread(engine);
     // 'Destroy' the engine - this doesn't delete the object, just shuts down
     // connections, marks streams as dead etc.
     engine->destroy(/*force*/false);
@@ -245,7 +293,6 @@ TEST_F(SingleThreadedEPStoreTest, MB20054_onDeleteItem_during_bucket_deletion) {
     // Need to have the current engine valid before deleting (this is what
     // EvpDestroy does normally; however we have a smart ptr to the engine
     // so must delete via that).
-    ObjectRegistry::onSwitchThread(engine);
     delete engine;
     engine = NULL;