/* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
/*
 *     Copyright 2013 Couchbase, Inc.
 *
 *   Licensed under the Apache License, Version 2.0 (the "License");
 *   you may not use this file except in compliance with the License.
 *   You may obtain a copy of the License at
 *
 *       http://www.apache.org/licenses/LICENSE-2.0
 *
 *   Unless required by applicable law or agreed to in writing, software
 *   distributed under the License is distributed on an "AS IS" BASIS,
 *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 *   See the License for the specific language governing permissions and
 *   limitations under the License.
 */

#include "configuration.h"
#include "ep_engine.h"
#include "statwriter.h"
#include "taskqueue.h"
#include "executorpool.h"
#include "executorthread.h"

Mutex ExecutorPool::initGuard;
ExecutorPool *ExecutorPool::instance = NULL;

static const size_t EP_MIN_NUM_THREADS = 10;
static const size_t EP_MIN_READER_THREADS = 4;
static const size_t EP_MIN_WRITER_THREADS = 4;

static const size_t EP_MAX_READER_THREADS = 12;
static const size_t EP_MAX_WRITER_THREADS = 8;
static const size_t EP_MAX_AUXIO_THREADS = 8;
static const size_t EP_MAX_NONIO_THREADS = 8;

size_t ExecutorPool::getNumCPU(void) {
    size_t numCPU;
#ifdef WIN32
    SYSTEM_INFO sysinfo;
    GetSystemInfo(&sysinfo);
    numCPU = (size_t)sysinfo.dwNumberOfProcessors;
#else
    numCPU = (size_t)sysconf(_SC_NPROCESSORS_ONLN);
#endif

    return (numCPU < 256) ? numCPU : 0;
}

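// Sizing note: the getNum*() helpers below split maxGlobalThreads between the
// four task types. For example, with maxGlobalThreads = 16:
//     nonIO  = ceil(16 / 10)           = 2   (capped at EP_MAX_NONIO_THREADS)
//     auxIO  = ceil(16 / 10)           = 2   (capped at EP_MAX_AUXIO_THREADS)
//     writer = floor((16 - 2 - 2) / 2) = 6   (clamped to [4, 8])
//     reader = 16 - 6 - 2 - 2          = 6   (clamped to [4, 12])
// A non-zero entry in maxWorkers[] (i.e. a user-configured maximum) overrides
// the computed value for that type.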
size_t ExecutorPool::getNumNonIO(void) {
    // 1. compute: ceil of 10% of total threads
    size_t count = maxGlobalThreads / 10;
    if (!count || maxGlobalThreads % 10) {
        count++;
    }
    // 2. adjust computed value to be within range
    if (count > EP_MAX_NONIO_THREADS) {
        count = EP_MAX_NONIO_THREADS;
    }
    // 3. pick user's value if specified
    if (maxWorkers[NONIO_TASK_IDX]) {
        count = maxWorkers[NONIO_TASK_IDX];
    }
    return count;
}

size_t ExecutorPool::getNumAuxIO(void) {
    // 1. compute: ceil of 10% of total threads
    size_t count = maxGlobalThreads / 10;
    if (!count || maxGlobalThreads % 10) {
        count++;
    }
    // 2. adjust computed value to be within range
    if (count > EP_MAX_AUXIO_THREADS) {
        count = EP_MAX_AUXIO_THREADS;
    }
    // 3. Override with user's value if specified
    if (maxWorkers[AUXIO_TASK_IDX]) {
        count = maxWorkers[AUXIO_TASK_IDX];
    }
    return count;
}

size_t ExecutorPool::getNumWriters(void) {
    size_t count = 0;
    // 1. compute: floor of Half of what remains after nonIO, auxIO threads
    if (maxGlobalThreads > (getNumAuxIO() + getNumNonIO())) {
        count = maxGlobalThreads - getNumAuxIO() - getNumNonIO();
        count = count >> 1;
    }
    // 2. adjust computed value to be within range
    if (count > EP_MAX_WRITER_THREADS) {
        count = EP_MAX_WRITER_THREADS;
    } else if (count < EP_MIN_WRITER_THREADS) {
        count = EP_MIN_WRITER_THREADS;
    }
    // 3. Override with user's value if specified
    if (maxWorkers[WRITER_TASK_IDX]) {
        count = maxWorkers[WRITER_TASK_IDX];
    }
    return count;
}

size_t ExecutorPool::getNumReaders(void) {
    size_t count = 0;
    // 1. compute: what remains after writers, nonIO & auxIO threads are taken
    if (maxGlobalThreads >
            (getNumWriters() + getNumAuxIO() + getNumNonIO())) {
        count = maxGlobalThreads
              - getNumWriters() - getNumAuxIO() - getNumNonIO();
    }
    // 2. adjust computed value to be within range
    if (count > EP_MAX_READER_THREADS) {
        count = EP_MAX_READER_THREADS;
    } else if (count < EP_MIN_READER_THREADS) {
        count = EP_MIN_READER_THREADS;
    }
    // 3. Override with user's value if specified
    if (maxWorkers[READER_TASK_IDX]) {
        count = maxWorkers[READER_TASK_IDX];
    }
    return count;
}

ExecutorPool *ExecutorPool::get(void) {
    if (!instance) {
        LockHolder lh(initGuard);
        if (!instance) { // double-checked: recheck under the init lock
            Configuration &config =
                ObjectRegistry::getCurrentEngine()->getConfiguration();
            EventuallyPersistentEngine *epe =
                ObjectRegistry::onSwitchThread(NULL, true);
            instance = new ExecutorPool(config.getMaxThreads(),
                    NUM_TASK_GROUPS, config.getMaxNumReaders(),
                    config.getMaxNumWriters(), config.getMaxNumAuxio(),
                    config.getMaxNumNonio());
            ObjectRegistry::onSwitchThread(epe);
        }
    }
    return instance;
}

void ExecutorPool::shutdown(void) {
    if (instance) {
        delete instance;
        instance = NULL;
    }
}

ExecutorPool::ExecutorPool(size_t maxThreads, size_t nTaskSets,
                           size_t maxReaders, size_t maxWriters,
                           size_t maxAuxIO, size_t maxNonIO) :
                  numTaskSets(nTaskSets), totReadyTasks(0),
                  isHiPrioQset(false), isLowPrioQset(false), numBuckets(0),
                  numSleepers(0) {
    size_t numCPU = getNumCPU();
    size_t numThreads = (size_t)((numCPU * 3)/4);
    numThreads = (numThreads < EP_MIN_NUM_THREADS) ?
                        EP_MIN_NUM_THREADS : numThreads;
    maxGlobalThreads = maxThreads ? maxThreads : numThreads;
    curWorkers = new AtomicValue<uint16_t>[nTaskSets];
    maxWorkers = new AtomicValue<uint16_t>[nTaskSets];
    numReadyTasks = new AtomicValue<size_t>[nTaskSets];
    for (size_t i = 0; i < nTaskSets; i++) {
        curWorkers[i] = 0;
        numReadyTasks[i] = 0;
    }
    maxWorkers[WRITER_TASK_IDX] = maxWriters;
    maxWorkers[READER_TASK_IDX] = maxReaders;
    maxWorkers[AUXIO_TASK_IDX] = maxAuxIO;
    maxWorkers[NONIO_TASK_IDX] = maxNonIO;
}

ExecutorPool::~ExecutorPool(void) {
    delete[] curWorkers;
    delete[] maxWorkers;
    delete[] numReadyTasks;

    if (isHiPrioQset) {
        for (size_t i = 0; i < numTaskSets; i++) {
            delete hpTaskQ[i];
        }
    }
    if (isLowPrioQset) {
        for (size_t i = 0; i < numTaskSets; i++) {
            delete lpTaskQ[i];
        }
    }
}

// To prevent starvation of low priority queues, we define their
// polling frequencies as follows ...
#define LOW_PRIORITY_FREQ 5 // 1 out of 5 times threads check low priority Q
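
// Each worker passes its loop iteration count ("tick") to nextTask(). On every
// LOW_PRIORITY_FREQ'th tick the low priority queue set is polled before the
// high priority one; on all other ticks the order is reversed. If neither set
// yields a ready task the thread blocks in fetchNextTask() on its sleep queue
// until new work arrives or it is woken for shutdown.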
TaskQueue *ExecutorPool::_nextTask(ExecutorThread &t, uint8_t tick) {
    if (!tick) {
        return NULL;
    }

    unsigned int myq = t.startIndex;
    TaskQueue *checkQ; // which TaskQueue set should be polled first
    TaskQueue *checkNextQ; // which set of TaskQueue should be polled next
    TaskQueue *toggle = NULL;
    if ( !(tick % LOW_PRIORITY_FREQ)) { // if only 1 Q set, both point to it
        checkQ = isLowPrioQset ? lpTaskQ[myq] :
                 (isHiPrioQset ? hpTaskQ[myq] : NULL);
        checkNextQ = isHiPrioQset ? hpTaskQ[myq] : checkQ;
    } else {
        checkQ = isHiPrioQset ? hpTaskQ[myq] :
                 (isLowPrioQset ? lpTaskQ[myq] : NULL);
        checkNextQ = isLowPrioQset ? lpTaskQ[myq] : checkQ;
    }
    while (t.state == EXECUTOR_RUNNING) {
        if (checkQ &&
            checkQ->fetchNextTask(t, false)) {
            return checkQ;
        }
        if (toggle || checkQ == checkNextQ) {
            TaskQueue *sleepQ = getSleepQ(myq);
            if (sleepQ->fetchNextTask(t, true)) {
                return sleepQ;
            }
        } else {
            toggle = checkQ;
            checkQ = checkNextQ;
        }
    }
    return NULL;
}

TaskQueue *ExecutorPool::nextTask(ExecutorThread &t, uint8_t tick) {
    EventuallyPersistentEngine *epe = ObjectRegistry::onSwitchThread(NULL, true);
    TaskQueue *tq = _nextTask(t, tick);
    ObjectRegistry::onSwitchThread(epe);
    return tq;
}

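// Every public entry point below saves the calling thread's engine with
// onSwitchThread(NULL, true) before touching pool-global state and restores it
// afterwards, so that memory allocated on behalf of the shared ExecutorPool is
// not accounted to whichever bucket happened to make the call.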
void ExecutorPool::addWork(size_t newWork, task_type_t qType) {
    if (newWork) {
        totReadyTasks.fetch_add(newWork);
        numReadyTasks[qType].fetch_add(newWork);
    }
}

void ExecutorPool::lessWork(task_type_t qType) {
    cb_assert(numReadyTasks[qType].load());
    numReadyTasks[qType]--;
    totReadyTasks--;
}

void ExecutorPool::doneWork(task_type_t &curTaskType) {
    if (curTaskType != NO_TASK_TYPE) {
        // Record that a thread is done working on a particular queue type
        LOG(EXTENSION_LOG_DEBUG, "Done with Task Type %d capacity = %d",
            curTaskType, curWorkers[curTaskType].load());
        curWorkers[curTaskType]--;
        curTaskType = NO_TASK_TYPE;
    }
}

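// Worker capacity accounting: curWorkers[type] counts the threads currently
// servicing a queue type and maxWorkers[type] is the per-type cap.
// tryNewWork() increments the counter optimistically and rolls it back if the
// cap would be exceeded; doneWork() releases the slot once the thread moves on.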
task_type_t ExecutorPool::tryNewWork(task_type_t newTaskType) {
    task_type_t ret = newTaskType;
    curWorkers[newTaskType]++; // atomic increment
    // Test if a thread can take up task from the target Queue type
    if (curWorkers[newTaskType] <= maxWorkers[newTaskType]) {
        // Ok to proceed as limit not hit
        LOG(EXTENSION_LOG_DEBUG,
            "Taking up work in task type %d capacity = %d, max=%d",
            newTaskType, curWorkers[newTaskType].load(),
            maxWorkers[newTaskType].load());
    } else {
        curWorkers[newTaskType]--; // do not exceed the limit at maxWorkers
        LOG(EXTENSION_LOG_DEBUG, "Limiting from taking up work in task "
            "type %d capacity = %d, max = %d", newTaskType,
            curWorkers[newTaskType].load(),
            maxWorkers[newTaskType].load());
        ret = NO_TASK_TYPE;
    }
    return ret;
}

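// taskLocator maps a task id to its (task, queue) pair so that cancel(), wake()
// and snooze() can operate on a task given only its id; all lookups and updates
// are performed under tMutex.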
bool ExecutorPool::_cancel(size_t taskId, bool eraseTask) {
    LockHolder lh(tMutex);
    std::map<size_t, TaskQpair>::iterator itr = taskLocator.find(taskId);
    if (itr == taskLocator.end()) {
        LOG(EXTENSION_LOG_DEBUG, "Task id %d not found", taskId);
        return false;
    }

    ExTask task = itr->second.first;
    LOG(EXTENSION_LOG_DEBUG, "Cancel task %s id %d on bucket %s %s",
        task->getDescription().c_str(), task->getId(),
        task->getEngine()->getName(), eraseTask ? "final erase" : "!");

    task->cancel(); // must be idempotent, just set state to dead

    if (eraseTask) { // only internal threads can erase tasks
        cb_assert(task->isdead());
        taskLocator.erase(itr);
        tMutex.notify(); // wake up anyone waiting for this bucket's tasks to die
    } else { // wake up the task from the TaskQ so a thread can safely erase it
             // otherwise we may race with unregisterBucket where an unlocated
             // task runs in spite of its bucket getting unregistered
        itr->second.second->wake(task);
    }
    return true;
}

bool ExecutorPool::cancel(size_t taskId, bool eraseTask) {
    EventuallyPersistentEngine *epe = ObjectRegistry::onSwitchThread(NULL, true);
    bool rv = _cancel(taskId, eraseTask);
    ObjectRegistry::onSwitchThread(epe);
    return rv;
}

bool ExecutorPool::_wake(size_t taskId) {
    LockHolder lh(tMutex);
    std::map<size_t, TaskQpair>::iterator itr = taskLocator.find(taskId);
    if (itr != taskLocator.end()) {
        itr->second.second->wake(itr->second.first);
        return true;
    }
    return false;
}

bool ExecutorPool::wake(size_t taskId) {
    EventuallyPersistentEngine *epe = ObjectRegistry::onSwitchThread(NULL, true);
    bool rv = _wake(taskId);
    ObjectRegistry::onSwitchThread(epe);
    return rv;
}

bool ExecutorPool::_snooze(size_t taskId, double tosleep) {
    LockHolder lh(tMutex);
    std::map<size_t, TaskQpair>::iterator itr = taskLocator.find(taskId);
    if (itr != taskLocator.end()) {
        itr->second.first->snooze(tosleep);
        return true;
    }
    return false;
}

bool ExecutorPool::snooze(size_t taskId, double tosleep) {
    EventuallyPersistentEngine *epe = ObjectRegistry::onSwitchThread(NULL, true);
    bool rv = _snooze(taskId, tosleep);
    ObjectRegistry::onSwitchThread(epe);
    return rv;
}

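// Task queues come in up to two sets, hpTaskQ and lpTaskQ, holding one
// TaskQueue per task type. A set is created lazily when the first bucket of
// that priority registers (see _registerBucket). While fewer than
// maxGlobalThreads workers exist, all buckets share whichever set is present;
// once the pool is at full capacity a bucket's own priority decides which set
// its tasks are placed on.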
TaskQueue* ExecutorPool::_getTaskQueue(EventuallyPersistentEngine *e,
                                       task_type_t qidx) {
    TaskQueue *q = NULL;
    size_t curNumThreads = 0;
    bucket_priority_t bucketPriority = e->getWorkloadPriority();

    cb_assert(0 <= (int)qidx && (size_t)qidx < numTaskSets);

    curNumThreads = threadQ.size();

    if (!bucketPriority) {
        LOG(EXTENSION_LOG_WARNING, "Trying to schedule task for unregistered "
            "bucket %s", e->getName());
        return q;
    }

    if (curNumThreads < maxGlobalThreads) {
        if (isHiPrioQset) {
            q = hpTaskQ[qidx];
        } else if (isLowPrioQset) {
            q = lpTaskQ[qidx];
        }
    } else { // Max capacity Mode scheduling ...
        if (bucketPriority == LOW_BUCKET_PRIORITY) {
            cb_assert(lpTaskQ.size() == numTaskSets);
            q = lpTaskQ[qidx];
        } else {
            cb_assert(hpTaskQ.size() == numTaskSets);
            q = hpTaskQ[qidx];
        }
    }
    return q;
}

size_t ExecutorPool::_schedule(ExTask task, task_type_t qidx) {
    LockHolder lh(tMutex);
    TaskQueue *q = _getTaskQueue(task->getEngine(), qidx);
    TaskQpair tqp(task, q);
    taskLocator[task->getId()] = tqp;

    q->schedule(task);

    return task->getId();
}

size_t ExecutorPool::schedule(ExTask task, task_type_t qidx) {
    EventuallyPersistentEngine *epe = ObjectRegistry::onSwitchThread(NULL, true);
    size_t rv = _schedule(task, qidx);
    ObjectRegistry::onSwitchThread(epe);
    return rv;
}

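// Illustrative usage (not part of this file; "SomeTask" is a placeholder for a
// concrete task type): callers typically schedule work on a specific queue
// type and keep the returned id for later control:
//
//     ExTask task(new SomeTask(/* ... */));
//     size_t id = ExecutorPool::get()->schedule(task, WRITER_TASK_IDX);
//     ExecutorPool::get()->wake(id);    // run it sooner
//     ExecutorPool::get()->cancel(id);  // mark it dead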
void ExecutorPool::_registerBucket(EventuallyPersistentEngine *engine) {
    TaskQ *taskQ;
    bool *whichQset;
    const char *queueName;
    WorkLoadPolicy &workload = engine->getWorkLoadPolicy();
    bucket_priority_t priority = workload.getBucketPriority();

    if (priority < HIGH_BUCKET_PRIORITY) {
        engine->setWorkloadPriority(LOW_BUCKET_PRIORITY);
        taskQ = &lpTaskQ;
        whichQset = &isLowPrioQset;
        queueName = "LowPrioQ_";
        LOG(EXTENSION_LOG_WARNING, "Bucket %s registered with low priority",
            engine->getName());
    } else {
        engine->setWorkloadPriority(HIGH_BUCKET_PRIORITY);
        taskQ = &hpTaskQ;
        whichQset = &isHiPrioQset;
        queueName = "HiPrioQ_";
        LOG(EXTENSION_LOG_WARNING, "Bucket %s registered with high priority",
            engine->getName());
    }

    LockHolder lh(tMutex);

    if (!(*whichQset)) {
        taskQ->reserve(numTaskSets);
        for (size_t i = 0; i < numTaskSets; i++) {
            taskQ->push_back(new TaskQueue(this, (task_type_t)i, queueName));
        }
        *whichQset = true;
    }

    buckets.insert(engine);
    numBuckets++;

    _startWorkers();
}

void ExecutorPool::registerBucket(EventuallyPersistentEngine *engine) {
    EventuallyPersistentEngine *epe = ObjectRegistry::onSwitchThread(NULL, true);
    _registerBucket(engine);
    ObjectRegistry::onSwitchThread(epe);
}

bool ExecutorPool::_startWorkers(void) {
    if (threadQ.size()) {
        return false;
    }

    size_t numReaders = getNumReaders();
    size_t numWriters = getNumWriters();
    size_t numAuxIO = getNumAuxIO();
    size_t numNonIO = getNumNonIO();

    std::stringstream ss;
    ss << "Spawning " << numReaders << " readers, " << numWriters <<
    " writers, " << numAuxIO << " auxIO, " << numNonIO << " nonIO threads";
    LOG(EXTENSION_LOG_WARNING, ss.str().c_str());

    for (size_t tidx = 0; tidx < numReaders; ++tidx) {
        std::stringstream ss;
        ss << "reader_worker_" << tidx;

        threadQ.push_back(new ExecutorThread(this, READER_TASK_IDX, ss.str()));
        threadQ.back()->start();
    }
    for (size_t tidx = 0; tidx < numWriters; ++tidx) {
        std::stringstream ss;
        ss << "writer_worker_" << numReaders + tidx;

        threadQ.push_back(new ExecutorThread(this, WRITER_TASK_IDX, ss.str()));
        threadQ.back()->start();
    }
    for (size_t tidx = 0; tidx < numAuxIO; ++tidx) {
        std::stringstream ss;
        ss << "auxio_worker_" << numReaders + numWriters + tidx;

        threadQ.push_back(new ExecutorThread(this, AUXIO_TASK_IDX, ss.str()));
        threadQ.back()->start();
    }
    for (size_t tidx = 0; tidx < numNonIO; ++tidx) {
        std::stringstream ss;
        ss << "nonio_worker_" << numReaders + numWriters + numAuxIO + tidx;

        threadQ.push_back(new ExecutorThread(this, NONIO_TASK_IDX, ss.str()));
        threadQ.back()->start();
    }

    if (!maxWorkers[WRITER_TASK_IDX]) {
        // MB-12279: Limit writers to 4 for faster bgfetches in DGM by default
        numWriters = 4;
    }
    maxWorkers[WRITER_TASK_IDX] = numWriters;
    maxWorkers[READER_TASK_IDX] = numReaders;
    maxWorkers[AUXIO_TASK_IDX] = numAuxIO;
    maxWorkers[NONIO_TASK_IDX] = numNonIO;

    return true;
}

bool ExecutorPool::_stopTaskGroup(EventuallyPersistentEngine *e,
                                  task_type_t taskType) {
    bool unfinishedTask;
    bool retVal = false;
    std::map<size_t, TaskQpair>::iterator itr;

    LockHolder lh(tMutex);
    LOG(EXTENSION_LOG_DEBUG, "Stopping %d type tasks in bucket %s", taskType,
        e->getName());
    do {
        ExTask task;
        unfinishedTask = false;
        for (itr = taskLocator.begin(); itr != taskLocator.end(); itr++) {
            task = itr->second.first;
            TaskQueue *q = itr->second.second;
            if (task->getEngine() == e &&
                (taskType == NO_TASK_TYPE || q->queueType == taskType)) {
                LOG(EXTENSION_LOG_DEBUG, "Stopping Task id %d %s ",
                    task->getId(), task->getDescription().c_str());
                if (!task->blockShutdown) {
                    task->cancel(); // Must be idempotent
                }
                q->wake(task);
                unfinishedTask = true;
                retVal = true;
            }
        }
        if (unfinishedTask) {
            tMutex.wait(MIN_SLEEP_TIME); // Wait till task gets cancelled
        }
    } while (unfinishedTask);

    return retVal;
}

bool ExecutorPool::stopTaskGroup(EventuallyPersistentEngine *e,
                                 task_type_t taskType) {
    EventuallyPersistentEngine *epe = ObjectRegistry::onSwitchThread(NULL, true);
    bool rv = _stopTaskGroup(e, taskType);
    ObjectRegistry::onSwitchThread(epe);
    return rv;
}

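// Shutting down the workers (done only when the last bucket unregisters): each
// per-type ready-task count is bumped so that woken threads do not immediately
// go back to sleep, every sleep queue is woken, the threads are marked DEAD,
// the counts are restored, and only then are the threads joined, deleted and
// the task queues destroyed.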
void ExecutorPool::_unregisterBucket(EventuallyPersistentEngine *engine) {

    LOG(EXTENSION_LOG_WARNING, "Unregistering %s bucket %s",
        (numBuckets == 1)? "last" : "", engine->getName());

    _stopTaskGroup(engine, NO_TASK_TYPE);

    LockHolder lh(tMutex);

    buckets.erase(engine);
    if (!(--numBuckets)) {
        assert (!taskLocator.size());
        for (unsigned int idx = 0; idx < numTaskSets; idx++) {
            TaskQueue *sleepQ = getSleepQ(idx);
            size_t wakeAll = threadQ.size();
            numReadyTasks[idx]++; // this prevents woken workers from sleeping
            totReadyTasks++;
            sleepQ->doWake(wakeAll);
        }
        for (size_t tidx = 0; tidx < threadQ.size(); ++tidx) {
            threadQ[tidx]->stop(false); // only set state to DEAD
        }
        for (unsigned int idx = 0; idx < numTaskSets; idx++) {
            numReadyTasks[idx]--; // once woken reset the ready tasks
            totReadyTasks--;
        }

        for (size_t tidx = 0; tidx < threadQ.size(); ++tidx) {
            threadQ[tidx]->stop(/*wait for threads */);
            delete threadQ[tidx];
        }
        threadQ.clear();
        for (size_t i = 0; i < numTaskSets; i++) {
            curWorkers[i] = 0; // no threads left servicing any queue type
        }

        if (isHiPrioQset) {
            for (size_t i = 0; i < numTaskSets; i++) {
                delete hpTaskQ[i];
            }
            hpTaskQ.clear();
            isHiPrioQset = false;
        }
        if (isLowPrioQset) {
            for (size_t i = 0; i < numTaskSets; i++) {
                delete lpTaskQ[i];
            }
            lpTaskQ.clear();
            isLowPrioQset = false;
        }
    }
}

void ExecutorPool::unregisterBucket(EventuallyPersistentEngine *engine) {
    // Note: unregistering a bucket is special - any memory allocations /
    // deallocations made while unregistering *should* be accounted to the
    // bucket in question - hence no `onSwitchThread(NULL)` call.
    _unregisterBucket(engine);
}

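// doTaskQStat() emits one group of stats per existing task queue:
//     ep_workload:<queue name>:InQsize   - future (scheduled) queue size
//     ep_workload:<queue name>:OutQsize  - ready queue size
//     ep_workload:<queue name>:PendingQ  - pending queue size (only if > 0)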
void ExecutorPool::doTaskQStat(EventuallyPersistentEngine *engine,
                               const void *cookie, ADD_STAT add_stat) {
    if (engine->getEpStats().isShutdown) {
        return;
    }

    EventuallyPersistentEngine *epe = ObjectRegistry::onSwitchThread(NULL, true);
    char statname[80] = {0};
    if (isHiPrioQset) {
        for (size_t i = 0; i < numTaskSets; i++) {
            snprintf(statname, sizeof(statname), "ep_workload:%s:InQsize",
                     hpTaskQ[i]->getName().c_str());
            add_casted_stat(statname, hpTaskQ[i]->getFutureQueueSize(), add_stat,
                            cookie);
            snprintf(statname, sizeof(statname), "ep_workload:%s:OutQsize",
                     hpTaskQ[i]->getName().c_str());
            add_casted_stat(statname, hpTaskQ[i]->getReadyQueueSize(), add_stat,
                            cookie);
            size_t pendingQsize = hpTaskQ[i]->getPendingQueueSize();
            if (pendingQsize > 0) {
                snprintf(statname, sizeof(statname), "ep_workload:%s:PendingQ",
                         hpTaskQ[i]->getName().c_str());
                add_casted_stat(statname, pendingQsize, add_stat, cookie);
            }
        }
    }
    if (isLowPrioQset) {
        for (size_t i = 0; i < numTaskSets; i++) {
            snprintf(statname, sizeof(statname), "ep_workload:%s:InQsize",
                     lpTaskQ[i]->getName().c_str());
            add_casted_stat(statname, lpTaskQ[i]->getFutureQueueSize(), add_stat,
                            cookie);
            snprintf(statname, sizeof(statname), "ep_workload:%s:OutQsize",
                     lpTaskQ[i]->getName().c_str());
            add_casted_stat(statname, lpTaskQ[i]->getReadyQueueSize(), add_stat,
                            cookie);
            size_t pendingQsize = lpTaskQ[i]->getPendingQueueSize();
            if (pendingQsize > 0) {
                snprintf(statname, sizeof(statname), "ep_workload:%s:PendingQ",
                         lpTaskQ[i]->getName().c_str());
                add_casted_stat(statname, pendingQsize, add_stat, cookie);
            }
        }
    }
    ObjectRegistry::onSwitchThread(epe);
}

static void showJobLog(const char *logname, const char *prefix,
                       const std::vector<TaskLogEntry> &log,
                       const void *cookie, ADD_STAT add_stat) {
    char statname[80] = {0};
    for (size_t i = 0; i < log.size(); ++i) {
        snprintf(statname, sizeof(statname), "%s:%s:%d:task", prefix,
                 logname, static_cast<int>(i));
        add_casted_stat(statname, log[i].getName().c_str(), add_stat,
                        cookie);
        snprintf(statname, sizeof(statname), "%s:%s:%d:type", prefix,
                 logname, static_cast<int>(i));
        add_casted_stat(statname,
                        TaskQueue::taskType2Str(log[i].getTaskType()).c_str(),
                        add_stat, cookie);
        snprintf(statname, sizeof(statname), "%s:%s:%d:starttime",
                 prefix, logname, static_cast<int>(i));
        add_casted_stat(statname, log[i].getTimestamp(), add_stat,
                        cookie);
        snprintf(statname, sizeof(statname), "%s:%s:%d:runtime",
                 prefix, logname, static_cast<int>(i));
        add_casted_stat(statname, log[i].getDuration(), add_stat,
                        cookie);
    }
}

static void addWorkerStats(const char *prefix, ExecutorThread *t,
                           const void *cookie, ADD_STAT add_stat) {
    char statname[80] = {0};
    snprintf(statname, sizeof(statname), "%s:state", prefix);
    add_casted_stat(statname, t->getStateName().c_str(), add_stat, cookie);
    snprintf(statname, sizeof(statname), "%s:task", prefix);
    add_casted_stat(statname, t->getTaskName().c_str(), add_stat, cookie);

    if (strcmp(t->getStateName().c_str(), "running") == 0) {
        snprintf(statname, sizeof(statname), "%s:runtime", prefix);
        add_casted_stat(statname,
                        (gethrtime() - t->getTaskStart()) / 1000, add_stat, cookie);
    }

    snprintf(statname, sizeof(statname), "%s:waketime", prefix);
    add_casted_stat(statname, t->getWaketime(), add_stat, cookie);
    snprintf(statname, sizeof(statname), "%s:cur_time", prefix);
    add_casted_stat(statname, t->getCurTime(), add_stat, cookie);
}

void ExecutorPool::doWorkerStat(EventuallyPersistentEngine *engine,
                                const void *cookie, ADD_STAT add_stat) {
    if (engine->getEpStats().isShutdown) {
        return;
    }

    EventuallyPersistentEngine *epe = ObjectRegistry::onSwitchThread(NULL, true);
    //TODO: implement tracking per engine stats ..
    for (size_t tidx = 0; tidx < threadQ.size(); ++tidx) {
        addWorkerStats(threadQ[tidx]->getName().c_str(), threadQ[tidx],
                       cookie, add_stat);
        showJobLog("log", threadQ[tidx]->getName().c_str(),
                   threadQ[tidx]->getLog(), cookie, add_stat);
        showJobLog("slow", threadQ[tidx]->getName().c_str(),
                   threadQ[tidx]->getSlowLog(), cookie, add_stat);
    }
    ObjectRegistry::onSwitchThread(epe);
}