a7f62dda40828e42de86f14e6e0445d4e484bace
[ep-engine.git] / src / executorthread.cc
1 /* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2 /*
3  *     Copyright 2013 Couchbase, Inc.
4  *
5  *   Licensed under the Apache License, Version 2.0 (the "License");
6  *   you may not use this file except in compliance with the License.
7  *   You may obtain a copy of the License at
8  *
9  *       http://www.apache.org/licenses/LICENSE-2.0
10  *
11  *   Unless required by applicable law or agreed to in writing, software
12  *   distributed under the License is distributed on an "AS IS" BASIS,
13  *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  *   See the License for the specific language governing permissions and
15  *   limitations under the License.
16  */
17
18 #include "config.h"
19
20 #include <queue>
21
22 #include "common.h"
23 #include "executorpool.h"
24 #include "executorthread.h"
25 #include "taskqueue.h"
26 #include "ep_engine.h"
27
28 AtomicValue<size_t> GlobalTask::task_id_counter(1);
29
30 extern "C" {
31     static void launch_executor_thread(void *arg) {
32         ExecutorThread *executor = (ExecutorThread*) arg;
33         try {
34             executor->run();
35         } catch (std::exception& e) {
36             LOG(EXTENSION_LOG_WARNING, "%s: Caught an exception: %s\n",
37                 executor->getName().c_str(), e.what());
38         } catch(...) {
39             LOG(EXTENSION_LOG_WARNING, "%s: Caught a fatal exception\n",
40                 executor->getName().c_str());
41         }
42     }
43 }
44
45 void ExecutorThread::start() {
46     cb_assert(state == EXECUTOR_CREATING);
47     if (cb_create_thread(&thread, launch_executor_thread, this, 0) != 0) {
48         std::stringstream ss;
49         ss << name.c_str() << ": Initialization error!!!";
50         throw std::runtime_error(ss.str().c_str());
51     }
52 }
53
54 void ExecutorThread::stop(bool wait) {
55     if (!wait && (state == EXECUTOR_SHUTDOWN || state == EXECUTOR_DEAD)) {
56         return;
57     }
58     state = EXECUTOR_SHUTDOWN;
59     if (!wait) {
60         LOG(EXTENSION_LOG_INFO, "%s: Stopping", name.c_str());
61         return;
62     }
63     cb_join_thread(thread);
64     LOG(EXTENSION_LOG_INFO, "%s: Stopped", name.c_str());
65 }
66
67 void ExecutorThread::run() {
68     state = EXECUTOR_RUNNING;
69
70     LOG(EXTENSION_LOG_DEBUG, "Thread %s running..", getName().c_str());
71
72     for (uint8_t tick = 1;; tick++) {
73         {
74             LockHolder lh(currentTaskMutex);
75             currentTask.reset();
76         }
77         if (state != EXECUTOR_RUNNING) {
78             break;
79         }
80
81         if (TaskQueue *q = manager->nextTask(*this, tick)) {
82             EventuallyPersistentEngine *engine = currentTask->getEngine();
83             ObjectRegistry::onSwitchThread(engine);
84             if (currentTask->isdead()) {
85                 // release capacity back to TaskQueue
86                 manager->doneWork(curTaskType);
87                 manager->cancel(currentTask->taskId, true);
88                 continue;
89             }
90
91             // Measure scheduling overhead as difference between the time
92             // that the task wanted to wake up and the current time
93             now = gethrtime();
94             hrtime_t woketime = currentTask->getWaketime();
95             engine->getEpStore()->logQTime(currentTask->getTypeId(),
96                                            now > woketime ? now - woketime
97                                                           : 0);
98
99             taskStart = now;
100             rel_time_t startReltime = ep_current_time();
101             try {
102                 LOG(EXTENSION_LOG_DEBUG,
103                     "%s: Run task \"%s\" id %d",
104                 getName().c_str(), currentTask->getDescription().c_str(),
105                 currentTask->getId());
106
107                 // Now Run the Task ....
108                 currentTask->setState(TASK_RUNNING, TASK_SNOOZED);
109                 bool again = currentTask->run();
110
111                 // Task done, log it ...
112                 hrtime_t runtime((gethrtime() - taskStart) / 1000);
113                 engine->getEpStore()->logRunTime(currentTask->getTypeId(),
114                                                runtime);
115                 ObjectRegistry::onSwitchThread(NULL);
116                 addLogEntry(engine->getName() + currentTask->getDescription(),
117                         q->getQueueType(), runtime, startReltime,
118                         (runtime >
119                          (hrtime_t)currentTask->maxExpectedDuration()));
120                 ObjectRegistry::onSwitchThread(engine);
121                 // Check if task is run once or needs to be rescheduled..
122                 if (!again || currentTask->isdead()) {
123                     // release capacity back to TaskQueue
124                     manager->doneWork(curTaskType);
125                     manager->cancel(currentTask->taskId, true);
126                 } else {
127                     hrtime_t new_waketime;
128                     // if a task has not set snooze, update its waketime to now
129                     // before rescheduling for more accurate timing histograms
130                     currentTask->updateWaketimeIfLessThan(now);
131
132                     // release capacity back to TaskQueue ..
133                     manager->doneWork(curTaskType);
134                     new_waketime = q->reschedule(currentTask, curTaskType);
135                     // record min waketime ...
136                     if (new_waketime < waketime) {
137                         waketime = new_waketime;
138                     }
139                     LOG(EXTENSION_LOG_DEBUG, "%s: Reschedule a task"
140                             " \"%s\" id %d[%llu %llu |%llu]",
141                             name.c_str(),
142                             currentTask->getDescription().c_str(),
143                             currentTask->getId(), new_waketime,
144                             currentTask->getWaketime(),
145                             waketime.load());
146                 }
147             } catch (std::exception& e) {
148                 LOG(EXTENSION_LOG_WARNING,
149                     "%s: Exception caught in task \"%s\": %s", name.c_str(),
150                     currentTask->getDescription().c_str(), e.what());
151             } catch(...) {
152                 LOG(EXTENSION_LOG_WARNING,
153                     "%s: Fatal exception caught in task \"%s\"\n",
154                     name.c_str(), currentTask->getDescription().c_str());
155             }
156         }
157     }
158     state = EXECUTOR_DEAD;
159 }
160
161 void ExecutorThread::setCurrentTask(ExTask newTask) {
162     LockHolder lh(currentTaskMutex);
163     currentTask = newTask;
164 }
165
166 void ExecutorThread::addLogEntry(const std::string &desc,
167                                  const task_type_t taskType,
168                                  const hrtime_t runtime,
169                                  rel_time_t t, bool isSlowJob) {
170     LockHolder lh(logMutex);
171     TaskLogEntry tle(desc, taskType, runtime, t);
172     if (isSlowJob) {
173         slowjobs.add(tle);
174     } else {
175         tasklog.add(tle);
176     }
177 }
178
179 const std::string ExecutorThread::getStateName() {
180     switch (state.load()) {
181     case EXECUTOR_CREATING:
182         return std::string("creating");
183     case EXECUTOR_RUNNING:
184         return std::string("running");
185     case EXECUTOR_WAITING:
186         return std::string("waiting");
187     case EXECUTOR_SLEEPING:
188         return std::string("sleeping");
189     case EXECUTOR_SHUTDOWN:
190         return std::string("shutdown");
191     default:
192         return std::string("dead");
193     }
194 }