[BP] MB-18452: Single threaded test harness improvements
[ep-engine.git] / src / fakes / fake_executorpool.h
1 /* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2 /*
3  *     Copyright 2016 Couchbase, Inc.
4  *
5  *   Licensed under the Apache License, Version 2.0 (the "License");
6  *   you may not use this file except in compliance with the License.
7  *   You may obtain a copy of the License at
8  *
9  *       http://www.apache.org/licenses/LICENSE-2.0
10  *
11  *   Unless required by applicable law or agreed to in writing, software
12  *   distributed under the License is distributed on an "AS IS" BASIS,
13  *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  *   See the License for the specific language governing permissions and
15  *   limitations under the License.
16  */
17
18 /*
19  * FakeExecutorPool / FakeExecutorThread
20  *
21  * A pair of classes which act as a fake ExecutorPool for testing purposes.
22  * Only executes tasks when explicitly told, and only on the main thread.
23  *
24  * See SingleThreadedEPStoreTest for basic usage.
25  *
26  * TODO: Improve usage documentation.
27  */
28
29 #pragma once
30
31 #include "executorpool.h"
32 #include "executorthread.h"
33 #include "taskqueue.h"
34
35 #include <gtest/gtest.h>
36
37 class SingleThreadedExecutorPool : public ExecutorPool {
38 public:
39
40     /* Registers an instance of this class as "the" executorpool (i.e. what
41      * you get when you call ExecutorPool::get()).
42      *
43      * This *must* be called before the normal ExecutorPool is created.
44      */
45     static void replaceExecutorPoolWithFake() {
46         LockHolder lh(initGuard);
47         ExecutorPool* tmp = ExecutorPool::instance;
48         if (tmp != NULL) {
49             throw std::runtime_error("replaceExecutorPoolWithFake: "
50                     "ExecutorPool instance already created - cowardly refusing to continue!");
51         }
52
53         EventuallyPersistentEngine *epe =
54                 ObjectRegistry::onSwitchThread(NULL, true);
55         tmp = new SingleThreadedExecutorPool(NUM_TASK_GROUPS);
56         ObjectRegistry::onSwitchThread(epe);
57         instance = tmp;
58     }
59
60     SingleThreadedExecutorPool(size_t nTaskSets)
61         : ExecutorPool(/*threads*/0, nTaskSets, 0, 0, 0, 0) {
62     }
63
64     bool _startWorkers() {
65         // Don't actually start any worker threads (all work will be done
66         // synchronously in the same thread) - but we do need to set
67         // maxWorkers to at least 1 otherwise ExecutorPool::tryNewWork() will
68         // never return any work.
69
70         maxWorkers[WRITER_TASK_IDX] = 1;
71         maxWorkers[READER_TASK_IDX] = 1;
72         maxWorkers[AUXIO_TASK_IDX]  = 1;
73         maxWorkers[NONIO_TASK_IDX]  = 1;
74
75         return true;
76     }
77
78     // Helper methods to access normally protected state of ExecutorPool
79
80     TaskQ& getLpTaskQ() {
81         return lpTaskQ;
82     }
83
84     /*
85      * Mark all tasks as cancelled and remove the from the locator.
86      */
87     void cancelAll() {
88         for (auto& it : taskLocator) {
89             it.second.first->cancel();
90             // And force awake so he is "runnable"
91             it.second.second->wake(it.second.first);
92         }
93         taskLocator.clear();
94     }
95 };
96
97 /*
98  * A container for a single task to 'execute' on divorced of the logical thread.
99  * Performs checks of the taskQueue once execution is complete.
100  */
101 class CheckedExecutor : public ExecutorThread {
102 public:
103
104     CheckedExecutor(ExecutorPool* manager_, TaskQueue& q)
105         : ExecutorThread(manager_, q.getQueueType(), "checked_executor"),
106           queue(q),
107           preFutureQueueSize(queue.getFutureQueueSize()),
108           preReadyQueueSize(queue.getReadyQueueSize()),
109           rescheduled(false) {
110         if (!queue.fetchNextTask(*this, false)) {
111             throw std::logic_error("CheckedExecutor failed fetchNextTask");
112         }
113
114         // Configure a checker to run, some tasks are subtly different
115         if (getTaskName().compare("Snapshotting vbucket states") == 0 ||
116             getTaskName().compare("Removing closed unreferenced checkpoints from memory") == 0 ||
117             getTaskName().compare("Paging expired items.") == 0 ||
118             getTaskName().compare("Adjusting hash table sizes.") == 0) {
119             checker = [=](bool taskRescheduled) {
120                 // These tasks all schedule one other task
121                 this->oneExecutes(taskRescheduled, 1);
122             };
123         } else {
124             checker = [=](bool taskRescheduled) {
125                 this->oneExecutes(taskRescheduled, 0);
126             };
127         }
128     }
129
130     void runCurrentTask(const std::string& expectedTask) {
131         EXPECT_EQ(expectedTask, getTaskName());
132         run();
133     }
134
135     void runCurrentTask() {
136         run();
137     }
138
139     void completeCurrentTask() {
140         manager->doneWork(curTaskType);
141         if (rescheduled && !currentTask->isdead()) {
142             queue.reschedule(currentTask, curTaskType);
143         } else {
144             manager->cancel(currentTask->getId(), true);
145         }
146
147         if (!currentTask->isdead()) {
148             checker(rescheduled);
149         }
150     }
151
152     void updateCurrentTime() {
153         now = gethrtime();
154     }
155
156     ExTask& getCurrentTask() {
157         return currentTask;
158     }
159
160 private:
161
162     /*
163      * Performs checks based on the assumption that one task executes and can
164      * as part of that execution
165      *   - request itself to be rescheduled
166      *   - schedule other tasks (expectedToBeScheduled)
167      */
168     void oneExecutes(bool rescheduled, int expectedToBeScheduled) {
169         if (rescheduled) {
170             // One task executed and was rescheduled, account for it.
171             expectedToBeScheduled++;
172         }
173
174         // Check that the new sizes of the future and ready tally given
175         // one executed and n were scheduled as a side effect.
176         EXPECT_EQ((preFutureQueueSize + preReadyQueueSize) - 1,
177                   (queue.getFutureQueueSize() + queue.getReadyQueueSize()) -
178                   expectedToBeScheduled);
179     }
180
181     /*
182      * Run the task and record if it was rescheduled.
183      */
184     void run() {
185         rescheduled = currentTask->run();
186     }
187
188     TaskQueue& queue;
189     size_t preFutureQueueSize;
190     size_t preReadyQueueSize;
191     bool rescheduled;
192
193     /*
194      * A function object that runs post task execution for the purpose of
195      * running checks against state changes.
196      * The defined function accepts one boolean parameter that states if the
197      * task which just executed has been rescheduled.
198      */
199     std::function<void(bool)> checker;
200 };