b16062c4a3f1d87859319ce7a33b38aa7a6ceb92
[ep-engine.git] / src / executorpool.h
1 /*
2  *     Copyright 2014 Couchbase, Inc.
3  *
4  *   Licensed under the Apache License, Version 2.0 (the "License");
5  *   you may not use this file except in compliance with the License.
6  *   You may obtain a copy of the License at
7  *
8  *       http://www.apache.org/licenses/LICENSE-2.0
9  *
10  *   Unless required by applicable law or agreed to in writing, software
11  *   distributed under the License is distributed on an "AS IS" BASIS,
12  *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  *   See the License for the specific language governing permissions and
14  *   limitations under the License.
15  */
16
17 #ifndef SRC_EXECUTORPOOL_H_
18 #define SRC_EXECUTORPOOL_H_ 1
19
20 #include "config.h"
21
22 #include <map>
23 #include <set>
24 #include <queue>
25
26 #include "tasks.h"
27 #include "ringbuffer.h"
28 #include "task_type.h"
29
30 // Forward decl
31 class TaskQueue;
32 class ExecutorThread;
33 class TaskLogEntry;
34
35 typedef std::vector<ExecutorThread *> ThreadQ;
36 typedef std::pair<ExTask, TaskQueue *> TaskQpair;
37 typedef std::pair<RingBuffer<TaskLogEntry>*, RingBuffer<TaskLogEntry> *>
38                                                                 TaskLog;
39 typedef std::vector<TaskQueue *> TaskQ;
40
41 class ExecutorPool {
42 public:
43
44     void addWork(size_t newWork, task_type_t qType);
45
46     void lessWork(task_type_t qType);
47
48     void doneWork(task_type_t &doneTaskType);
49
50     task_type_t tryNewWork(task_type_t newTaskType);
51
52     bool trySleep(task_type_t task_type) {
53         if (!numReadyTasks[task_type]) {
54             numSleepers++;
55             return true;
56         }
57         return false;
58     }
59
60     void woke(void) {
61         numSleepers--;
62     }
63
64     TaskQueue *nextTask(ExecutorThread &t, uint8_t tick);
65
66     TaskQueue *getSleepQ(unsigned int curTaskType) {
67         return isHiPrioQset ? hpTaskQ[curTaskType] : lpTaskQ[curTaskType];
68     }
69
70     bool cancel(size_t taskId, bool eraseTask=false);
71
72     bool stopTaskGroup(EventuallyPersistentEngine *e, task_type_t qidx,
73                        bool force);
74
75     bool wake(size_t taskId);
76
77     bool snooze(size_t taskId, double tosleep);
78
79     void registerBucket(EventuallyPersistentEngine *engine);
80
81     void unregisterBucket(EventuallyPersistentEngine *engine, bool force);
82
83     void doWorkerStat(EventuallyPersistentEngine *engine, const void *cookie,
84                       ADD_STAT add_stat);
85
86     void doTaskQStat(EventuallyPersistentEngine *engine, const void *cookie,
87                      ADD_STAT add_stat);
88
89     size_t getNumWorkersStat(void) { return threadQ.size(); }
90
91     size_t getNumCPU(void);
92
93     size_t getNumWorkers(void);
94
95     size_t getNumReaders(void);
96
97     size_t getNumWriters(void);
98
99     size_t getNumAuxIO(void);
100
101     size_t getNumNonIO(void);
102
103     size_t getMaxReaders(void) { return maxWorkers[READER_TASK_IDX]; }
104
105     size_t getMaxWriters(void) { return maxWorkers[WRITER_TASK_IDX]; }
106
107     size_t getMaxAuxIO(void) { return maxWorkers[AUXIO_TASK_IDX]; }
108
109     size_t getMaxNonIO(void) { return maxWorkers[NONIO_TASK_IDX]; }
110
111     void setMaxReaders(uint16_t v) { maxWorkers[READER_TASK_IDX] = v; }
112
113     void setMaxWriters(uint16_t v) { maxWorkers[WRITER_TASK_IDX] = v; }
114
115     void setMaxAuxIO(uint16_t v) { maxWorkers[AUXIO_TASK_IDX] = v; }
116
117     void setMaxNonIO(uint16_t v) { maxWorkers[NONIO_TASK_IDX] = v; }
118
119     size_t getNumReadyTasks(void) { return totReadyTasks; }
120
121     size_t getNumSleepers(void) { return numSleepers; }
122
123     size_t schedule(ExTask task, task_type_t qidx);
124
125     static ExecutorPool *get(void);
126
127     static void shutdown(void);
128
129 protected:
130
131     ExecutorPool(size_t t, size_t nTaskSets, size_t r, size_t w, size_t a,
132                  size_t n);
133     virtual ~ExecutorPool(void);
134
135     TaskQueue* _nextTask(ExecutorThread &t, uint8_t tick);
136     bool _cancel(size_t taskId, bool eraseTask=false);
137     bool _wake(size_t taskId);
138     virtual bool _startWorkers(void);
139     bool _snooze(size_t taskId, double tosleep);
140     size_t _schedule(ExTask task, task_type_t qidx);
141     void _registerBucket(EventuallyPersistentEngine *engine);
142     void _unregisterBucket(EventuallyPersistentEngine *engine, bool force);
143     bool _stopTaskGroup(EventuallyPersistentEngine *e, task_type_t qidx, bool force);
144     TaskQueue* _getTaskQueue(EventuallyPersistentEngine *e, task_type_t qidx);
145
146     size_t numTaskSets; // safe to read lock-less not altered after creation
147     size_t maxGlobalThreads;
148
149     AtomicValue<size_t> totReadyTasks;
150     SyncObject mutex; // Thread management condition var + mutex
151
152     //! A mapping of task ids to Task, TaskQ in the thread pool
153     std::map<size_t, TaskQpair> taskLocator;
154
155     //A list of threads
156     ThreadQ threadQ;
157
158     // Global cross bucket priority queues where tasks get scheduled into ...
159     TaskQ hpTaskQ; // a vector array of numTaskSets elements for high priority
160     bool isHiPrioQset;
161
162     TaskQ lpTaskQ; // a vector array of numTaskSets elements for low priority
163     bool isLowPrioQset;
164
165     size_t numBuckets;
166
167     SyncObject tMutex; // to serialize taskLocator, threadQ, numBuckets access
168
169     AtomicValue<uint16_t> numSleepers; // total number of sleeping threads
170     AtomicValue<uint16_t> *curWorkers; // track # of active workers per TaskSet
171     AtomicValue<uint16_t> *maxWorkers; // and limit it to the value set here
172     AtomicValue<size_t> *numReadyTasks; // number of ready tasks per task set
173
174     // Set of all known buckets
175     std::set<void *> buckets;
176
177     // Singleton creation
178     static Mutex initGuard;
179     static ExecutorPool *instance;
180 };
181 #endif  // SRC_EXECUTORPOOL_H_