src/executorpool.cc
/* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
/*
 *     Copyright 2013 Couchbase, Inc.
 *
 *   Licensed under the Apache License, Version 2.0 (the "License");
 *   you may not use this file except in compliance with the License.
 *   You may obtain a copy of the License at
 *
 *       http://www.apache.org/licenses/LICENSE-2.0
 *
 *   Unless required by applicable law or agreed to in writing, software
 *   distributed under the License is distributed on an "AS IS" BASIS,
 *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 *   See the License for the specific language governing permissions and
 *   limitations under the License.
 */

#include "config.h"

#include <algorithm>
#include <queue>
#include <sstream>

#include "configuration.h"
#include "ep_engine.h"
#include "statwriter.h"
#include "taskqueue.h"
#include "executorpool.h"
#include "executorthread.h"

Mutex ExecutorPool::initGuard;
ExecutorPool *ExecutorPool::instance = NULL;

static const size_t EP_MIN_NUM_THREADS    = 10;
static const size_t EP_MIN_READER_THREADS = 4;
static const size_t EP_MIN_WRITER_THREADS = 4;

static const size_t EP_MAX_READER_THREADS = 12;
static const size_t EP_MAX_WRITER_THREADS = 8;
static const size_t EP_MAX_AUXIO_THREADS  = 8;
static const size_t EP_MAX_NONIO_THREADS  = 8;

size_t ExecutorPool::getNumCPU(void) {
    size_t numCPU;
#ifdef WIN32
    SYSTEM_INFO sysinfo;
    GetSystemInfo(&sysinfo);
    numCPU = (size_t)sysinfo.dwNumberOfProcessors;
#else
    numCPU = (size_t)sysconf(_SC_NPROCESSORS_ONLN);
#endif

    return (numCPU < 256) ? numCPU : 0;
}
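// Note: getNumCPU() above maps implausibly large results (>= 256) to 0,
// which appears to guard against a misread CPU count; the constructor below
// then falls back to the EP_MIN_NUM_THREADS floor in that case.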

size_t ExecutorPool::getNumNonIO(void) {
    // 1. compute: ceil of 10% of total threads
    size_t count = maxGlobalThreads / 10;
    if (!count || maxGlobalThreads % 10) {
        count++;
    }
    // 2. adjust computed value to be within range
    if (count > EP_MAX_NONIO_THREADS) {
        count = EP_MAX_NONIO_THREADS;
    }
    // 3. pick user's value if specified
    if (maxWorkers[NONIO_TASK_IDX]) {
        count = maxWorkers[NONIO_TASK_IDX];
    }
    return count;
}

size_t ExecutorPool::getNumAuxIO(void) {
    // 1. compute: ceil of 10% of total threads
    size_t count = maxGlobalThreads / 10;
    if (!count || maxGlobalThreads % 10) {
        count++;
    }
    // 2. adjust computed value to be within range
    if (count > EP_MAX_AUXIO_THREADS) {
        count = EP_MAX_AUXIO_THREADS;
    }
    // 3. Override with user's value if specified
    if (maxWorkers[AUXIO_TASK_IDX]) {
        count = maxWorkers[AUXIO_TASK_IDX];
    }
    return count;
}

size_t ExecutorPool::getNumWriters(void) {
    size_t count = 0;
    // 1. compute: floor of half of what remains after nonIO, auxIO threads
    if (maxGlobalThreads > (getNumAuxIO() + getNumNonIO())) {
        count = maxGlobalThreads - getNumAuxIO() - getNumNonIO();
        count = count >> 1;
    }
    // 2. adjust computed value to be within range
    if (count > EP_MAX_WRITER_THREADS) {
        count = EP_MAX_WRITER_THREADS;
    } else if (count < EP_MIN_WRITER_THREADS) {
        count = EP_MIN_WRITER_THREADS;
    }
    // 3. Override with user's value if specified
    if (maxWorkers[WRITER_TASK_IDX]) {
        count = maxWorkers[WRITER_TASK_IDX];
    }
    return count;
}

size_t ExecutorPool::getNumReaders(void) {
    size_t count = 0;
    // 1. compute: what remains after writers, nonIO & auxIO threads are taken
    if (maxGlobalThreads >
            (getNumWriters() + getNumAuxIO() + getNumNonIO())) {
        count = maxGlobalThreads
              - getNumWriters() - getNumAuxIO() - getNumNonIO();
    }
    // 2. adjust computed value to be within range
    if (count > EP_MAX_READER_THREADS) {
        count = EP_MAX_READER_THREADS;
    } else if (count < EP_MIN_READER_THREADS) {
        count = EP_MIN_READER_THREADS;
    }
    // 3. Override with user's value if specified
    if (maxWorkers[READER_TASK_IDX]) {
        count = maxWorkers[READER_TASK_IDX];
    }
    return count;
}
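// Worked example of the sizing above (a sketch, assuming no user overrides):
// with maxGlobalThreads = 24 (e.g. a 32-core host, 24 = 32 * 3 / 4):
//   getNumNonIO()   -> ceil(24 / 10) = 3
//   getNumAuxIO()   -> ceil(24 / 10) = 3
//   getNumWriters() -> floor((24 - 3 - 3) / 2) = 9,
//                      capped to EP_MAX_WRITER_THREADS = 8
//   getNumReaders() -> 24 - 8 - 3 - 3 = 10 (within [4, 12])
// for a total of exactly 24 worker threads.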

ExecutorPool *ExecutorPool::get(void) {
    if (!instance) {
        LockHolder lh(initGuard);
        if (!instance) {
            Configuration &config =
                ObjectRegistry::getCurrentEngine()->getConfiguration();
            EventuallyPersistentEngine *epe =
                                   ObjectRegistry::onSwitchThread(NULL, true);
            instance = new ExecutorPool(config.getMaxThreads(),
                    NUM_TASK_GROUPS, config.getMaxNumReaders(),
                    config.getMaxNumWriters(), config.getMaxNumAuxio(),
                    config.getMaxNumNonio());
            ObjectRegistry::onSwitchThread(epe);
        }
    }
    return instance;
}
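// get() above uses double-checked locking on initGuard to lazily create the
// singleton. The onSwitchThread(NULL, true) / onSwitchThread(epe) pairs that
// recur throughout this file temporarily detach the calling thread from its
// current engine, so memory allocated inside the pool's internals is not
// accounted to that bucket, then reattach it afterwards.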

void ExecutorPool::shutdown(void) {
    if (instance) {
        delete instance;
        instance = NULL;
    }
}

ExecutorPool::ExecutorPool(size_t maxThreads, size_t nTaskSets,
                           size_t maxReaders, size_t maxWriters,
                           size_t maxAuxIO,   size_t maxNonIO) :
                  numTaskSets(nTaskSets), totReadyTasks(0),
                  isHiPrioQset(false), isLowPrioQset(false), numBuckets(0),
                  numSleepers(0) {
    size_t numCPU = getNumCPU();
    size_t numThreads = (size_t)((numCPU * 3)/4);
    numThreads = (numThreads < EP_MIN_NUM_THREADS) ?
                        EP_MIN_NUM_THREADS : numThreads;
    maxGlobalThreads = maxThreads ? maxThreads : numThreads;
    curWorkers  = new AtomicValue<uint16_t>[nTaskSets];
    maxWorkers  = new AtomicValue<uint16_t>[nTaskSets];
    numReadyTasks  = new AtomicValue<size_t>[nTaskSets];
    for (size_t i = 0; i < nTaskSets; i++) {
        curWorkers[i] = 0;
        numReadyTasks[i] = 0;
    }
    maxWorkers[WRITER_TASK_IDX] = maxWriters;
    maxWorkers[READER_TASK_IDX] = maxReaders;
    maxWorkers[AUXIO_TASK_IDX]  = maxAuxIO;
    maxWorkers[NONIO_TASK_IDX]  = maxNonIO;
}
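// Example of the constructor's fallback sizing (a sketch, assuming
// config.getMaxThreads() returns 0, i.e. unset): on an 8-core host,
// numThreads = 8 * 3 / 4 = 6, which is below EP_MIN_NUM_THREADS, so
// maxGlobalThreads becomes 10; a non-zero maxThreads wins outright.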

ExecutorPool::~ExecutorPool(void) {
    delete[] curWorkers;
    delete[] maxWorkers;
    delete[] numReadyTasks;
    if (isHiPrioQset) {
        for (size_t i = 0; i < numTaskSets; i++) {
            delete hpTaskQ[i];
        }
    }
    if (isLowPrioQset) {
        for (size_t i = 0; i < numTaskSets; i++) {
            delete lpTaskQ[i];
        }
    }
}

// To prevent starvation of low priority queues, we define their
// polling frequencies as follows ...
#define LOW_PRIORITY_FREQ 5 // 1 out of 5 times threads check low priority Q
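// Example: a thread arriving with tick = 5 (or any multiple of
// LOW_PRIORITY_FREQ) polls its low-priority queue first and the
// high-priority queue second; on every other tick the order is reversed.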

TaskQueue *ExecutorPool::_nextTask(ExecutorThread &t, uint8_t tick) {
    if (!tick) {
        return NULL;
    }

    unsigned int myq = t.startIndex;
    TaskQueue *checkQ; // which TaskQueue set should be polled first
    TaskQueue *checkNextQ; // which set of TaskQueue should be polled next
    TaskQueue *toggle = NULL;
    if (!(tick % LOW_PRIORITY_FREQ)) { // if only 1 Q set, both point to it
        checkQ = isLowPrioQset ? lpTaskQ[myq] :
                (isHiPrioQset ? hpTaskQ[myq] : NULL);
        checkNextQ = isHiPrioQset ? hpTaskQ[myq] : checkQ;
    } else {
        checkQ = isHiPrioQset ? hpTaskQ[myq] :
                (isLowPrioQset ? lpTaskQ[myq] : NULL);
        checkNextQ = isLowPrioQset ? lpTaskQ[myq] : checkQ;
    }
    while (t.state == EXECUTOR_RUNNING) {
        if (checkQ && checkQ->fetchNextTask(t, false)) {
            return checkQ;
        }
        if (toggle || checkQ == checkNextQ) {
            TaskQueue *sleepQ = getSleepQ(myq);
            if (sleepQ->fetchNextTask(t, true)) {
                return sleepQ;
            } else {
                return NULL;
            }
        }
        toggle = checkQ;
        checkQ = checkNextQ;
        checkNextQ = toggle;
    }
    return NULL;
}

TaskQueue *ExecutorPool::nextTask(ExecutorThread &t, uint8_t tick) {
    EventuallyPersistentEngine *epe = ObjectRegistry::onSwitchThread(NULL, true);
    TaskQueue *tq = _nextTask(t, tick);
    ObjectRegistry::onSwitchThread(epe);
    return tq;
}

void ExecutorPool::addWork(size_t newWork, task_type_t qType) {
    if (newWork) {
        totReadyTasks.fetch_add(newWork);
        numReadyTasks[qType].fetch_add(newWork);
    }
}

void ExecutorPool::lessWork(task_type_t qType) {
    cb_assert(numReadyTasks[qType].load());
    numReadyTasks[qType]--;
    totReadyTasks--;
}

void ExecutorPool::doneWork(task_type_t &curTaskType) {
    if (curTaskType != NO_TASK_TYPE) {
        // Record that a thread is done working on a particular queue type
        LOG(EXTENSION_LOG_DEBUG, "Done with Task Type %d capacity = %d",
                curTaskType, curWorkers[curTaskType].load());
        curWorkers[curTaskType]--;
        curTaskType = NO_TASK_TYPE;
    }
}

task_type_t ExecutorPool::tryNewWork(task_type_t newTaskType) {
    task_type_t ret = newTaskType;
    curWorkers[newTaskType]++; // atomic increment
    // Test if a thread can take up task from the target Queue type
    if (curWorkers[newTaskType] <= maxWorkers[newTaskType]) {
        // Ok to proceed as limit not hit
        LOG(EXTENSION_LOG_DEBUG,
                "Taking up work in task type %d capacity = %d, max=%d",
                newTaskType, curWorkers[newTaskType].load(),
                maxWorkers[newTaskType].load());
    } else {
        curWorkers[newTaskType]--; // do not exceed the limit at maxWorkers
        LOG(EXTENSION_LOG_DEBUG, "Limiting from taking up work in task "
                "type %d capacity = %d, max = %d", newTaskType,
                curWorkers[newTaskType].load(),
                maxWorkers[newTaskType].load());
        ret = NO_TASK_TYPE;
    }

    return ret;
}
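// tryNewWork() above is optimistic on purpose: it increments the per-type
// worker count first and rolls back if the cap was exceeded. Incrementing
// before the comparison closes the window where two threads could both pass
// a check-then-increment test and overshoot maxWorkers together.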

bool ExecutorPool::_cancel(size_t taskId, bool eraseTask) {
    LockHolder lh(tMutex);
    std::map<size_t, TaskQpair>::iterator itr = taskLocator.find(taskId);
    if (itr == taskLocator.end()) {
        LOG(EXTENSION_LOG_DEBUG, "Task id %d not found", (int)taskId);
        return false;
    }

    ExTask task = itr->second.first;
    LOG(EXTENSION_LOG_DEBUG, "Cancel task %s id %d on bucket %s %s",
            task->getDescription().c_str(), task->getId(),
            task->getEngine()->getName(), eraseTask ? "final erase" : "!");

    task->cancel(); // must be idempotent, just set state to dead

    if (eraseTask) { // only internal threads can erase tasks
        cb_assert(task->isdead());
        taskLocator.erase(itr);
        tMutex.notify();
    } else { // wake up the task from the TaskQ so a thread can safely erase it
             // otherwise we may race with unregisterBucket, where an
             // unlocated task runs in spite of its bucket getting unregistered
        itr->second.second->wake(task);
    }
    return true;
}

bool ExecutorPool::cancel(size_t taskId, bool eraseTask) {
    EventuallyPersistentEngine *epe = ObjectRegistry::onSwitchThread(NULL, true);
    bool rv = _cancel(taskId, eraseTask);
    ObjectRegistry::onSwitchThread(epe);
    return rv;
}

bool ExecutorPool::_wake(size_t taskId) {
    LockHolder lh(tMutex);
    std::map<size_t, TaskQpair>::iterator itr = taskLocator.find(taskId);
    if (itr != taskLocator.end()) {
        itr->second.second->wake(itr->second.first);
        return true;
    }
    return false;
}

bool ExecutorPool::wake(size_t taskId) {
    EventuallyPersistentEngine *epe = ObjectRegistry::onSwitchThread(NULL, true);
    bool rv = _wake(taskId);
    ObjectRegistry::onSwitchThread(epe);
    return rv;
}

bool ExecutorPool::_snooze(size_t taskId, double tosleep) {
    LockHolder lh(tMutex);
    std::map<size_t, TaskQpair>::iterator itr = taskLocator.find(taskId);
    if (itr != taskLocator.end()) {
        itr->second.first->snooze(tosleep);
        return true;
    }
    return false;
}

bool ExecutorPool::snooze(size_t taskId, double tosleep) {
    EventuallyPersistentEngine *epe = ObjectRegistry::onSwitchThread(NULL, true);
    bool rv = _snooze(taskId, tosleep);
    ObjectRegistry::onSwitchThread(epe);
    return rv;
}

TaskQueue* ExecutorPool::_getTaskQueue(EventuallyPersistentEngine *e,
                                       task_type_t qidx) {
    TaskQueue         *q             = NULL;
    size_t            curNumThreads  = 0;
    bucket_priority_t bucketPriority = e->getWorkloadPriority();

    cb_assert(0 <= (int)qidx && (size_t)qidx < numTaskSets);

    curNumThreads = threadQ.size();

    if (!bucketPriority) {
        LOG(EXTENSION_LOG_WARNING, "Trying to schedule task for unregistered "
            "bucket %s", e->getName());
        return q;
    }

    if (curNumThreads < maxGlobalThreads) {
        if (isHiPrioQset) {
            q = hpTaskQ[qidx];
        } else if (isLowPrioQset) {
            q = lpTaskQ[qidx];
        }
    } else { // Max capacity Mode scheduling ...
        if (bucketPriority == LOW_BUCKET_PRIORITY) {
            cb_assert(lpTaskQ.size() == numTaskSets);
            q = lpTaskQ[qidx];
        } else {
            cb_assert(hpTaskQ.size() == numTaskSets);
            q = hpTaskQ[qidx];
        }
    }
    return q;
}

size_t ExecutorPool::_schedule(ExTask task, task_type_t qidx) {
    LockHolder lh(tMutex);
    TaskQueue *q = _getTaskQueue(task->getEngine(), qidx);
    TaskQpair tqp(task, q);
    taskLocator[task->getId()] = tqp;

    q->schedule(task);

    return task->getId();
}

size_t ExecutorPool::schedule(ExTask task, task_type_t qidx) {
    EventuallyPersistentEngine *epe = ObjectRegistry::onSwitchThread(NULL, true);
    size_t rv = _schedule(task, qidx);
    ObjectRegistry::onSwitchThread(epe);
    return rv;
}
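// Illustrative call sequence (a sketch only; MyTask and its constructor
// arguments are hypothetical, not part of this file):
//
//   ExTask task(new MyTask(&engine));        // some GlobalTask subclass
//   size_t id = ExecutorPool::get()->schedule(task, READER_TASK_IDX);
//   ExecutorPool::get()->wake(id);           // run as soon as possible
//   ExecutorPool::get()->snooze(id, 5.0);    // or push it out 5 seconds
//   ExecutorPool::get()->cancel(id, false);  // mark dead; a worker erases it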

void ExecutorPool::_registerBucket(EventuallyPersistentEngine *engine) {
    TaskQ *taskQ;
    bool *whichQset;
    const char *queueName;
    WorkLoadPolicy &workload = engine->getWorkLoadPolicy();
    bucket_priority_t priority = workload.getBucketPriority();

    if (priority < HIGH_BUCKET_PRIORITY) {
        engine->setWorkloadPriority(LOW_BUCKET_PRIORITY);
        taskQ = &lpTaskQ;
        whichQset = &isLowPrioQset;
        queueName = "LowPrioQ_";
        LOG(EXTENSION_LOG_WARNING, "Bucket %s registered with low priority",
            engine->getName());
    } else {
        engine->setWorkloadPriority(HIGH_BUCKET_PRIORITY);
        taskQ = &hpTaskQ;
        whichQset = &isHiPrioQset;
        queueName = "HiPrioQ_";
        LOG(EXTENSION_LOG_WARNING, "Bucket %s registered with high priority",
            engine->getName());
    }

    LockHolder lh(tMutex);

    if (!(*whichQset)) {
        taskQ->reserve(numTaskSets);
        for (size_t i = 0; i < numTaskSets; i++) {
            taskQ->push_back(new TaskQueue(this, (task_type_t)i, queueName));
        }
        *whichQset = true;
    }

    buckets.insert(engine);
    numBuckets++;

    _startWorkers();
}

void ExecutorPool::registerBucket(EventuallyPersistentEngine *engine) {
    EventuallyPersistentEngine *epe = ObjectRegistry::onSwitchThread(NULL, true);
    _registerBucket(engine);
    ObjectRegistry::onSwitchThread(epe);
}

bool ExecutorPool::_startWorkers(void) {
    if (threadQ.size()) {
        return false;
    }

    size_t numReaders = getNumReaders();
    size_t numWriters = getNumWriters();
    size_t numAuxIO   = getNumAuxIO();
    size_t numNonIO   = getNumNonIO();

    std::stringstream ss;
    ss << "Spawning " << numReaders << " readers, " << numWriters
       << " writers, " << numAuxIO << " auxIO, " << numNonIO
       << " nonIO threads";
    LOG(EXTENSION_LOG_WARNING, ss.str().c_str());

    for (size_t tidx = 0; tidx < numReaders; ++tidx) {
        std::stringstream ss;
        ss << "reader_worker_" << tidx;

        threadQ.push_back(new ExecutorThread(this, READER_TASK_IDX, ss.str()));
        threadQ.back()->start();
    }
    for (size_t tidx = 0; tidx < numWriters; ++tidx) {
        std::stringstream ss;
        ss << "writer_worker_" << numReaders + tidx;

        threadQ.push_back(new ExecutorThread(this, WRITER_TASK_IDX, ss.str()));
        threadQ.back()->start();
    }
    for (size_t tidx = 0; tidx < numAuxIO; ++tidx) {
        std::stringstream ss;
        ss << "auxio_worker_" << numReaders + numWriters + tidx;

        threadQ.push_back(new ExecutorThread(this, AUXIO_TASK_IDX, ss.str()));
        threadQ.back()->start();
    }
    for (size_t tidx = 0; tidx < numNonIO; ++tidx) {
        std::stringstream ss;
        ss << "nonio_worker_" << numReaders + numWriters + numAuxIO + tidx;

        threadQ.push_back(new ExecutorThread(this, NONIO_TASK_IDX, ss.str()));
        threadQ.back()->start();
    }

    if (!maxWorkers[WRITER_TASK_IDX]) {
        // MB-12279: Limit writers to 4 for faster bgfetches in DGM by default
        numWriters = 4;
    }
    maxWorkers[WRITER_TASK_IDX] = numWriters;
    maxWorkers[READER_TASK_IDX] = numReaders;
    maxWorkers[AUXIO_TASK_IDX]  = numAuxIO;
    maxWorkers[NONIO_TASK_IDX]  = numNonIO;

    return true;
}
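// With the default floor of 10 global threads and no overrides, the loops
// above spawn 4 readers, 4 writers, 1 auxIO and 1 nonIO thread, named
// reader_worker_0..3, writer_worker_4..7, auxio_worker_8 and nonio_worker_9
// (thread indices are global across the four groups, not per group).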

bool ExecutorPool::_stopTaskGroup(EventuallyPersistentEngine *e,
                                  task_type_t taskType) {
    bool unfinishedTask;
    bool retVal = false;
    std::map<size_t, TaskQpair>::iterator itr;

    LockHolder lh(tMutex);
    LOG(EXTENSION_LOG_DEBUG, "Stopping %d type tasks in bucket %s", taskType,
            e->getName());
    do {
        ExTask task;
        unfinishedTask = false;
        for (itr = taskLocator.begin(); itr != taskLocator.end(); itr++) {
            task = itr->second.first;
            TaskQueue *q = itr->second.second;
            if (task->getEngine() == e &&
                (taskType == NO_TASK_TYPE || q->queueType == taskType)) {
                LOG(EXTENSION_LOG_DEBUG, "Stopping Task id %d %s ",
                        task->getId(), task->getDescription().c_str());
                if (!task->blockShutdown) {
                    task->cancel(); // Must be idempotent
                }
                q->wake(task);
                unfinishedTask = true;
                retVal = true;
            }
        }
        if (unfinishedTask) {
            tMutex.wait(MIN_SLEEP_TIME); // Wait till task gets cancelled
        }
    } while (unfinishedTask);

    return retVal;
}
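// _stopTaskGroup() intentionally polls: each pass cancels and wakes any
// matching tasks still in taskLocator, then waits up to MIN_SLEEP_TIME for a
// worker to erase them (workers notify tMutex from _cancel), and repeats
// until no matching task remains. Passing NO_TASK_TYPE stops every task type
// for the bucket, which is how _unregisterBucket() below uses it.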

bool ExecutorPool::stopTaskGroup(EventuallyPersistentEngine *e,
                                 task_type_t taskType) {
    EventuallyPersistentEngine *epe = ObjectRegistry::onSwitchThread(NULL, true);
    bool rv = _stopTaskGroup(e, taskType);
    ObjectRegistry::onSwitchThread(epe);
    return rv;
}

void ExecutorPool::_unregisterBucket(EventuallyPersistentEngine *engine) {
    LOG(EXTENSION_LOG_WARNING, "Unregistering %s bucket %s",
            (numBuckets == 1) ? "last" : "", engine->getName());

    _stopTaskGroup(engine, NO_TASK_TYPE);

    LockHolder lh(tMutex);

    buckets.erase(engine);
    if (!(--numBuckets)) {
        cb_assert(!taskLocator.size());
        for (unsigned int idx = 0; idx < numTaskSets; idx++) {
            TaskQueue *sleepQ = getSleepQ(idx);
            size_t wakeAll = threadQ.size();
            numReadyTasks[idx]++; // this prevents woken workers from sleeping
            totReadyTasks++;
            sleepQ->doWake(wakeAll);
        }
        for (size_t tidx = 0; tidx < threadQ.size(); ++tidx) {
            threadQ[tidx]->stop(false); // only set state to DEAD
        }
        for (unsigned int idx = 0; idx < numTaskSets; idx++) {
            numReadyTasks[idx]--; // once woken reset the ready tasks
            totReadyTasks--;
        }

        for (size_t tidx = 0; tidx < threadQ.size(); ++tidx) {
            threadQ[tidx]->stop(/*wait for threads*/);
            delete threadQ[tidx];
        }

        for (size_t i = 0; i < numTaskSets; i++) {
            curWorkers[i] = 0;
        }

        threadQ.clear();
        if (isHiPrioQset) {
            for (size_t i = 0; i < numTaskSets; i++) {
                delete hpTaskQ[i];
            }
            hpTaskQ.clear();
            isHiPrioQset = false;
        }
        if (isLowPrioQset) {
            for (size_t i = 0; i < numTaskSets; i++) {
                delete lpTaskQ[i];
            }
            lpTaskQ.clear();
            isLowPrioQset = false;
        }
    }
}

void ExecutorPool::unregisterBucket(EventuallyPersistentEngine *engine) {
    EventuallyPersistentEngine *epe = ObjectRegistry::onSwitchThread(NULL, true);
    _unregisterBucket(engine);
    ObjectRegistry::onSwitchThread(epe);
}

void ExecutorPool::doTaskQStat(EventuallyPersistentEngine *engine,
                               const void *cookie, ADD_STAT add_stat) {
    if (engine->getEpStats().isShutdown) {
        return;
    }

    EventuallyPersistentEngine *epe = ObjectRegistry::onSwitchThread(NULL, true);
    char statname[80] = {0};
    if (isHiPrioQset) {
        for (size_t i = 0; i < numTaskSets; i++) {
            snprintf(statname, sizeof(statname), "ep_workload:%s:InQsize",
                     hpTaskQ[i]->getName().c_str());
            add_casted_stat(statname, hpTaskQ[i]->getFutureQueueSize(),
                            add_stat, cookie);
            snprintf(statname, sizeof(statname), "ep_workload:%s:OutQsize",
                     hpTaskQ[i]->getName().c_str());
            add_casted_stat(statname, hpTaskQ[i]->getReadyQueueSize(),
                            add_stat, cookie);
            size_t pendingQsize = hpTaskQ[i]->getPendingQueueSize();
            if (pendingQsize > 0) {
                snprintf(statname, sizeof(statname), "ep_workload:%s:PendingQ",
                         hpTaskQ[i]->getName().c_str());
                add_casted_stat(statname, pendingQsize, add_stat, cookie);
            }
        }
    }
    if (isLowPrioQset) {
        for (size_t i = 0; i < numTaskSets; i++) {
            snprintf(statname, sizeof(statname), "ep_workload:%s:InQsize",
                     lpTaskQ[i]->getName().c_str());
            add_casted_stat(statname, lpTaskQ[i]->getFutureQueueSize(),
                            add_stat, cookie);
            snprintf(statname, sizeof(statname), "ep_workload:%s:OutQsize",
                     lpTaskQ[i]->getName().c_str());
            add_casted_stat(statname, lpTaskQ[i]->getReadyQueueSize(),
                            add_stat, cookie);
            size_t pendingQsize = lpTaskQ[i]->getPendingQueueSize();
            if (pendingQsize > 0) {
                snprintf(statname, sizeof(statname), "ep_workload:%s:PendingQ",
                         lpTaskQ[i]->getName().c_str());
                add_casted_stat(statname, pendingQsize, add_stat, cookie);
            }
        }
    }
    ObjectRegistry::onSwitchThread(epe);
}
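// The stat keys produced above take the form ep_workload:<queue name>:InQsize
// and so on, where InQsize reports the future (scheduled) queue, OutQsize the
// ready queue, and PendingQ (emitted only when non-empty) the pending queue
// of each TaskQueue.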

static void showJobLog(const char *logname, const char *prefix,
                       const std::vector<TaskLogEntry> &log,
                       const void *cookie, ADD_STAT add_stat) {
    char statname[80] = {0};
    for (size_t i = 0; i < log.size(); ++i) {
        snprintf(statname, sizeof(statname), "%s:%s:%d:task", prefix,
                 logname, static_cast<int>(i));
        add_casted_stat(statname, log[i].getName().c_str(), add_stat,
                        cookie);
        snprintf(statname, sizeof(statname), "%s:%s:%d:type", prefix,
                 logname, static_cast<int>(i));
        add_casted_stat(statname,
                        TaskQueue::taskType2Str(log[i].getTaskType()).c_str(),
                        add_stat, cookie);
        snprintf(statname, sizeof(statname), "%s:%s:%d:starttime",
                 prefix, logname, static_cast<int>(i));
        add_casted_stat(statname, log[i].getTimestamp(), add_stat,
                        cookie);
        snprintf(statname, sizeof(statname), "%s:%s:%d:runtime",
                 prefix, logname, static_cast<int>(i));
        add_casted_stat(statname, log[i].getDuration(), add_stat,
                        cookie);
    }
}

static void addWorkerStats(const char *prefix, ExecutorThread *t,
                           const void *cookie, ADD_STAT add_stat) {
    char statname[80] = {0};
    snprintf(statname, sizeof(statname), "%s:state", prefix);
    add_casted_stat(statname, t->getStateName().c_str(), add_stat, cookie);
    snprintf(statname, sizeof(statname), "%s:task", prefix);
    add_casted_stat(statname, t->getTaskName().c_str(), add_stat, cookie);

    if (strcmp(t->getStateName().c_str(), "running") == 0) {
        snprintf(statname, sizeof(statname), "%s:runtime", prefix);
        add_casted_stat(statname,
                (gethrtime() - t->getTaskStart()) / 1000, add_stat, cookie);
    }
    snprintf(statname, sizeof(statname), "%s:waketime", prefix);
    add_casted_stat(statname, t->getWaketime(), add_stat, cookie);
    snprintf(statname, sizeof(statname), "%s:cur_time", prefix);
    add_casted_stat(statname, t->getCurTime(), add_stat, cookie);
}

void ExecutorPool::doWorkerStat(EventuallyPersistentEngine *engine,
                                const void *cookie, ADD_STAT add_stat) {
    if (engine->getEpStats().isShutdown) {
        return;
    }

    EventuallyPersistentEngine *epe = ObjectRegistry::onSwitchThread(NULL, true);
    // TODO: implement tracking per engine stats ..
    for (size_t tidx = 0; tidx < threadQ.size(); ++tidx) {
        addWorkerStats(threadQ[tidx]->getName().c_str(), threadQ[tidx],
                       cookie, add_stat);
        showJobLog("log", threadQ[tidx]->getName().c_str(),
                   threadQ[tidx]->getLog(), cookie, add_stat);
        showJobLog("slow", threadQ[tidx]->getName().c_str(),
                   threadQ[tidx]->getSlowLog(), cookie, add_stat);
    }
    ObjectRegistry::onSwitchThread(epe);
}