Streamline configuration.h
[ep-engine.git] / src / executorpool.h
1 /*
2  *     Copyright 2014 Couchbase, Inc.
3  *
4  *   Licensed under the Apache License, Version 2.0 (the "License");
5  *   you may not use this file except in compliance with the License.
6  *   You may obtain a copy of the License at
7  *
8  *       http://www.apache.org/licenses/LICENSE-2.0
9  *
10  *   Unless required by applicable law or agreed to in writing, software
11  *   distributed under the License is distributed on an "AS IS" BASIS,
12  *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  *   See the License for the specific language governing permissions and
14  *   limitations under the License.
15  */
16
17 /*
18  * === High-level overview of the task execution system. ===
19  *
20  * ExecutorPool is the core interface for users wishing to run tasks on our
21  * worker threads.
22  *
23  * Under the covers we have a configurable number of system threads that are
24  * labeled with a type (see task_type_t). These threads service all buckets.
25  *
26  * Each thread operates by reading from a shared TaskQueue. Each thread wakes
27  * up and fetches (TaskQueue::fetchNextTask) a task for execution
28  * (GlobalTask::run() is called to execute the task).
29  *
30  * The pool also has the concept of high and low priority which is achieved by
31  * having two TaskQueue objects per task-type. When a thread wakes up to run
32  * a task, it will service the high-priority queue more frequently than the
33  * low-priority queue.
34  *
35  * Within a single queue itself there is also a task priority. The task priority
36  * is a value where lower is better. When many tasks are ready for execution
37  * they are moved to a ready queue and sorted by their priority. Thus tasks
38  * with priority 0 get to go before tasks with priority 1. Only once the ready
39  * queue of tasks is empty will we consider looking for more eligible tasks.
40  * In this context, an eligible task is one that has a wakeTime <= now.
41  *
42  * === Important methods of the ExecutorPool ===
43  *
44  * ExecutorPool* ExecutorPool::get()
45  *   The ExecutorPool is accessed via the static get() method. Calling get
46  *   returns the processes global ExecutorPool object. This is an instance
47  *   that is global/shared between all buckets.
48  *
49  * ExecutorPool::schedule(ExTask task, task_type_t qidx)
50  *   The schedule method allows task to be scheduled for future execution by a
51  *   thread of type 'qidx'. The task's 'wakeTime' determines approximately when
52  *   the task will be executed (no guarantees).
53  *
54  * ExecutorPool::wake(size_t taskId)
55  *   The wake method allows for a caller to request that the task matching
56  *   taskId be executed by its thread-type now'. The tasks wakeTime is modified
57  *   so that it has a wakeTime of now and a thread of the correct type is
58  *   signaled to wake-up and perform fetching. The woken task will have to wait
59  *   for any current tasks to be executed first, but it will jump ahead of other
60  *   tasks as tasks that are ready to run are ordered by their priority.
61  *
62  * ExecutorPool::snooze(size_t taskId, double toSleep)
63  *   The pool's snooze method will locate the task matching taskId and adjust
64  *   its wakeTime to account for the toSleep value.
65  */
66 #ifndef SRC_EXECUTORPOOL_H_
67 #define SRC_EXECUTORPOOL_H_ 1
68
69 #include "config.h"
70
71 #include "tasks.h"
72 #include "task_type.h"
73 #include "taskable.h"
74
75 #include <map>
76 #include <set>
77
78 // Forward decl
79 class TaskQueue;
80 class ExecutorThread;
81 class TaskLogEntry;
82
83 typedef std::vector<ExecutorThread *> ThreadQ;
84 typedef std::pair<ExTask, TaskQueue *> TaskQpair;
85 typedef std::vector<TaskQueue *> TaskQ;
86
87 class ExecutorPool {
88 public:
89
90     void addWork(size_t newWork, task_type_t qType);
91
92     void lessWork(task_type_t qType);
93
94     void startWork(task_type_t taskType);
95
96     void doneWork(task_type_t taskType);
97
98     bool trySleep(task_type_t task_type) {
99         if (!numReadyTasks[task_type]) {
100             numSleepers++;
101             return true;
102         }
103         return false;
104     }
105
106     void woke(void) {
107         numSleepers--;
108     }
109
110     TaskQueue *nextTask(ExecutorThread &t, uint8_t tick);
111
112     TaskQueue *getSleepQ(unsigned int curTaskType) {
113         return isHiPrioQset ? hpTaskQ[curTaskType] : lpTaskQ[curTaskType];
114     }
115
116     bool cancel(size_t taskId, bool eraseTask=false);
117
118     bool stopTaskGroup(task_gid_t taskGID, task_type_t qidx, bool force);
119
120     bool wake(size_t taskId);
121
122     /**
123      * Change how many worker threads there are for a given task type,
124      * stopping/starting threads to reach the desired number.
125      *
126      * @param type the type of task for which to adjust the workers
127      * @param newCount Target number of worker threads
128      */
129     void adjustWorkers(task_type_t type, size_t newCount);
130
131     bool snooze(size_t taskId, double tosleep);
132
133     void registerTaskable(Taskable& taskable);
134
135     void unregisterTaskable(Taskable& taskable, bool force);
136
137     void doWorkerStat(EventuallyPersistentEngine *engine, const void *cookie,
138                       ADD_STAT add_stat);
139
140     /**
141      * Generates stats regarding currently running tasks, as displayed by
142      * cbstats tasks.
143      */
144     void doTasksStat(EventuallyPersistentEngine* engine,
145                      const void* cookie,
146                      ADD_STAT add_stat);
147
148     void doTaskQStat(EventuallyPersistentEngine *engine, const void *cookie,
149                      ADD_STAT add_stat);
150
151     size_t getNumWorkersStat(void) {
152         LockHolder lh(tMutex);
153         return threadQ.size();
154     }
155
156     size_t getNumReaders(void);
157
158     size_t getNumWriters(void);
159
160     size_t getNumAuxIO(void);
161
162     size_t getNumNonIO(void);
163
164     size_t getMaxReaders(void) {
165         return numWorkers[READER_TASK_IDX];
166     }
167
168     size_t getMaxWriters(void) {
169         return numWorkers[WRITER_TASK_IDX];
170     }
171
172     size_t getMaxAuxIO(void) {
173         return numWorkers[AUXIO_TASK_IDX];
174     }
175
176     size_t getMaxNonIO(void) {
177         return numWorkers[NONIO_TASK_IDX];
178     }
179
180     void setNumReaders(uint16_t v) {
181         adjustWorkers(READER_TASK_IDX, v);
182     }
183
184     void setNumWriters(uint16_t v) {
185         adjustWorkers(WRITER_TASK_IDX, v);
186     }
187
188     void setNumAuxIO(uint16_t v) {
189         adjustWorkers(AUXIO_TASK_IDX, v);
190     }
191
192     void setNumNonIO(uint16_t v) {
193         adjustWorkers(NONIO_TASK_IDX, v);
194     }
195
196     size_t getNumReadyTasks(void) { return totReadyTasks; }
197
198     size_t getNumSleepers(void) { return numSleepers; }
199
200     size_t schedule(ExTask task);
201
202     static ExecutorPool *get(void);
203
204     static void shutdown(void);
205
206 protected:
207
208     ExecutorPool(size_t t, size_t nTaskSets, size_t r, size_t w, size_t a,
209                  size_t n);
210     virtual ~ExecutorPool(void);
211
212     TaskQueue* _nextTask(ExecutorThread &t, uint8_t tick);
213     bool _cancel(size_t taskId, bool eraseTask=false);
214     bool _wake(size_t taskId);
215     virtual bool _startWorkers(void);
216     ssize_t _adjustWorkers(task_type_t type, size_t desiredNumItems);
217     bool _snooze(size_t taskId, double tosleep);
218     size_t _schedule(ExTask task);
219     void _registerTaskable(Taskable& taskable);
220     void _unregisterTaskable(Taskable& taskable, bool force);
221     bool _stopTaskGroup(task_gid_t taskGID, task_type_t qidx, bool force);
222     TaskQueue* _getTaskQueue(const Taskable& t, task_type_t qidx);
223     void _stopAndJoinThreads();
224
225     size_t numTaskSets; // safe to read lock-less not altered after creation
226     size_t maxGlobalThreads;
227
228     std::atomic<size_t> totReadyTasks;
229     SyncObject mutex; // Thread management condition var + mutex
230
231     //! A mapping of task ids to Task, TaskQ in the thread pool
232     std::map<size_t, TaskQpair> taskLocator;
233
234     //A list of threads
235     ThreadQ threadQ;
236
237     // Global cross bucket priority queues where tasks get scheduled into ...
238     TaskQ hpTaskQ; // a vector array of numTaskSets elements for high priority
239     bool isHiPrioQset;
240
241     TaskQ lpTaskQ; // a vector array of numTaskSets elements for low priority
242     bool isLowPrioQset;
243
244     size_t numBuckets;
245
246     SyncObject tMutex; // to serialize taskLocator, threadQ, numBuckets access
247
248     std::atomic<uint16_t> numSleepers; // total number of sleeping threads
249     std::atomic<uint16_t> *curWorkers; // track # of active workers per TaskSet
250     std::atomic<uint16_t>* numWorkers; // and limit it to the value set here
251     std::atomic<size_t> *numReadyTasks; // number of ready tasks per task set
252
253     // Set of all known task owners
254     std::set<void *> taskOwners;
255
256     // Singleton creation
257     static std::mutex initGuard;
258     static std::atomic<ExecutorPool*> instance;
259 };
260 #endif  // SRC_EXECUTORPOOL_H_