cond.wait(lock);
}
+ /** Block on the condition variable until `pred()` returns true.
+  *
+  * Equivalent to `while (!pred()) wait(lock);` — the predicate form
+  * protects callers from spurious wakeups. `lock` must be held on
+  * entry and is reacquired before this returns.
+  *
+  * @param lock Lock over the state inspected by `pred`.
+  * @param pred Callable returning bool; waiting stops when it is true.
+  */
+ template <class Predicate>
+ void wait(std::unique_lock<std::mutex>& lock, Predicate pred) {
+ cond.wait(lock, pred);
+ }
+
void wait_for(std::unique_lock<std::mutex>& lock,
const double secs) {
cond.wait_for(lock, std::chrono::milliseconds(int64_t(secs * 1000.0)));
{
LockHolder lh(mutex);
+ // If we are rescheduling a previously cancelled task, we should reset
+ // the task state to the initial value of running.
+ task->setState(TASK_RUNNING, TASK_DEAD);
+
futureQueue.push(task);
LOG(EXTENSION_LOG_DEBUG,
EXPECT_TRUE(pool->threadExists("writer_worker_0"));
EXPECT_TRUE(pool->threadExists("writer_worker_1"));
}
+
+/* Make sure that a task that has run once and been cancelled can be
+ * rescheduled and will run again properly.
+ */
+TEST_F(ExecutorPoolDynamicWorkerTest, reschedule_dead_task) {
+ size_t runCount{0};
+
+ // Run-once task: counts its executions, then returns false — which
+ // presumably tells the executor not to re-run it, letting it be
+ // cleaned up (state -> TASK_DEAD). TODO(review): confirm the
+ // return-false contract against LambdaTask/GlobalTask.
+ ExTask task = new LambdaTask(taskable, TaskId::ItemPager, 0, true, [&] {
+ ++runCount;
+ return false;
+ });
+
+ ASSERT_EQ(TASK_RUNNING, task->getState())
+ << "Initial task state should be RUNNING";
+
+ // First schedule: wait until the pool has cancelled and unregistered
+ // the task (taskLocator drained) before inspecting its state.
+ pool->schedule(task);
+ pool->waitForEmptyTaskLocator();
+
+ EXPECT_EQ(TASK_DEAD, task->getState())
+ << "Task has completed and been cleaned up, state should be DEAD";
+
+ // Second schedule of the same (now DEAD) task object — the point of
+ // this test: rescheduling must resurrect it so it runs again.
+ pool->schedule(task);
+ pool->waitForEmptyTaskLocator();
+
+ EXPECT_EQ(TASK_DEAD, task->getState())
+ << "Task has completed and been cleaned up, state should be DEAD";
+
+ // Both schedules must have produced exactly one execution each.
+ EXPECT_EQ(2, runCount);
+}
#include <executorthread.h>
#include <gtest/gtest.h>
#include <taskable.h>
+#include <thread>
#include "thread_gate.h"
class MockTaskable : public Taskable {
return std::find(names.begin(), names.end(), name) != names.end();
}
+ /** Waits indefinitely for the taskLocator to become empty, indicating all
+ * tasks have been cancelled and cleaned up.
+ *
+ * NOTE(review): `tMutex` is both locked and waited on directly, so it is
+ * presumably a SyncObject-style mutex+condvar hybrid exposing a
+ * predicate `wait(lock, pred)` overload — confirm against its
+ * declaration. The wait relies on whoever empties taskLocator also
+ * notifying tMutex; there is no timeout, so a missed notify hangs the
+ * test.
+ */
+ void waitForEmptyTaskLocator() {
+ std::unique_lock<std::mutex> lh(tMutex);
+ tMutex.wait(lh, [this] { return taskLocator.empty(); });
+ }
+
~TestExecutorPool() = default;
};