e698a8f652ecaac78949d0d68b334e8c6868d38f
[ep-engine.git] / src / taskqueue.cc
1 /* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2 /*
3  *     Copyright 2014 Couchbase, Inc.
4  *
5  *   Licensed under the Apache License, Version 2.0 (the "License");
6  *   you may not use this file except in compliance with the License.
7  *   You may obtain a copy of the License at
8  *
9  *       http://www.apache.org/licenses/LICENSE-2.0
10  *
11  *   Unless required by applicable law or agreed to in writing, software
12  *   distributed under the License is distributed on an "AS IS" BASIS,
13  *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  *   See the License for the specific language governing permissions and
15  *   limitations under the License.
16  */
17 #include "config.h"
18
19 #include "taskqueue.h"
20 #include "executorpool.h"
21 #include "executorthread.h"
22
23 TaskQueue::TaskQueue(ExecutorPool *m, task_type_t t, const char *nm) :
24     name(nm), queueType(t), manager(m), sleepers(0)
25 {
26     // EMPTY
27 }
28
29 TaskQueue::~TaskQueue() {
30     LOG(EXTENSION_LOG_INFO, "Task Queue killing %s", name.c_str());
31 }
32
33 const std::string TaskQueue::getName() const {
34     return (name+taskType2Str(queueType));
35 }
36
37 size_t TaskQueue::getReadyQueueSize() {
38     LockHolder lh(mutex);
39     return readyQueue.size();
40 }
41
42 size_t TaskQueue::getFutureQueueSize() {
43     LockHolder lh(mutex);
44     return futureQueue.size();
45 }
46
47 size_t TaskQueue::getPendingQueueSize() {
48     LockHolder lh(mutex);
49     return pendingQueue.size();
50 }
51
52 ExTask TaskQueue::_popReadyTask(void) {
53     ExTask t = readyQueue.top();
54     readyQueue.pop();
55     manager->lessWork(queueType);
56     return t;
57 }
58
59 void TaskQueue::doWake(size_t &numToWake) {
60     LockHolder lh(mutex);
61     _doWake_UNLOCKED(numToWake);
62 }
63
64 void TaskQueue::_doWake_UNLOCKED(size_t &numToWake) {
65     if (sleepers && numToWake)  {
66         if (numToWake < sleepers) {
67             for (; numToWake; --numToWake) {
68                 mutex.notifyOne(); // cond_signal 1
69             }
70         } else {
71             mutex.notify(); // cond_broadcast
72             numToWake -= sleepers;
73         }
74     }
75 }
76
77 bool TaskQueue::_doSleep(ExecutorThread &t) {
78     t.now = gethrtime();
79     if (t.now < t.waketime && manager->trySleep(queueType)) {
80         // Atomically switch from running to sleeping; iff we were previously
81         // running.
82         executor_state_t expected_state = EXECUTOR_RUNNING;
83         if (!t.state.compare_exchange_strong(expected_state,
84                                              EXECUTOR_SLEEPING)) {
85             return false;
86         }
87         sleepers++;
88         // zzz....
89         hrtime_t snooze_nsecs = t.waketime - t.now;
90
91         if (snooze_nsecs > MIN_SLEEP_TIME * 1000000000) {
92             mutex.wait(MIN_SLEEP_TIME);
93         } else {
94             mutex.wait(snooze_nsecs);
95         }
96         // ... woke!
97         sleepers--;
98         manager->woke();
99
100         // Finished our sleep, atomically switch back to running iff we were
101         // previously sleeping.
102         expected_state = EXECUTOR_SLEEPING;
103         if (!t.state.compare_exchange_strong(expected_state,
104                                              EXECUTOR_RUNNING)) {
105             return false;
106         }
107         t.now = gethrtime();
108     }
109     t.waketime = hrtime_t(-1);
110     return true;
111 }
112
113 bool TaskQueue::_fetchNextTask(ExecutorThread &t, bool toSleep) {
114     bool ret = false;
115     LockHolder lh(mutex);
116
117     if (toSleep && !_doSleep(t)) {
118         return ret; // shutting down
119     }
120
121     size_t numToWake = _moveReadyTasks(t.now);
122
123     if (!futureQueue.empty() && t.startIndex == queueType &&
124         futureQueue.top()->getWaketime() < t.waketime) {
125         t.waketime = futureQueue.top()->getWaketime(); // record earliest waketime
126     }
127
128     if (!readyQueue.empty() && readyQueue.top()->isdead()) {
129         t.setCurrentTask(_popReadyTask()); // clean out dead tasks first
130         ret = true;
131     } else if (!readyQueue.empty() || !pendingQueue.empty()) {
132         t.curTaskType = manager->tryNewWork(queueType);
133         if (t.curTaskType != NO_TASK_TYPE) {
134             // if this TaskQueue has obtained capacity for the thread, then we must
135             // consider any pending tasks too. To ensure prioritized run order,
136             // the function below will push any pending task back into
137             // the readyQueue (sorted by priority)
138             _checkPendingQueue();
139
140             ExTask tid = _popReadyTask(); // and pop out the top task
141             t.setCurrentTask(tid);
142             ret = true;
143         } else if (!readyQueue.empty()) { // We hit limit on max # workers
144             ExTask tid = _popReadyTask(); // that can work on current Q type!
145             pendingQueue.push_back(tid);
146             numToWake = numToWake ? numToWake - 1 : 0; // 1 fewer task ready
147         } else { // Let the task continue waiting in pendingQueue
148             cb_assert(!pendingQueue.empty());
149             numToWake = numToWake ? numToWake - 1 : 0; // 1 fewer task ready
150         }
151     }
152
153     _doWake_UNLOCKED(numToWake);
154     lh.unlock();
155
156     return ret;
157 }
158
159 bool TaskQueue::fetchNextTask(ExecutorThread &thread, bool toSleep) {
160     EventuallyPersistentEngine *epe = ObjectRegistry::onSwitchThread(NULL, true);
161     size_t rv = _fetchNextTask(thread, toSleep);
162     ObjectRegistry::onSwitchThread(epe);
163     return rv;
164 }
165
166 size_t TaskQueue::_moveReadyTasks(hrtime_t tv) {
167     if (!readyQueue.empty()) {
168         return 0;
169     }
170
171     size_t numReady = 0;
172     while (!futureQueue.empty()) {
173         ExTask tid = futureQueue.top();
174         if (tid->getWaketime() <= tv) {
175             futureQueue.pop();
176             readyQueue.push(tid);
177             numReady++;
178         } else {
179             break;
180         }
181     }
182
183     manager->addWork(numReady, queueType);
184
185     // Current thread will pop one task, so wake up one less thread
186     return numReady ? numReady - 1 : 0;
187 }
188
189 void TaskQueue::_checkPendingQueue(void) {
190     if (!pendingQueue.empty()) {
191         ExTask runnableTask = pendingQueue.front();
192         readyQueue.push(runnableTask);
193         manager->addWork(1, queueType);
194         pendingQueue.pop_front();
195     }
196 }
197
198 hrtime_t TaskQueue::_reschedule(ExTask &task, task_type_t &curTaskType) {
199     hrtime_t wakeTime;
200     manager->doneWork(curTaskType);
201
202     LockHolder lh(mutex);
203
204     futureQueue.push(task);
205     if (curTaskType == queueType) {
206         wakeTime = futureQueue.top()->getWaketime();
207     } else {
208         wakeTime = hrtime_t(-1);
209     }
210
211     return wakeTime;
212 }
213
214 hrtime_t TaskQueue::reschedule(ExTask &task, task_type_t &curTaskType) {
215     EventuallyPersistentEngine *epe = ObjectRegistry::onSwitchThread(NULL, true);
216     hrtime_t rv = _reschedule(task, curTaskType);
217     ObjectRegistry::onSwitchThread(epe);
218     return rv;
219 }
220
221 void TaskQueue::_schedule(ExTask &task) {
222     LockHolder lh(mutex);
223
224     futureQueue.push(task);
225
226     LOG(EXTENSION_LOG_DEBUG, "%s: Schedule a task \"%s\" id %d",
227             name.c_str(), task->getDescription().c_str(), task->getId());
228
229     size_t numToWake = 1;
230     TaskQueue *sleepQ = manager->getSleepQ(queueType);
231     _doWake_UNLOCKED(numToWake);
232     lh.unlock();
233     if (this != sleepQ) {
234         sleepQ->doWake(numToWake);
235     }
236 }
237
238 void TaskQueue::schedule(ExTask &task) {
239     EventuallyPersistentEngine *epe = ObjectRegistry::onSwitchThread(NULL, true);
240     _schedule(task);
241     ObjectRegistry::onSwitchThread(epe);
242 }
243
244 void TaskQueue::_wake(ExTask &task) {
245     size_t numReady = 0;
246     const hrtime_t now = gethrtime();
247
248     LockHolder lh(mutex);
249     LOG(EXTENSION_LOG_DEBUG, "%s: Wake a task \"%s\" id %d", name.c_str(),
250             task->getDescription().c_str(), task->getId());
251
252     // MB-9986: Re-sort futureQueue for now. TODO: avoid this O(N) overhead
253     std::queue<ExTask> notReady;
254     while (!futureQueue.empty()) {
255         ExTask tid = futureQueue.top();
256         notReady.push(tid);
257         futureQueue.pop();
258     }
259
260     // Wake thread-count-serialized tasks too
261     for (std::list<ExTask>::iterator it = pendingQueue.begin();
262          it != pendingQueue.end();) {
263         ExTask tid = *it;
264         if (tid->getId() == task->getId() || tid->isdead()) {
265             notReady.push(tid);
266             it = pendingQueue.erase(it);
267         } else {
268             it++;
269         }
270     }
271
272     // Note that this task that we are waking may nor may not be blocked in Q
273     task->updateWaketime(now);
274     task->setState(TASK_RUNNING, TASK_SNOOZED);
275
276     while (!notReady.empty()) {
277         ExTask tid = notReady.front();
278         if (tid->getWaketime() <= now || tid->isdead()) {
279             readyQueue.push(tid);
280             numReady++;
281         } else {
282             futureQueue.push(tid);
283         }
284         notReady.pop();
285     }
286
287     if (numReady) {
288         manager->addWork(numReady, queueType);
289         _doWake_UNLOCKED(numReady);
290         TaskQueue *sleepQ = manager->getSleepQ(queueType);
291         lh.unlock();
292         if (this != sleepQ) {
293             sleepQ->doWake(numReady);
294         }
295     }
296 }
297
298 void TaskQueue::wake(ExTask &task) {
299     EventuallyPersistentEngine *epe = ObjectRegistry::onSwitchThread(NULL, true);
300     _wake(task);
301     ObjectRegistry::onSwitchThread(epe);
302 }
303
304 const std::string TaskQueue::taskType2Str(task_type_t type) {
305     switch (type) {
306     case WRITER_TASK_IDX:
307         return std::string("Writer");
308     case READER_TASK_IDX:
309         return std::string("Reader");
310     case AUXIO_TASK_IDX:
311         return std::string("AuxIO");
312     case NONIO_TASK_IDX:
313         return std::string("NonIO");
314     default:
315         return std::string("None");
316     }
317 }