1 /* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
3 * Copyright 2016 Couchbase, Inc.
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
9 * http://www.apache.org/licenses/LICENSE-2.0
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
19 * FakeExecutorPool / FakeExecutorThread
21 * A pair of classes which act as a fake ExecutorPool for testing purposes.
22 * Only executes tasks when explicitly told, and only on the main thread.
24 * See SingleThreadedEPStoreTest for basic usage.
26 * TODO: Improve usage documentation.
31 #include "executorpool.h"
32 #include "executorthread.h"
33 #include "taskqueue.h"
35 #include <gtest/gtest.h>
37 class SingleThreadedExecutorPool : public ExecutorPool {
40 /* Registers an instance of this class as "the" executorpool (i.e. what
41 * you get when you call ExecutorPool::get()).
43 * This *must* be called before the normal ExecutorPool is created.
45 static void replaceExecutorPoolWithFake() {
46 LockHolder lh(initGuard);
47 ExecutorPool* tmp = ExecutorPool::instance;
49 throw std::runtime_error("replaceExecutorPoolWithFake: "
50 "ExecutorPool instance already created - cowardly refusing to continue!");
53 EventuallyPersistentEngine *epe =
54 ObjectRegistry::onSwitchThread(NULL, true);
55 tmp = new SingleThreadedExecutorPool(NUM_TASK_GROUPS);
56 ObjectRegistry::onSwitchThread(epe);
60 SingleThreadedExecutorPool(size_t nTaskSets)
61 : ExecutorPool(/*threads*/0, nTaskSets, 0, 0, 0, 0) {
64 bool _startWorkers() {
65 // Don't actually start any worker threads (all work will be done
66 // synchronously in the same thread) - but we do need to set
67 // maxWorkers to at least 1 otherwise ExecutorPool::tryNewWork() will
68 // never return any work.
70 maxWorkers[WRITER_TASK_IDX] = 1;
71 maxWorkers[READER_TASK_IDX] = 1;
72 maxWorkers[AUXIO_TASK_IDX] = 1;
73 maxWorkers[NONIO_TASK_IDX] = 1;
78 // Helper methods to access normally protected state of ExecutorPool
85 * Mark all tasks as cancelled and remove the from the locator.
88 for (auto& it : taskLocator) {
89 it.second.first->cancel();
90 // And force awake so he is "runnable"
91 it.second.second->wake(it.second.first);
98 * A container for a single task to 'execute' on divorced of the logical thread.
99 * Performs checks of the taskQueue once execution is complete.
101 class CheckedExecutor : public ExecutorThread {
104 CheckedExecutor(ExecutorPool* manager_, TaskQueue& q)
105 : ExecutorThread(manager_, q.getQueueType(), "checked_executor"),
107 preFutureQueueSize(queue.getFutureQueueSize()),
108 preReadyQueueSize(queue.getReadyQueueSize()),
110 if (!queue.fetchNextTask(*this, false)) {
111 throw std::logic_error("CheckedExecutor failed fetchNextTask");
114 // Configure a checker to run, some tasks are subtly different
115 if (getTaskName().compare("Snapshotting vbucket states") == 0 ||
116 getTaskName().compare("Removing closed unreferenced checkpoints from memory") == 0 ||
117 getTaskName().compare("Paging expired items.") == 0 ||
118 getTaskName().compare("Adjusting hash table sizes.") == 0) {
119 checker = [=](bool taskRescheduled) {
120 // These tasks all schedule one other task
121 this->oneExecutes(taskRescheduled, 1);
124 checker = [=](bool taskRescheduled) {
125 this->oneExecutes(taskRescheduled, 0);
130 void runCurrentTask(const std::string& expectedTask) {
131 EXPECT_EQ(expectedTask, getTaskName());
135 void runCurrentTask() {
139 void completeCurrentTask() {
140 manager->doneWork(curTaskType);
141 if (rescheduled && !currentTask->isdead()) {
142 queue.reschedule(currentTask, curTaskType);
144 manager->cancel(currentTask->getId(), true);
147 if (!currentTask->isdead()) {
148 checker(rescheduled);
152 void updateCurrentTime() {
156 ExTask& getCurrentTask() {
163 * Performs checks based on the assumption that one task executes and can
164 * as part of that execution
165 * - request itself to be rescheduled
166 * - schedule other tasks (expectedToBeScheduled)
168 void oneExecutes(bool rescheduled, int expectedToBeScheduled) {
170 // One task executed and was rescheduled, account for it.
171 expectedToBeScheduled++;
174 // Check that the new sizes of the future and ready tally given
175 // one executed and n were scheduled as a side effect.
176 EXPECT_EQ((preFutureQueueSize + preReadyQueueSize) - 1,
177 (queue.getFutureQueueSize() + queue.getReadyQueueSize()) -
178 expectedToBeScheduled);
182 * Run the task and record if it was rescheduled.
185 rescheduled = currentTask->run();
189 size_t preFutureQueueSize;
190 size_t preReadyQueueSize;
194 * A function object that runs post task execution for the purpose of
195 * running checks against state changes.
196 * The defined function accepts one boolean parameter that states if the
197 * task which just executed has been rescheduled.
199 std::function<void(bool)> checker;