2 * Copyright 2014 Couchbase, Inc.
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
8 * http://www.apache.org/licenses/LICENSE-2.0
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
18 * === High-level overview of the task execution system. ===
20 * ExecutorPool is the core interface for users wishing to run tasks on our
23 * Under the covers we have a configurable number of system threads that are
24 * labeled with a type (see task_type_t). These threads service all buckets.
26 * Each thread operates by reading from a shared TaskQueue. Each thread wakes
27 * up and fetches (TaskQueue::fetchNextTask) a task for execution
28 * (GlobalTask::run() is called to execute the task).
30 * The pool also has the concept of high and low priority which is achieved by
31 * having two TaskQueue objects per task-type. When a thread wakes up to run
32 * a task, it will service the high-priority queue more frequently than the
35 * Within a single queue itself there is also a task priority. The task priority
36 * is a value where lower is better. When many tasks are ready for execution
37 * they are moved to a ready queue and sorted by their priority. Thus tasks
38 * with priority 0 get to go before tasks with priority 1. Only once the ready
39 * queue of tasks is empty will we consider looking for more eligible tasks.
40 * In this context, an eligible task is one that has a wakeTime <= now.
42 * === Important methods of the ExecutorPool ===
44 * ExecutorPool* ExecutorPool::get()
45 * The ExecutorPool is accessed via the static get() method. Calling get
46 * returns the processes global ExecutorPool object. This is an instance
47 * that is global/shared between all buckets.
49 * ExecutorPool::schedule(ExTask task, task_type_t qidx)
50 * The schedule method allows task to be scheduled for future execution by a
51 * thread of type 'qidx'. The task's 'wakeTime' determines approximately when
52 * the task will be executed (no guarantees).
54 * ExecutorPool::wake(size_t taskId)
55 * The wake method allows for a caller to request that the task matching
56 * taskId be executed by its thread-type now'. The tasks wakeTime is modified
57 * so that it has a wakeTime of now and a thread of the correct type is
58 * signaled to wake-up and perform fetching. The woken task will have to wait
59 * for any current tasks to be executed first, but it will jump ahead of other
60 * tasks as tasks that are ready to run are ordered by their priority.
62 * ExecutorPool::snooze(size_t taskId, double toSleep)
63 * The pool's snooze method will locate the task matching taskId and adjust
64 * its wakeTime to account for the toSleep value.
66 #ifndef SRC_EXECUTORPOOL_H_
67 #define SRC_EXECUTORPOOL_H_ 1
76 #include "ringbuffer.h"
77 #include "task_type.h"
84 typedef std::vector<ExecutorThread *> ThreadQ;
85 typedef std::pair<ExTask, TaskQueue *> TaskQpair;
86 typedef std::pair<RingBuffer<TaskLogEntry>*, RingBuffer<TaskLogEntry> *>
88 typedef std::vector<TaskQueue *> TaskQ;
93 void addWork(size_t newWork, task_type_t qType);
95 void lessWork(task_type_t qType);
97 void doneWork(task_type_t &doneTaskType);
99 task_type_t tryNewWork(task_type_t newTaskType);
101 bool trySleep(task_type_t task_type) {
102 if (!numReadyTasks[task_type]) {
113 TaskQueue *nextTask(ExecutorThread &t, uint8_t tick);
115 TaskQueue *getSleepQ(unsigned int curTaskType) {
116 return isHiPrioQset ? hpTaskQ[curTaskType] : lpTaskQ[curTaskType];
119 bool cancel(size_t taskId, bool eraseTask=false);
121 bool stopTaskGroup(EventuallyPersistentEngine *e, task_type_t qidx,
124 bool wake(size_t taskId);
126 bool snooze(size_t taskId, double tosleep);
128 void registerBucket(EventuallyPersistentEngine *engine);
130 void unregisterBucket(EventuallyPersistentEngine *engine, bool force);
132 void doWorkerStat(EventuallyPersistentEngine *engine, const void *cookie,
135 void doTaskQStat(EventuallyPersistentEngine *engine, const void *cookie,
138 size_t getNumWorkersStat(void) { return threadQ.size(); }
140 size_t getNumCPU(void);
142 size_t getNumWorkers(void);
144 size_t getNumReaders(void);
146 size_t getNumWriters(void);
148 size_t getNumAuxIO(void);
150 size_t getNumNonIO(void);
152 size_t getMaxReaders(void) { return maxWorkers[READER_TASK_IDX]; }
154 size_t getMaxWriters(void) { return maxWorkers[WRITER_TASK_IDX]; }
156 size_t getMaxAuxIO(void) { return maxWorkers[AUXIO_TASK_IDX]; }
158 size_t getMaxNonIO(void) { return maxWorkers[NONIO_TASK_IDX]; }
160 void setMaxReaders(uint16_t v) { maxWorkers[READER_TASK_IDX] = v; }
162 void setMaxWriters(uint16_t v) { maxWorkers[WRITER_TASK_IDX] = v; }
164 void setMaxAuxIO(uint16_t v) { maxWorkers[AUXIO_TASK_IDX] = v; }
166 void setMaxNonIO(uint16_t v) { maxWorkers[NONIO_TASK_IDX] = v; }
168 size_t getNumReadyTasks(void) { return totReadyTasks; }
170 size_t getNumSleepers(void) { return numSleepers; }
172 size_t schedule(ExTask task, task_type_t qidx);
174 static ExecutorPool *get(void);
176 static void shutdown(void);
180 ExecutorPool(size_t t, size_t nTaskSets, size_t r, size_t w, size_t a,
182 virtual ~ExecutorPool(void);
184 TaskQueue* _nextTask(ExecutorThread &t, uint8_t tick);
185 bool _cancel(size_t taskId, bool eraseTask=false);
186 bool _wake(size_t taskId);
187 virtual bool _startWorkers(void);
188 bool _snooze(size_t taskId, double tosleep);
189 size_t _schedule(ExTask task, task_type_t qidx);
190 void _registerBucket(EventuallyPersistentEngine *engine);
191 void _unregisterBucket(EventuallyPersistentEngine *engine, bool force);
192 bool _stopTaskGroup(EventuallyPersistentEngine *e, task_type_t qidx, bool force);
193 TaskQueue* _getTaskQueue(EventuallyPersistentEngine *e, task_type_t qidx);
195 size_t numTaskSets; // safe to read lock-less not altered after creation
196 size_t maxGlobalThreads;
198 AtomicValue<size_t> totReadyTasks;
199 SyncObject mutex; // Thread management condition var + mutex
201 //! A mapping of task ids to Task, TaskQ in the thread pool
202 std::map<size_t, TaskQpair> taskLocator;
207 // Global cross bucket priority queues where tasks get scheduled into ...
208 TaskQ hpTaskQ; // a vector array of numTaskSets elements for high priority
211 TaskQ lpTaskQ; // a vector array of numTaskSets elements for low priority
216 SyncObject tMutex; // to serialize taskLocator, threadQ, numBuckets access
218 AtomicValue<uint16_t> numSleepers; // total number of sleeping threads
219 AtomicValue<uint16_t> *curWorkers; // track # of active workers per TaskSet
220 AtomicValue<uint16_t> *maxWorkers; // and limit it to the value set here
221 AtomicValue<size_t> *numReadyTasks; // number of ready tasks per task set
223 // Set of all known buckets
224 std::set<void *> buckets;
226 // Singleton creation
227 static Mutex initGuard;
228 static ExecutorPool *instance;
230 #endif // SRC_EXECUTORPOOL_H_