MB-19837: Increase number of NONIO threads
ep-engine.git: src/executorpool.cc
/* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
/*
 *     Copyright 2013 Couchbase, Inc.
 *
 *   Licensed under the Apache License, Version 2.0 (the "License");
 *   you may not use this file except in compliance with the License.
 *   You may obtain a copy of the License at
 *
 *       http://www.apache.org/licenses/LICENSE-2.0
 *
 *   Unless required by applicable law or agreed to in writing, software
 *   distributed under the License is distributed on an "AS IS" BASIS,
 *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 *   See the License for the specific language governing permissions and
 *   limitations under the License.
 */

#include "config.h"

#include <algorithm>
#include <queue>
#include <sstream>

#include "configuration.h"
#include "ep_engine.h"
#include "statwriter.h"
#include "taskqueue.h"
#include "executorpool.h"
#include "executorthread.h"

Mutex ExecutorPool::initGuard;
ExecutorPool *ExecutorPool::instance = NULL;

static const size_t EP_MIN_NUM_THREADS    = 10;
static const size_t EP_MIN_READER_THREADS = 4;
static const size_t EP_MIN_WRITER_THREADS = 4;
static const size_t EP_MIN_NONIO_THREADS  = 2;

static const size_t EP_MAX_READER_THREADS = 12;
static const size_t EP_MAX_WRITER_THREADS = 8;
static const size_t EP_MAX_AUXIO_THREADS  = 8;
static const size_t EP_MAX_NONIO_THREADS  = 8;
size_t ExecutorPool::getNumCPU(void) {
    size_t numCPU;
#ifdef WIN32
    SYSTEM_INFO sysinfo;
    GetSystemInfo(&sysinfo);
    numCPU = (size_t)sysinfo.dwNumberOfProcessors;
#else
    numCPU = (size_t)sysconf(_SC_NPROCESSORS_ONLN);
#endif

    // Sanity check: treat implausibly large (or error) values as "unknown"
    // (0) so callers fall back to the configured defaults instead.
    return (numCPU < 256) ? numCPU : 0;
}

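// Thread allocation summary (derived from the getNum* helpers below):
// nonIO takes 30% of maxGlobalThreads (clamped to [2, 8]), auxIO takes
// roughly 10% (at least 1, at most 8), writers take half of whatever
// remains, and readers take the rest, each clamped to its own range.
// A per-type value explicitly configured by the user always wins.
//
// Illustrative worked example (not from the source), maxGlobalThreads = 16,
// no user overrides:
//   nonIO   = 16 * 0.3 (truncated)  -> 4   (range [2, 8])
//   auxIO   = ceil(16 / 10)         -> 2   (range [1, 8])
//   writers = (16 - 4 - 2) / 2      -> 5   (range [4, 8])
//   readers = 16 - 5 - 2 - 4        -> 5   (range [4, 12])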
size_t ExecutorPool::getNumNonIO(void) {
    // 1. compute: 30% of total threads (rounded down)
    size_t count = maxGlobalThreads * 0.3;

    // 2. adjust computed value to be within range
    count = std::min(EP_MAX_NONIO_THREADS,
                     std::max(EP_MIN_NONIO_THREADS, count));

    // 3. Override with user's value if specified
    if (maxWorkers[NONIO_TASK_IDX]) {
        count = maxWorkers[NONIO_TASK_IDX];
    }
    return count;
}

size_t ExecutorPool::getNumAuxIO(void) {
    // 1. compute: ceil of 10% of total threads, with a minimum of 1
    size_t count = maxGlobalThreads / 10;
    if (!count || maxGlobalThreads % 10) {
        count++;
    }
    // 2. adjust computed value to be within range
    if (count > EP_MAX_AUXIO_THREADS) {
        count = EP_MAX_AUXIO_THREADS;
    }
    // 3. Override with user's value if specified
    if (maxWorkers[AUXIO_TASK_IDX]) {
        count = maxWorkers[AUXIO_TASK_IDX];
    }
    return count;
}

size_t ExecutorPool::getNumWriters(void) {
    size_t count = 0;
    // 1. compute: floor of half of what remains after nonIO and auxIO
    //    threads are taken
    if (maxGlobalThreads > (getNumAuxIO() + getNumNonIO())) {
        count = maxGlobalThreads - getNumAuxIO() - getNumNonIO();
        count = count >> 1;
    }
    // 2. adjust computed value to be within range
    if (count > EP_MAX_WRITER_THREADS) {
        count = EP_MAX_WRITER_THREADS;
    } else if (count < EP_MIN_WRITER_THREADS) {
        count = EP_MIN_WRITER_THREADS;
    }
    // 3. Override with user's value if specified
    if (maxWorkers[WRITER_TASK_IDX]) {
        count = maxWorkers[WRITER_TASK_IDX];
    }
    return count;
}

size_t ExecutorPool::getNumReaders(void) {
    size_t count = 0;
    // 1. compute: what remains after writers, nonIO & auxIO threads are taken
    if (maxGlobalThreads >
            (getNumWriters() + getNumAuxIO() + getNumNonIO())) {
        count = maxGlobalThreads
              - getNumWriters() - getNumAuxIO() - getNumNonIO();
    }
    // 2. adjust computed value to be within range
    if (count > EP_MAX_READER_THREADS) {
        count = EP_MAX_READER_THREADS;
    } else if (count < EP_MIN_READER_THREADS) {
        count = EP_MIN_READER_THREADS;
    }
    // 3. Override with user's value if specified
    if (maxWorkers[READER_TASK_IDX]) {
        count = maxWorkers[READER_TASK_IDX];
    }
    return count;
}

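// Classic double-checked locking singleton: the unlocked test keeps the
// fast path cheap, and the second test under initGuard stops two racing
// threads from both constructing the pool. onSwitchThread(NULL, true)
// detaches the calling thread from its engine for the duration of the
// construction so the pool's allocations are not charged to any single
// bucket's memory accounting, then the saved engine is restored.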
ExecutorPool *ExecutorPool::get(void) {
    if (!instance) {
        LockHolder lh(initGuard);
        if (!instance) {
            Configuration &config =
                ObjectRegistry::getCurrentEngine()->getConfiguration();
            EventuallyPersistentEngine *epe =
                                   ObjectRegistry::onSwitchThread(NULL, true);
            instance = new ExecutorPool(config.getMaxThreads(),
                    NUM_TASK_GROUPS, config.getMaxNumReaders(),
                    config.getMaxNumWriters(), config.getMaxNumAuxio(),
                    config.getMaxNumNonio());
            ObjectRegistry::onSwitchThread(epe);
        }
    }
    return instance;
}

void ExecutorPool::shutdown(void) {
    if (instance) {
        delete instance;
        instance = NULL;
    }
}

ExecutorPool::ExecutorPool(size_t maxThreads, size_t nTaskSets,
                           size_t maxReaders, size_t maxWriters,
                           size_t maxAuxIO,   size_t maxNonIO) :
                  numTaskSets(nTaskSets), totReadyTasks(0),
                  isHiPrioQset(false), isLowPrioQset(false), numBuckets(0),
                  numSleepers(0) {
    size_t numCPU = getNumCPU();
    size_t numThreads = (size_t)((numCPU * 3)/4);
    numThreads = (numThreads < EP_MIN_NUM_THREADS) ?
                        EP_MIN_NUM_THREADS : numThreads;
    maxGlobalThreads = maxThreads ? maxThreads : numThreads;
    curWorkers  = new AtomicValue<uint16_t>[nTaskSets];
    maxWorkers  = new AtomicValue<uint16_t>[nTaskSets];
    numReadyTasks  = new AtomicValue<size_t>[nTaskSets];
    for (size_t i = 0; i < nTaskSets; i++) {
        curWorkers[i] = 0;
        numReadyTasks[i] = 0;
    }
    maxWorkers[WRITER_TASK_IDX] = maxWriters;
    maxWorkers[READER_TASK_IDX] = maxReaders;
    maxWorkers[AUXIO_TASK_IDX]  = maxAuxIO;
    maxWorkers[NONIO_TASK_IDX]  = maxNonIO;
}

ExecutorPool::~ExecutorPool(void) {
    delete[] curWorkers;
    delete[] maxWorkers;
    delete[] numReadyTasks;
    if (isHiPrioQset) {
        for (size_t i = 0; i < numTaskSets; i++) {
            delete hpTaskQ[i];
        }
    }
    if (isLowPrioQset) {
        for (size_t i = 0; i < numTaskSets; i++) {
            delete lpTaskQ[i];
        }
    }
}

// To prevent starvation of low priority queues, we define their
// polling frequencies as follows ...
#define LOW_PRIORITY_FREQ 5 // 1 out of 5 times threads check low priority Q

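// Polling order in _nextTask: on every LOW_PRIORITY_FREQ'th tick the low
// priority queue set is checked first, otherwise the high priority set
// gets first look (if only one set exists, both pointers refer to it).
// The loop tries the preferred set, then the other; once both have been
// tried, the thread falls through to a blocking fetch on the sleep queue.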
TaskQueue *ExecutorPool::_nextTask(ExecutorThread &t, uint8_t tick) {
    if (!tick) {
        return NULL;
    }

    unsigned int myq = t.startIndex;
    TaskQueue *checkQ; // which TaskQueue set should be polled first
    TaskQueue *checkNextQ; // which set of TaskQueue should be polled next
    TaskQueue *toggle = NULL;
    if (!(tick % LOW_PRIORITY_FREQ)) { // if only 1 Q set, both point to it
        checkQ = isLowPrioQset ? lpTaskQ[myq] :
                (isHiPrioQset ? hpTaskQ[myq] : NULL);
        checkNextQ = isHiPrioQset ? hpTaskQ[myq] : checkQ;
    } else {
        checkQ = isHiPrioQset ? hpTaskQ[myq] :
                (isLowPrioQset ? lpTaskQ[myq] : NULL);
        checkNextQ = isLowPrioQset ? lpTaskQ[myq] : checkQ;
    }
    while (t.state == EXECUTOR_RUNNING) {
        if (checkQ &&
            checkQ->fetchNextTask(t, false)) {
            return checkQ;
        }
        if (toggle || checkQ == checkNextQ) {
            TaskQueue *sleepQ = getSleepQ(myq);
            if (sleepQ->fetchNextTask(t, true)) {
                return sleepQ;
            } else {
                return NULL;
            }
        }
        toggle = checkQ;
        checkQ = checkNextQ;
        checkNextQ = toggle;
    }
    return NULL;
}

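// Convention used throughout this file: the underscore-prefixed method
// does the real work, while the public wrapper brackets it with
// onSwitchThread(NULL, true) / onSwitchThread(epe) so that memory the
// pool allocates or frees here is accounted globally rather than to the
// engine the calling thread happens to be associated with.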
TaskQueue *ExecutorPool::nextTask(ExecutorThread &t, uint8_t tick) {
    EventuallyPersistentEngine *epe = ObjectRegistry::onSwitchThread(NULL, true);
    TaskQueue *tq = _nextTask(t, tick);
    ObjectRegistry::onSwitchThread(epe);
    return tq;
}

void ExecutorPool::addWork(size_t newWork, task_type_t qType) {
    if (newWork) {
        totReadyTasks.fetch_add(newWork);
        numReadyTasks[qType].fetch_add(newWork);
    }
}

void ExecutorPool::lessWork(task_type_t qType) {
    cb_assert(numReadyTasks[qType].load());
    numReadyTasks[qType]--;
    totReadyTasks--;
}

void ExecutorPool::doneWork(task_type_t &curTaskType) {
    if (curTaskType != NO_TASK_TYPE) {
        // Record that a thread is done working on a particular queue type
        LOG(EXTENSION_LOG_DEBUG, "Done with Task Type %d capacity = %d",
                curTaskType, curWorkers[curTaskType].load());
        curWorkers[curTaskType]--;
        curTaskType = NO_TASK_TYPE;
    }
}

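// Claim a worker slot optimistically: increment the per-type counter
// first, then compare against the cap; if the cap was overshot, roll the
// increment back and report NO_TASK_TYPE. The transient overshoot is
// harmless because no caller proceeds until the check passes.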
task_type_t ExecutorPool::tryNewWork(task_type_t newTaskType) {
    task_type_t ret = newTaskType;
    curWorkers[newTaskType]++; // atomic increment
    // Test if a thread can take up a task from the target queue type
    if (curWorkers[newTaskType] <= maxWorkers[newTaskType]) {
        // Ok to proceed as limit not hit
        LOG(EXTENSION_LOG_DEBUG,
                "Taking up work in task type %d capacity = %d, max = %d",
                newTaskType, curWorkers[newTaskType].load(),
                maxWorkers[newTaskType].load());
    } else {
        curWorkers[newTaskType]--; // do not exceed the limit at maxWorkers
        LOG(EXTENSION_LOG_DEBUG, "Limiting from taking up work in task "
                "type %d capacity = %d, max = %d", newTaskType,
                curWorkers[newTaskType].load(),
                maxWorkers[newTaskType].load());
        ret = NO_TASK_TYPE;
    }

    return ret;
}

bool ExecutorPool::_cancel(size_t taskId, bool eraseTask) {
    LockHolder lh(tMutex);
    std::map<size_t, TaskQpair>::iterator itr = taskLocator.find(taskId);
    if (itr == taskLocator.end()) {
        LOG(EXTENSION_LOG_DEBUG, "Task id %d not found", (int)taskId);
        return false;
    }

    ExTask task = itr->second.first;
    LOG(EXTENSION_LOG_DEBUG, "Cancel task %s id %d on bucket %s %s",
            task->getDescription().c_str(), task->getId(),
            task->getEngine()->getName(), eraseTask ? "final erase" : "!");

    task->cancel(); // must be idempotent, just set state to dead

    if (eraseTask) { // only internal threads can erase tasks
        cb_assert(task->isdead());
        taskLocator.erase(itr);
        tMutex.notify();
    } else { // wake up the task from the TaskQ so a thread can safely erase it
             // otherwise we may race with unregisterBucket where an unlocated
             // task runs in spite of its bucket getting unregistered
        itr->second.second->wake(task);
    }
    return true;
}

bool ExecutorPool::cancel(size_t taskId, bool eraseTask) {
    EventuallyPersistentEngine *epe = ObjectRegistry::onSwitchThread(NULL, true);
    bool rv = _cancel(taskId, eraseTask);
    ObjectRegistry::onSwitchThread(epe);
    return rv;
}

bool ExecutorPool::_wake(size_t taskId) {
    LockHolder lh(tMutex);
    std::map<size_t, TaskQpair>::iterator itr = taskLocator.find(taskId);
    if (itr != taskLocator.end()) {
        itr->second.second->wake(itr->second.first);
        return true;
    }
    return false;
}

bool ExecutorPool::wake(size_t taskId) {
    EventuallyPersistentEngine *epe = ObjectRegistry::onSwitchThread(NULL, true);
    bool rv = _wake(taskId);
    ObjectRegistry::onSwitchThread(epe);
    return rv;
}

bool ExecutorPool::_snooze(size_t taskId, double tosleep) {
    LockHolder lh(tMutex);
    std::map<size_t, TaskQpair>::iterator itr = taskLocator.find(taskId);
    if (itr != taskLocator.end()) {
        itr->second.first->snooze(tosleep);
        return true;
    }
    return false;
}

bool ExecutorPool::snooze(size_t taskId, double tosleep) {
    EventuallyPersistentEngine *epe = ObjectRegistry::onSwitchThread(NULL, true);
    bool rv = _snooze(taskId, tosleep);
    ObjectRegistry::onSwitchThread(epe);
    return rv;
}

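// Queue selection is two-mode: while the pool is still below
// maxGlobalThreads every bucket shares whichever queue set exists (high
// priority preferred); at full capacity each bucket is routed strictly
// by its own registered priority.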
TaskQueue* ExecutorPool::_getTaskQueue(EventuallyPersistentEngine *e,
                                       task_type_t qidx) {
    TaskQueue         *q             = NULL;
    size_t            curNumThreads  = 0;
    bucket_priority_t bucketPriority = e->getWorkloadPriority();

    cb_assert(0 <= (int)qidx && (size_t)qidx < numTaskSets);

    curNumThreads = threadQ.size();

    if (!bucketPriority) {
        LOG(EXTENSION_LOG_WARNING, "Trying to schedule task for unregistered "
            "bucket %s", e->getName());
        return q;
    }

    if (curNumThreads < maxGlobalThreads) {
        if (isHiPrioQset) {
            q = hpTaskQ[qidx];
        } else if (isLowPrioQset) {
            q = lpTaskQ[qidx];
        }
    } else { // max capacity mode scheduling ...
        if (bucketPriority == LOW_BUCKET_PRIORITY) {
            cb_assert(lpTaskQ.size() == numTaskSets);
            q = lpTaskQ[qidx];
        } else {
            cb_assert(hpTaskQ.size() == numTaskSets);
            q = hpTaskQ[qidx];
        }
    }
    return q;
}

size_t ExecutorPool::_schedule(ExTask task, task_type_t qidx) {
    LockHolder lh(tMutex);
    TaskQueue *q = _getTaskQueue(task->getEngine(), qidx);
    TaskQpair tqp(task, q);
    taskLocator[task->getId()] = tqp;

    q->schedule(task);

    return task->getId();
}

size_t ExecutorPool::schedule(ExTask task, task_type_t qidx) {
    EventuallyPersistentEngine *epe = ObjectRegistry::onSwitchThread(NULL, true);
    size_t rv = _schedule(task, qidx);
    ObjectRegistry::onSwitchThread(epe);
    return rv;
}

void ExecutorPool::_registerBucket(EventuallyPersistentEngine *engine) {
    TaskQ *taskQ;
    bool *whichQset;
    const char *queueName;
    WorkLoadPolicy &workload = engine->getWorkLoadPolicy();
    bucket_priority_t priority = workload.getBucketPriority();

    if (priority < HIGH_BUCKET_PRIORITY) {
        engine->setWorkloadPriority(LOW_BUCKET_PRIORITY);
        taskQ = &lpTaskQ;
        whichQset = &isLowPrioQset;
        queueName = "LowPrioQ_";
        LOG(EXTENSION_LOG_WARNING, "Bucket %s registered with low priority",
            engine->getName());
    } else {
        engine->setWorkloadPriority(HIGH_BUCKET_PRIORITY);
        taskQ = &hpTaskQ;
        whichQset = &isHiPrioQset;
        queueName = "HiPrioQ_";
        LOG(EXTENSION_LOG_WARNING, "Bucket %s registered with high priority",
            engine->getName());
    }

    LockHolder lh(tMutex);

    if (!(*whichQset)) {
        taskQ->reserve(numTaskSets);
        for (size_t i = 0; i < numTaskSets; i++) {
            taskQ->push_back(new TaskQueue(this, (task_type_t)i, queueName));
        }
        *whichQset = true;
    }

    buckets.insert(engine);
    numBuckets++;

    _startWorkers();
}

void ExecutorPool::registerBucket(EventuallyPersistentEngine *engine) {
    EventuallyPersistentEngine *epe = ObjectRegistry::onSwitchThread(NULL, true);
    _registerBucket(engine);
    ObjectRegistry::onSwitchThread(epe);
}

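// Worker threads carry a single global index across all four pools; for
// example, 5 readers, 5 writers, 2 auxIO and 4 nonIO threads are named
// reader_worker_0..4, writer_worker_5..9, auxio_worker_10..11 and
// nonio_worker_12..15. The maxWorkers caps are finalised at the end of
// this function, including the MB-12279 default of at most 4 concurrent
// writers when no explicit writer limit was configured.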
bool ExecutorPool::_startWorkers(void) {
    if (threadQ.size()) {
        return false;
    }

    size_t numReaders = getNumReaders();
    size_t numWriters = getNumWriters();
    size_t numAuxIO   = getNumAuxIO();
    size_t numNonIO   = getNumNonIO();

    std::stringstream ss;
    ss << "Spawning " << numReaders << " readers, " << numWriters <<
    " writers, " << numAuxIO << " auxIO, " << numNonIO << " nonIO threads";
    LOG(EXTENSION_LOG_WARNING, "%s", ss.str().c_str());

    for (size_t tidx = 0; tidx < numReaders; ++tidx) {
        std::stringstream ss;
        ss << "reader_worker_" << tidx;

        threadQ.push_back(new ExecutorThread(this, READER_TASK_IDX, ss.str()));
        threadQ.back()->start();
    }
    for (size_t tidx = 0; tidx < numWriters; ++tidx) {
        std::stringstream ss;
        ss << "writer_worker_" << numReaders + tidx;

        threadQ.push_back(new ExecutorThread(this, WRITER_TASK_IDX, ss.str()));
        threadQ.back()->start();
    }
    for (size_t tidx = 0; tidx < numAuxIO; ++tidx) {
        std::stringstream ss;
        ss << "auxio_worker_" << numReaders + numWriters + tidx;

        threadQ.push_back(new ExecutorThread(this, AUXIO_TASK_IDX, ss.str()));
        threadQ.back()->start();
    }
    for (size_t tidx = 0; tidx < numNonIO; ++tidx) {
        std::stringstream ss;
        ss << "nonio_worker_" << numReaders + numWriters + numAuxIO + tidx;

        threadQ.push_back(new ExecutorThread(this, NONIO_TASK_IDX, ss.str()));
        threadQ.back()->start();
    }

    if (!maxWorkers[WRITER_TASK_IDX]) {
        // MB-12279: Limit writers to 4 for faster bgfetches in DGM by default
        numWriters = 4;
    }
    maxWorkers[WRITER_TASK_IDX] = numWriters;
    maxWorkers[READER_TASK_IDX] = numReaders;
    maxWorkers[AUXIO_TASK_IDX]  = numAuxIO;
    maxWorkers[NONIO_TASK_IDX]  = numNonIO;

    return true;
}

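// Cancellation loop: sweep taskLocator, cancel every matching task
// (unless it blocks shutdown and force is not set) and wake it so a
// worker can run it to completion and erase it; repeat, sleeping on
// tMutex between passes, until no matching task remains.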
bool ExecutorPool::_stopTaskGroup(EventuallyPersistentEngine *e,
                                  task_type_t taskType,
                                  bool force) {
    bool unfinishedTask;
    bool retVal = false;
    std::map<size_t, TaskQpair>::iterator itr;

    LockHolder lh(tMutex);
    LOG(EXTENSION_LOG_WARNING, "Stopping %d type tasks in bucket %s", taskType,
            e->getName());
    do {
        ExTask task;
        unfinishedTask = false;
        for (itr = taskLocator.begin(); itr != taskLocator.end(); itr++) {
            task = itr->second.first;
            TaskQueue *q = itr->second.second;
            if (task->getEngine() == e &&
                (taskType == NO_TASK_TYPE || q->queueType == taskType)) {
                LOG(EXTENSION_LOG_WARNING, "Stopping Task id %d %s",
                        task->getId(), task->getDescription().c_str());
                // If force flag is set during shutdown, cancel all tasks
                // without considering the blockShutdown status of the task.
                if (force || !task->blockShutdown) {
                    task->cancel(); // Must be idempotent
                }
                q->wake(task);
                unfinishedTask = true;
                retVal = true;
            }
        }
        if (unfinishedTask) {
            tMutex.wait(MIN_SLEEP_TIME); // Wait till task gets cancelled
        }
    } while (unfinishedTask);

    return retVal;
}

bool ExecutorPool::stopTaskGroup(EventuallyPersistentEngine *e,
                                 task_type_t taskType,
                                 bool force) {
    EventuallyPersistentEngine *epe = ObjectRegistry::onSwitchThread(NULL, true);
    bool rv = _stopTaskGroup(e, taskType, force);
    ObjectRegistry::onSwitchThread(epe);
    return rv;
}

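// Tearing down the last bucket is delicate: each per-type ready counter
// is bumped before waking all sleeping workers so a woken thread does
// not go straight back to sleep before it notices it has been stopped.
// The counters are restored once every thread is flagged DEAD, and only
// then are the threads joined, deleted and the queue sets destroyed.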
void ExecutorPool::_unregisterBucket(EventuallyPersistentEngine *engine,
                                     bool force) {
    LOG(EXTENSION_LOG_WARNING, "Unregistering %s bucket %s",
            (numBuckets == 1) ? "last" : "", engine->getName());

    _stopTaskGroup(engine, NO_TASK_TYPE, force);

    LockHolder lh(tMutex);

    buckets.erase(engine);
    if (!(--numBuckets)) {
        cb_assert(!taskLocator.size());
        for (unsigned int idx = 0; idx < numTaskSets; idx++) {
            TaskQueue *sleepQ = getSleepQ(idx);
            size_t wakeAll = threadQ.size();
            numReadyTasks[idx]++; // this prevents woken workers from sleeping
            totReadyTasks++;
            sleepQ->doWake(wakeAll);
        }
        for (size_t tidx = 0; tidx < threadQ.size(); ++tidx) {
            threadQ[tidx]->stop(false); // only set state to DEAD
        }
        for (unsigned int idx = 0; idx < numTaskSets; idx++) {
            numReadyTasks[idx]--; // once woken reset the ready tasks
            totReadyTasks--;
        }

        for (size_t tidx = 0; tidx < threadQ.size(); ++tidx) {
            threadQ[tidx]->stop(/*wait for threads*/);
            delete threadQ[tidx];
        }

        for (size_t i = 0; i < numTaskSets; i++) {
            curWorkers[i] = 0;
        }

        threadQ.clear();
        if (isHiPrioQset) {
            for (size_t i = 0; i < numTaskSets; i++) {
                delete hpTaskQ[i];
            }
            hpTaskQ.clear();
            isHiPrioQset = false;
        }
        if (isLowPrioQset) {
            for (size_t i = 0; i < numTaskSets; i++) {
                delete lpTaskQ[i];
            }
            lpTaskQ.clear();
            isLowPrioQset = false;
        }
    }
}

void ExecutorPool::unregisterBucket(EventuallyPersistentEngine *engine,
                                    bool force) {
    // Note: unregistering a bucket is special - any memory allocations /
    // deallocations made while unregistering *should* be accounted to the
    // bucket in question - hence no `onSwitchThread(NULL)` call.
    _unregisterBucket(engine, force);
}

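// Stat key naming: ep_workload:<queue>:InQsize reports the future queue
// (tasks scheduled but not yet ready to run), OutQsize the ready queue,
// and PendingQ is emitted only when the pending queue is non-empty.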
void ExecutorPool::doTaskQStat(EventuallyPersistentEngine *engine,
                               const void *cookie, ADD_STAT add_stat) {
    if (engine->getEpStats().isShutdown) {
        return;
    }

    EventuallyPersistentEngine *epe = ObjectRegistry::onSwitchThread(NULL, true);
    char statname[80] = {0};
    if (isHiPrioQset) {
        for (size_t i = 0; i < numTaskSets; i++) {
            snprintf(statname, sizeof(statname), "ep_workload:%s:InQsize",
                     hpTaskQ[i]->getName().c_str());
            add_casted_stat(statname, hpTaskQ[i]->getFutureQueueSize(), add_stat,
                            cookie);
            snprintf(statname, sizeof(statname), "ep_workload:%s:OutQsize",
                     hpTaskQ[i]->getName().c_str());
            add_casted_stat(statname, hpTaskQ[i]->getReadyQueueSize(), add_stat,
                            cookie);
            size_t pendingQsize = hpTaskQ[i]->getPendingQueueSize();
            if (pendingQsize > 0) {
                snprintf(statname, sizeof(statname), "ep_workload:%s:PendingQ",
                        hpTaskQ[i]->getName().c_str());
                add_casted_stat(statname, pendingQsize, add_stat, cookie);
            }
        }
    }
    if (isLowPrioQset) {
        for (size_t i = 0; i < numTaskSets; i++) {
            snprintf(statname, sizeof(statname), "ep_workload:%s:InQsize",
                     lpTaskQ[i]->getName().c_str());
            add_casted_stat(statname, lpTaskQ[i]->getFutureQueueSize(), add_stat,
                            cookie);
            snprintf(statname, sizeof(statname), "ep_workload:%s:OutQsize",
                     lpTaskQ[i]->getName().c_str());
            add_casted_stat(statname, lpTaskQ[i]->getReadyQueueSize(), add_stat,
                            cookie);
            size_t pendingQsize = lpTaskQ[i]->getPendingQueueSize();
            if (pendingQsize > 0) {
                snprintf(statname, sizeof(statname), "ep_workload:%s:PendingQ",
                        lpTaskQ[i]->getName().c_str());
                add_casted_stat(statname, pendingQsize, add_stat, cookie);
            }
        }
    }
    ObjectRegistry::onSwitchThread(epe);
}

static void showJobLog(const char *logname, const char *prefix,
                       const std::vector<TaskLogEntry> &log,
                       const void *cookie, ADD_STAT add_stat) {
    char statname[80] = {0};
    for (size_t i = 0; i < log.size(); ++i) {
        snprintf(statname, sizeof(statname), "%s:%s:%d:task", prefix,
                logname, static_cast<int>(i));
        add_casted_stat(statname, log[i].getName().c_str(), add_stat,
                        cookie);
        snprintf(statname, sizeof(statname), "%s:%s:%d:type", prefix,
                logname, static_cast<int>(i));
        add_casted_stat(statname,
                        TaskQueue::taskType2Str(log[i].getTaskType()).c_str(),
                        add_stat, cookie);
        snprintf(statname, sizeof(statname), "%s:%s:%d:starttime",
                prefix, logname, static_cast<int>(i));
        add_casted_stat(statname, log[i].getTimestamp(), add_stat,
                cookie);
        snprintf(statname, sizeof(statname), "%s:%s:%d:runtime",
                prefix, logname, static_cast<int>(i));
        add_casted_stat(statname, log[i].getDuration(), add_stat,
                cookie);
    }
}

static void addWorkerStats(const char *prefix, ExecutorThread *t,
                           const void *cookie, ADD_STAT add_stat) {
    char statname[80] = {0};
    snprintf(statname, sizeof(statname), "%s:state", prefix);
    add_casted_stat(statname, t->getStateName().c_str(), add_stat, cookie);
    snprintf(statname, sizeof(statname), "%s:task", prefix);
    add_casted_stat(statname, t->getTaskName().c_str(), add_stat, cookie);

    if (strcmp(t->getStateName().c_str(), "running") == 0) {
        snprintf(statname, sizeof(statname), "%s:runtime", prefix);
        add_casted_stat(statname,
                (gethrtime() - t->getTaskStart()) / 1000, add_stat, cookie);
    }
    snprintf(statname, sizeof(statname), "%s:waketime", prefix);
    add_casted_stat(statname, t->getWaketime(), add_stat, cookie);
    snprintf(statname, sizeof(statname), "%s:cur_time", prefix);
    add_casted_stat(statname, t->getCurTime(), add_stat, cookie);
}

void ExecutorPool::doWorkerStat(EventuallyPersistentEngine *engine,
                               const void *cookie, ADD_STAT add_stat) {
    if (engine->getEpStats().isShutdown) {
        return;
    }

    EventuallyPersistentEngine *epe = ObjectRegistry::onSwitchThread(NULL, true);
    // TODO: implement tracking of per-engine stats
    for (size_t tidx = 0; tidx < threadQ.size(); ++tidx) {
        addWorkerStats(threadQ[tidx]->getName().c_str(), threadQ[tidx],
                     cookie, add_stat);
        showJobLog("log", threadQ[tidx]->getName().c_str(),
                   threadQ[tidx]->getLog(), cookie, add_stat);
        showJobLog("slow", threadQ[tidx]->getName().c_str(),
                   threadQ[tidx]->getSlowLog(), cookie, add_stat);
    }
    ObjectRegistry::onSwitchThread(epe);
}