MB-18453: Make task scheduling fairer
[ep-engine.git] / src / executorpool.h
1 /*
2  *     Copyright 2014 Couchbase, Inc.
3  *
4  *   Licensed under the Apache License, Version 2.0 (the "License");
5  *   you may not use this file except in compliance with the License.
6  *   You may obtain a copy of the License at
7  *
8  *       http://www.apache.org/licenses/LICENSE-2.0
9  *
10  *   Unless required by applicable law or agreed to in writing, software
11  *   distributed under the License is distributed on an "AS IS" BASIS,
12  *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  *   See the License for the specific language governing permissions and
14  *   limitations under the License.
15  */
16
17 /*
18  * === High-level overview of the task execution system. ===
19  *
20  * ExecutorPool is the core interface for users wishing to run tasks on our
21  * worker threads.
22  *
23  * Under the covers we have a configurable number of system threads that are
24  * labeled with a type (see task_type_t). These threads service all buckets.
25  *
26  * Each thread operates by reading from a shared TaskQueue. Each thread wakes
27  * up and fetches (TaskQueue::fetchNextTask) a task for execution
28  * (GlobalTask::run() is called to execute the task).
29  *
30  * The pool also has the concept of high and low priority which is achieved by
31  * having two TaskQueue objects per task-type. When a thread wakes up to run
32  * a task, it will service the high-priority queue more frequently than the
33  * low-priority queue.
34  *
35  * Within a single queue itself there is also a task priority. The task priority
36  * is a value where lower is better. When many tasks are ready for execution
37  * they are moved to a ready queue and sorted by their priority. Thus tasks
38  * with priority 0 get to go before tasks with priority 1. Only once the ready
39  * queue of tasks is empty will we consider looking for more eligible tasks.
40  * In this context, an eligible task is one that has a wakeTime <= now.
41  *
42  * === Important methods of the ExecutorPool ===
43  *
44  * ExecutorPool* ExecutorPool::get()
45  *   The ExecutorPool is accessed via the static get() method. Calling get
46  *   returns the processes global ExecutorPool object. This is an instance
47  *   that is global/shared between all buckets.
48  *
49  * ExecutorPool::schedule(ExTask task, task_type_t qidx)
50  *   The schedule method allows task to be scheduled for future execution by a
51  *   thread of type 'qidx'. The task's 'wakeTime' determines approximately when
52  *   the task will be executed (no guarantees).
53  *
54  * ExecutorPool::wake(size_t taskId)
55  *   The wake method allows for a caller to request that the task matching
56  *   taskId be executed by its thread-type now'. The tasks wakeTime is modified
57  *   so that it has a wakeTime of now and a thread of the correct type is
58  *   signaled to wake-up and perform fetching. The woken task will have to wait
59  *   for any current tasks to be executed first, but it will jump ahead of other
60  *   tasks as tasks that are ready to run are ordered by their priority.
61  *
62  * ExecutorPool::snooze(size_t taskId, double toSleep)
63  *   The pool's snooze method will locate the task matching taskId and adjust
64  *   its wakeTime to account for the toSleep value.
65  */
66 #ifndef SRC_EXECUTORPOOL_H_
67 #define SRC_EXECUTORPOOL_H_ 1
68
69 #include "config.h"
70
71 #include <map>
72 #include <set>
73 #include <queue>
74
75 #include "tasks.h"
76 #include "ringbuffer.h"
77 #include "task_type.h"
78
79 // Forward decl
80 class TaskQueue;
81 class ExecutorThread;
82 class TaskLogEntry;
83
84 typedef std::vector<ExecutorThread *> ThreadQ;
85 typedef std::pair<ExTask, TaskQueue *> TaskQpair;
86 typedef std::pair<RingBuffer<TaskLogEntry>*, RingBuffer<TaskLogEntry> *>
87                                                                 TaskLog;
88 typedef std::vector<TaskQueue *> TaskQ;
89
90 class ExecutorPool {
91 public:
92
93     void addWork(size_t newWork, task_type_t qType);
94
95     void lessWork(task_type_t qType);
96
97     void doneWork(task_type_t &doneTaskType);
98
99     task_type_t tryNewWork(task_type_t newTaskType);
100
101     bool trySleep(task_type_t task_type) {
102         if (!numReadyTasks[task_type]) {
103             numSleepers++;
104             return true;
105         }
106         return false;
107     }
108
109     void woke(void) {
110         numSleepers--;
111     }
112
113     TaskQueue *nextTask(ExecutorThread &t, uint8_t tick);
114
115     TaskQueue *getSleepQ(unsigned int curTaskType) {
116         return isHiPrioQset ? hpTaskQ[curTaskType] : lpTaskQ[curTaskType];
117     }
118
119     bool cancel(size_t taskId, bool eraseTask=false);
120
121     bool stopTaskGroup(EventuallyPersistentEngine *e, task_type_t qidx,
122                        bool force);
123
124     bool wake(size_t taskId);
125
126     bool snooze(size_t taskId, double tosleep);
127
128     void registerBucket(EventuallyPersistentEngine *engine);
129
130     void unregisterBucket(EventuallyPersistentEngine *engine, bool force);
131
132     void doWorkerStat(EventuallyPersistentEngine *engine, const void *cookie,
133                       ADD_STAT add_stat);
134
135     void doTaskQStat(EventuallyPersistentEngine *engine, const void *cookie,
136                      ADD_STAT add_stat);
137
138     size_t getNumWorkersStat(void) { return threadQ.size(); }
139
140     size_t getNumCPU(void);
141
142     size_t getNumWorkers(void);
143
144     size_t getNumReaders(void);
145
146     size_t getNumWriters(void);
147
148     size_t getNumAuxIO(void);
149
150     size_t getNumNonIO(void);
151
152     size_t getMaxReaders(void) { return maxWorkers[READER_TASK_IDX]; }
153
154     size_t getMaxWriters(void) { return maxWorkers[WRITER_TASK_IDX]; }
155
156     size_t getMaxAuxIO(void) { return maxWorkers[AUXIO_TASK_IDX]; }
157
158     size_t getMaxNonIO(void) { return maxWorkers[NONIO_TASK_IDX]; }
159
160     void setMaxReaders(uint16_t v) { maxWorkers[READER_TASK_IDX] = v; }
161
162     void setMaxWriters(uint16_t v) { maxWorkers[WRITER_TASK_IDX] = v; }
163
164     void setMaxAuxIO(uint16_t v) { maxWorkers[AUXIO_TASK_IDX] = v; }
165
166     void setMaxNonIO(uint16_t v) { maxWorkers[NONIO_TASK_IDX] = v; }
167
168     size_t getNumReadyTasks(void) { return totReadyTasks; }
169
170     size_t getNumSleepers(void) { return numSleepers; }
171
172     size_t schedule(ExTask task, task_type_t qidx);
173
174     static ExecutorPool *get(void);
175
176     static void shutdown(void);
177
178 protected:
179
180     ExecutorPool(size_t t, size_t nTaskSets, size_t r, size_t w, size_t a,
181                  size_t n);
182     virtual ~ExecutorPool(void);
183
184     TaskQueue* _nextTask(ExecutorThread &t, uint8_t tick);
185     bool _cancel(size_t taskId, bool eraseTask=false);
186     bool _wake(size_t taskId);
187     virtual bool _startWorkers(void);
188     bool _snooze(size_t taskId, double tosleep);
189     size_t _schedule(ExTask task, task_type_t qidx);
190     void _registerBucket(EventuallyPersistentEngine *engine);
191     void _unregisterBucket(EventuallyPersistentEngine *engine, bool force);
192     bool _stopTaskGroup(EventuallyPersistentEngine *e, task_type_t qidx, bool force);
193     TaskQueue* _getTaskQueue(EventuallyPersistentEngine *e, task_type_t qidx);
194
195     size_t numTaskSets; // safe to read lock-less not altered after creation
196     size_t maxGlobalThreads;
197
198     AtomicValue<size_t> totReadyTasks;
199     SyncObject mutex; // Thread management condition var + mutex
200
201     //! A mapping of task ids to Task, TaskQ in the thread pool
202     std::map<size_t, TaskQpair> taskLocator;
203
204     //A list of threads
205     ThreadQ threadQ;
206
207     // Global cross bucket priority queues where tasks get scheduled into ...
208     TaskQ hpTaskQ; // a vector array of numTaskSets elements for high priority
209     bool isHiPrioQset;
210
211     TaskQ lpTaskQ; // a vector array of numTaskSets elements for low priority
212     bool isLowPrioQset;
213
214     size_t numBuckets;
215
216     SyncObject tMutex; // to serialize taskLocator, threadQ, numBuckets access
217
218     AtomicValue<uint16_t> numSleepers; // total number of sleeping threads
219     AtomicValue<uint16_t> *curWorkers; // track # of active workers per TaskSet
220     AtomicValue<uint16_t> *maxWorkers; // and limit it to the value set here
221     AtomicValue<size_t> *numReadyTasks; // number of ready tasks per task set
222
223     // Set of all known buckets
224     std::set<void *> buckets;
225
226     // Singleton creation
227     static Mutex initGuard;
228     static ExecutorPool *instance;
229 };
230 #endif  // SRC_EXECUTORPOOL_H_