MB-18453: Make task scheduling fairer
[ep-engine.git] / src / fakes / fake_executorpool.h
1 /* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2 /*
3  *     Copyright 2016 Couchbase, Inc.
4  *
5  *   Licensed under the Apache License, Version 2.0 (the "License");
6  *   you may not use this file except in compliance with the License.
7  *   You may obtain a copy of the License at
8  *
9  *       http://www.apache.org/licenses/LICENSE-2.0
10  *
11  *   Unless required by applicable law or agreed to in writing, software
12  *   distributed under the License is distributed on an "AS IS" BASIS,
13  *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  *   See the License for the specific language governing permissions and
15  *   limitations under the License.
16  */
17
18 /*
19  * FakeExecutorPool / FakeExecutorThread
20  *
21  * A pair of classes which act as a fake ExecutorPool for testing purposes.
22  * Only executes tasks when explicitly told, and only on the main thread.
23  *
24  * See SingleThreadedEPStoreTest for basic usage.
25  *
26  * TODO: Improve usage documentation.
27  */
28
29 #pragma once
30
31 #include "executorpool.h"
32 #include "executorthread.h"
33 #include "taskqueue.h"
34
35 #include <gtest/gtest.h>
36
37 class SingleThreadedExecutorPool : public ExecutorPool {
38 public:
39
40     /* Registers an instance of this class as "the" executorpool (i.e. what
41      * you get when you call ExecutorPool::get()).
42      *
43      * This *must* be called before the normal ExecutorPool is created.
44      */
45     static void replaceExecutorPoolWithFake() {
46         LockHolder lh(initGuard);
47         ExecutorPool* tmp = ExecutorPool::instance;
48         if (tmp != NULL) {
49             throw std::runtime_error("replaceExecutorPoolWithFake: "
50                     "ExecutorPool instance already created - cowardly refusing to continue!");
51         }
52
53         EventuallyPersistentEngine *epe =
54                 ObjectRegistry::onSwitchThread(NULL, true);
55         tmp = new SingleThreadedExecutorPool(NUM_TASK_GROUPS);
56         ObjectRegistry::onSwitchThread(epe);
57         instance = tmp;
58     }
59
60     SingleThreadedExecutorPool(size_t nTaskSets)
61         : ExecutorPool(/*threads*/0, nTaskSets, 0, 0, 0, 0) {
62     }
63
64     bool _startWorkers() {
65         // Don't actually start any worker threads (all work will be done
66         // synchronously in the same thread) - but we do need to set
67         // maxWorkers to at least 1 otherwise ExecutorPool::tryNewWork() will
68         // never return any work.
69
70         maxWorkers[WRITER_TASK_IDX] = 1;
71         maxWorkers[READER_TASK_IDX] = 1;
72         maxWorkers[AUXIO_TASK_IDX]  = 1;
73         maxWorkers[NONIO_TASK_IDX]  = 1;
74
75         return true;
76     }
77
78     // Helper methods to access normally protected state of ExecutorPool
79
80     TaskQ& getLpTaskQ() {
81         return lpTaskQ;
82     }
83
84     /*
85      * Mark all tasks as cancelled and remove the from the locator.
86      */
87     void cancelAll() {
88         for (auto& it : taskLocator) {
89             it.second.first->cancel();
90             // And force awake so he is "runnable"
91             it.second.second->wake(it.second.first);
92         }
93         taskLocator.clear();
94     }
95
96     size_t getTotReadyTasks() {
97         return totReadyTasks;
98     }
99
100     size_t getNumReadyTasks(task_type_t qType) {
101         return numReadyTasks[qType];
102     }
103 };
104
105 /*
106  * A container for a single task to 'execute' on divorced of the logical thread.
107  * Performs checks of the taskQueue once execution is complete.
108  */
109 class CheckedExecutor : public ExecutorThread {
110 public:
111
112     CheckedExecutor(ExecutorPool* manager_, TaskQueue& q)
113         : ExecutorThread(manager_, q.getQueueType(), "checked_executor"),
114           queue(q),
115           preFutureQueueSize(queue.getFutureQueueSize()),
116           preReadyQueueSize(queue.getReadyQueueSize()),
117           rescheduled(false) {
118         if (!queue.fetchNextTask(*this, false)) {
119             throw std::logic_error("CheckedExecutor failed fetchNextTask");
120         }
121
122         // Configure a checker to run, some tasks are subtly different
123         if (getTaskName().compare("Snapshotting vbucket states") == 0 ||
124             getTaskName().compare("Removing closed unreferenced checkpoints from memory") == 0 ||
125             getTaskName().compare("Paging expired items.") == 0 ||
126             getTaskName().compare("Adjusting hash table sizes.") == 0) {
127             checker = [=](bool taskRescheduled) {
128                 // These tasks all schedule one other task
129                 this->oneExecutes(taskRescheduled, 1);
130             };
131         } else {
132             checker = [=](bool taskRescheduled) {
133                 this->oneExecutes(taskRescheduled, 0);
134             };
135         }
136     }
137
138     void runCurrentTask(const std::string& expectedTask) {
139         EXPECT_EQ(expectedTask, getTaskName());
140         run();
141     }
142
143     void runCurrentTask() {
144         run();
145     }
146
147     void completeCurrentTask() {
148         manager->doneWork(curTaskType);
149         if (rescheduled && !currentTask->isdead()) {
150             queue.reschedule(currentTask, curTaskType);
151         } else {
152             manager->cancel(currentTask->getId(), true);
153         }
154
155         if (!currentTask->isdead()) {
156             checker(rescheduled);
157         }
158     }
159
160     void updateCurrentTime() {
161         now = gethrtime();
162     }
163
164     ExTask& getCurrentTask() {
165         return currentTask;
166     }
167
168 private:
169
170     /*
171      * Performs checks based on the assumption that one task executes and can
172      * as part of that execution
173      *   - request itself to be rescheduled
174      *   - schedule other tasks (expectedToBeScheduled)
175      */
176     void oneExecutes(bool rescheduled, int expectedToBeScheduled) {
177         if (rescheduled) {
178             // One task executed and was rescheduled, account for it.
179             expectedToBeScheduled++;
180         }
181
182         // Check that the new sizes of the future and ready tally given
183         // one executed and n were scheduled as a side effect.
184         EXPECT_EQ((preFutureQueueSize + preReadyQueueSize) - 1,
185                   (queue.getFutureQueueSize() + queue.getReadyQueueSize()) -
186                   expectedToBeScheduled);
187     }
188
189     /*
190      * Run the task and record if it was rescheduled.
191      */
192     void run() {
193         rescheduled = currentTask->run();
194     }
195
196     TaskQueue& queue;
197     size_t preFutureQueueSize;
198     size_t preReadyQueueSize;
199     bool rescheduled;
200
201     /*
202      * A function object that runs post task execution for the purpose of
203      * running checks against state changes.
204      * The defined function accepts one boolean parameter that states if the
205      * task which just executed has been rescheduled.
206      */
207     std::function<void(bool)> checker;
208 };