1 /* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
3 * Copyright 2013 Couchbase, Inc.
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
9 * http://www.apache.org/licenses/LICENSE-2.0
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
23 #include "executorpool.h"
24 #include "executorthread.h"
25 #include "taskqueue.h"
26 #include "ep_engine.h"
28 AtomicValue<size_t> GlobalTask::task_id_counter(1);
31 static void launch_executor_thread(void *arg) {
32 ExecutorThread *executor = (ExecutorThread*) arg;
35 } catch (std::exception& e) {
36 LOG(EXTENSION_LOG_WARNING, "%s: Caught an exception: %s\n",
37 executor->getName().c_str(), e.what());
39 LOG(EXTENSION_LOG_WARNING, "%s: Caught a fatal exception\n",
40 executor->getName().c_str());
45 void ExecutorThread::start() {
46 cb_assert(state == EXECUTOR_CREATING);
47 if (cb_create_thread(&thread, launch_executor_thread, this, 0) != 0) {
49 ss << name.c_str() << ": Initialization error!!!";
50 throw std::runtime_error(ss.str().c_str());
54 void ExecutorThread::stop(bool wait) {
55 if (!wait && (state == EXECUTOR_SHUTDOWN || state == EXECUTOR_DEAD)) {
58 state = EXECUTOR_SHUTDOWN;
60 LOG(EXTENSION_LOG_INFO, "%s: Stopping", name.c_str());
63 cb_join_thread(thread);
64 LOG(EXTENSION_LOG_INFO, "%s: Stopped", name.c_str());
67 void ExecutorThread::run() {
68 state = EXECUTOR_RUNNING;
70 LOG(EXTENSION_LOG_DEBUG, "Thread %s running..", getName().c_str());
72 for (uint8_t tick = 1;; tick++) {
74 LockHolder lh(currentTaskMutex);
77 if (state != EXECUTOR_RUNNING) {
82 if (TaskQueue *q = manager->nextTask(*this, tick)) {
83 EventuallyPersistentEngine *engine = currentTask->getEngine();
84 ObjectRegistry::onSwitchThread(engine);
85 if (currentTask->isdead()) {
86 // release capacity back to TaskQueue
87 manager->doneWork(curTaskType);
88 manager->cancel(currentTask->taskId, true);
92 // Measure scheduling overhead as difference between the time
93 // that the task wanted to wake up and the current time
94 hrtime_t woketime = currentTask->getWaketime();
95 engine->getEpStore()->logQTime(currentTask->getTypeId(),
96 now > woketime ? now - woketime
100 rel_time_t startReltime = ep_current_time();
102 LOG(EXTENSION_LOG_DEBUG,
103 "%s: Run task \"%s\" id %d",
104 getName().c_str(), currentTask->getDescription().c_str(),
105 currentTask->getId());
107 // Now Run the Task ....
108 currentTask->setState(TASK_RUNNING, TASK_SNOOZED);
109 bool again = currentTask->run();
111 // Task done, log it ...
112 hrtime_t runtime((gethrtime() - taskStart) / 1000);
113 engine->getEpStore()->logRunTime(currentTask->getTypeId(),
115 ObjectRegistry::onSwitchThread(NULL);
116 addLogEntry(engine->getName() + currentTask->getDescription(),
117 q->getQueueType(), runtime, startReltime,
119 (hrtime_t)currentTask->maxExpectedDuration()));
120 ObjectRegistry::onSwitchThread(engine);
121 // Check if task is run once or needs to be rescheduled..
122 if (!again || currentTask->isdead()) {
123 // release capacity back to TaskQueue
124 manager->doneWork(curTaskType);
125 manager->cancel(currentTask->taskId, true);
127 hrtime_t new_waketime;
128 // if a task has not set snooze, update its waketime to now
129 // before rescheduling for more accurate timing histograms
130 currentTask->updateWaketimeIfLessThan(now);
132 // release capacity back to TaskQueue ..
133 manager->doneWork(curTaskType);
134 new_waketime = q->reschedule(currentTask, curTaskType);
135 // record min waketime ...
136 if (new_waketime < waketime) {
137 waketime = new_waketime;
139 LOG(EXTENSION_LOG_DEBUG, "%s: Reschedule a task"
140 " \"%s\" id %d[%llu %llu |%llu]",
142 currentTask->getDescription().c_str(),
143 currentTask->getId(), new_waketime,
144 currentTask->getWaketime(),
147 } catch (std::exception& e) {
148 LOG(EXTENSION_LOG_WARNING,
149 "%s: Exception caught in task \"%s\": %s", name.c_str(),
150 currentTask->getDescription().c_str(), e.what());
152 LOG(EXTENSION_LOG_WARNING,
153 "%s: Fatal exception caught in task \"%s\"\n",
154 name.c_str(), currentTask->getDescription().c_str());
158 state = EXECUTOR_DEAD;
161 void ExecutorThread::setCurrentTask(ExTask newTask) {
162 LockHolder lh(currentTaskMutex);
163 currentTask = newTask;
166 void ExecutorThread::addLogEntry(const std::string &desc,
167 const task_type_t taskType,
168 const hrtime_t runtime,
169 rel_time_t t, bool isSlowJob) {
170 LockHolder lh(logMutex);
171 TaskLogEntry tle(desc, taskType, runtime, t);
179 const std::string ExecutorThread::getStateName() {
180 switch (state.load()) {
181 case EXECUTOR_CREATING:
182 return std::string("creating");
183 case EXECUTOR_RUNNING:
184 return std::string("running");
185 case EXECUTOR_WAITING:
186 return std::string("waiting");
187 case EXECUTOR_SLEEPING:
188 return std::string("sleeping");
189 case EXECUTOR_SHUTDOWN:
190 return std::string("shutdown");
192 return std::string("dead");