MB-18453: Make task scheduling fairer 29/65929/3
authorJim Walker <jim@couchbase.com>
Thu, 30 Jun 2016 10:23:20 +0000 (11:23 +0100)
committerDave Rigby <daver@couchbase.com>
Wed, 20 Jul 2016 12:54:36 +0000 (12:54 +0000)
The MB identified that we can starve tasks by scheduling
a higher priority task via ExecutorPool::wake().

This occurs because ExecutorPool::wake() pushes tasks
into the readyQueue enabling frequent wakes to trigger
the starvation bug.

The fix is to remove readyQueue.push from wake, so that we only
push to the readyQueue. The fetch side of scheduling only looks at
the futureQueue once the readyQueue is empty, thus the identified
starvation won't happen.

A unit-test demonstrates the fix using the single-threaded harness and
expects that two tasks of differing priorities get executed, rather
than the wake() starving the low-priority task.

This test drives:
 - ExecutorPool::schedule
 - ExecutorPool::reschedule
 - ExecutorPool::wake

These are all the methods which can add tasks into the scheduler
queue.

The fetch side is also covered:
 - ExecutorPool::fetchNextTask

This commit is an update to a previous commit that was reverted due
to performance issues. The original commit was reverted to minimise
disruption.

- original commit is e22c9ebeda1aac
- revert is 27cb1120e3e37

Change-Id: I70a4dcf7cd1c3a6f04548e9bbc3f95e24cdf50ad
Reviewed-on: http://review.couchbase.org/65929
Well-Formed: buildbot <build@couchbase.com>
Reviewed-by: Dave Rigby <daver@couchbase.com>
Tested-by: buildbot <build@couchbase.com>
src/executorpool.h
src/fakes/fake_executorpool.h
src/taskqueue.cc
tests/module_tests/evp_store_single_threaded_test.cc

index b16062c..ac32bc5 100644 (file)
  *   limitations under the License.
  */
 
+/*
+ * === High-level overview of the task execution system. ===
+ *
+ * ExecutorPool is the core interface for users wishing to run tasks on our
+ * worker threads.
+ *
+ * Under the covers we have a configurable number of system threads that are
+ * labeled with a type (see task_type_t). These threads service all buckets.
+ *
+ * Each thread operates by reading from a shared TaskQueue. Each thread wakes
+ * up and fetches (TaskQueue::fetchNextTask) a task for execution
+ * (GlobalTask::run() is called to execute the task).
+ *
+ * The pool also has the concept of high and low priority which is achieved by
+ * having two TaskQueue objects per task-type. When a thread wakes up to run
+ * a task, it will service the high-priority queue more frequently than the
+ * low-priority queue.
+ *
+ * Within a single queue itself there is also a task priority. The task priority
+ * is a value where lower is better. When many tasks are ready for execution
+ * they are moved to a ready queue and sorted by their priority. Thus tasks
+ * with priority 0 get to go before tasks with priority 1. Only once the ready
+ * queue of tasks is empty will we consider looking for more eligible tasks.
+ * In this context, an eligible task is one that has a wakeTime <= now.
+ *
+ * === Important methods of the ExecutorPool ===
+ *
+ * ExecutorPool* ExecutorPool::get()
+ *   The ExecutorPool is accessed via the static get() method. Calling get
+ *   returns the processes global ExecutorPool object. This is an instance
+ *   that is global/shared between all buckets.
+ *
+ * ExecutorPool::schedule(ExTask task, task_type_t qidx)
+ *   The schedule method allows task to be scheduled for future execution by a
+ *   thread of type 'qidx'. The task's 'wakeTime' determines approximately when
+ *   the task will be executed (no guarantees).
+ *
+ * ExecutorPool::wake(size_t taskId)
+ *   The wake method allows for a caller to request that the task matching
+ *   taskId be executed by its thread-type now'. The tasks wakeTime is modified
+ *   so that it has a wakeTime of now and a thread of the correct type is
+ *   signaled to wake-up and perform fetching. The woken task will have to wait
+ *   for any current tasks to be executed first, but it will jump ahead of other
+ *   tasks as tasks that are ready to run are ordered by their priority.
+ *
+ * ExecutorPool::snooze(size_t taskId, double toSleep)
+ *   The pool's snooze method will locate the task matching taskId and adjust
+ *   its wakeTime to account for the toSleep value.
+ */
 #ifndef SRC_EXECUTORPOOL_H_
 #define SRC_EXECUTORPOOL_H_ 1
 
index 33a17f7..8e51913 100644 (file)
@@ -92,6 +92,14 @@ public:
         }
         taskLocator.clear();
     }
+
+    size_t getTotReadyTasks() {
+        return totReadyTasks;
+    }
+
+    size_t getNumReadyTasks(task_type_t qType) {
+        return numReadyTasks[qType];
+    }
 };
 
 /*
index e698a8f..5da9482 100644 (file)
@@ -276,16 +276,15 @@ void TaskQueue::_wake(ExTask &task) {
     while (!notReady.empty()) {
         ExTask tid = notReady.front();
         if (tid->getWaketime() <= now || tid->isdead()) {
-            readyQueue.push(tid);
             numReady++;
-        } else {
-            futureQueue.push(tid);
         }
+
+        // MB-18453: Only push to the futureQueue
+        futureQueue.push(tid);
         notReady.pop();
     }
 
     if (numReady) {
-        manager->addWork(numReady, queueType);
         _doWake_UNLOCKED(numReady);
         TaskQueue *sleepQ = manager->getSleepQ(queueType);
         lh.unlock();
index edbf787..13a0aa1 100644 (file)
@@ -298,3 +298,55 @@ TEST_F(SingleThreadedEPStoreTest, MB20054_onDeleteItem_during_bucket_deletion) {
 
     cb_join_thread(concurrent_task_thread);
 }
+
+/*
+ * MB-18953 is triggered by the executorpool wake path moving tasks directly
+ * into the readyQueue, thus allowing for high-priority tasks to dominiate
+ * a taskqueue.
+ */
+TEST_F(SingleThreadedEPStoreTest, MB18953_taskWake) {
+    auto& lpNonioQ = *task_executor->getLpTaskQ()[NONIO_TASK_IDX];
+
+    class TestTask : public GlobalTask {
+    public:
+        TestTask(EventuallyPersistentEngine* e, TaskId id)
+          : GlobalTask(e, id, 0.0, false) {}
+
+        // returning true will also drive the ExecutorPool::reschedule path.
+        bool run() { return true; }
+
+        std::string getDescription() {
+            return std::string("TestTask ") + GlobalTask::getTaskName(getTypeId());
+        }
+    };
+
+    ExTask hpTask = new TestTask(engine,
+                                 TaskId::PendingOpsNotification);
+    task_executor->schedule(hpTask, NONIO_TASK_IDX);
+
+    ExTask lpTask = new TestTask(engine,
+                                 TaskId::DefragmenterTask);
+    task_executor->schedule(lpTask, NONIO_TASK_IDX);
+
+    runNextTask(lpNonioQ, "TestTask PendingOpsNotification"); // hptask goes first
+    // Ensure that a wake to the hpTask doesn't mean the lpTask gets ignored
+    lpNonioQ.wake(hpTask);
+
+    // Check 1 task is ready
+    EXPECT_EQ(1, task_executor->getTotReadyTasks());
+    EXPECT_EQ(1, task_executor->getNumReadyTasks(NONIO_TASK_IDX));
+
+    runNextTask(lpNonioQ, "TestTask DefragmenterTask"); // lptask goes second
+
+    // Run the tasks again to check that coming from ::reschedule our
+    // expectations are still met.
+    runNextTask(lpNonioQ, "TestTask PendingOpsNotification"); // hptask goes first
+
+    // Ensure that a wake to the hpTask doesn't mean the lpTask gets ignored
+    lpNonioQ.wake(hpTask);
+
+    // Check 1 task is ready
+    EXPECT_EQ(1, task_executor->getTotReadyTasks());
+    EXPECT_EQ(1, task_executor->getNumReadyTasks(NONIO_TASK_IDX));
+    runNextTask(lpNonioQ, "TestTask DefragmenterTask"); // lptask goes second
+}