[BP] MB-18452: Single threaded test harness improvements
[ep-engine.git] / tests / module_tests / evp_store_single_threaded_test.cc
1 /* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2 /*
3  *     Copyright 2013 Couchbase, Inc.
4  *
5  *   Licensed under the Apache License, Version 2.0 (the "License");
6  *   you may not use this file except in compliance with the License.
7  *   You may obtain a copy of the License at
8  *
9  *       http://www.apache.org/licenses/LICENSE-2.0
10  *
11  *   Unless required by applicable law or agreed to in writing, software
12  *   distributed under the License is distributed on an "AS IS" BASIS,
13  *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  *   See the License for the specific language governing permissions and
15  *   limitations under the License.
16  */
17
18 #include "evp_store_test.h"
19
20 #include "fakes/fake_executorpool.h"
21 #include "programs/engine_testapp/mock_server.h"
22 #include "taskqueue.h"
23
24 /*
25  * A subclass of EventuallyPersistentStoreTest which uses a fake ExecutorPool,
26  * which will not spawn ExecutorThreads and hence not run any tasks
27  * automatically in the background. All tasks must be manually run().
28  */
29 class SingleThreadedEPStoreTest : public EventuallyPersistentStoreTest {
30     void SetUp() {
31         SingleThreadedExecutorPool::replaceExecutorPoolWithFake();
32         EventuallyPersistentStoreTest::SetUp();
33
34         task_executor = reinterpret_cast<SingleThreadedExecutorPool*>
35             (ExecutorPool::get());
36     }
37
38     void TearDown() {
39         if (engine) {
40             shutdownAndPurgeTasks();
41         }
42         EventuallyPersistentStoreTest::TearDown();
43     }
44
45 public:
46     /*
47      * Run the next task from the taskQ
48      * The task must match the expectedTaskName parameter
49      */
50     void runNextTask(TaskQueue& taskQ, const std::string& expectedTaskName) {
51         CheckedExecutor executor(task_executor, taskQ);
52
53         // Run the task
54         executor.runCurrentTask(expectedTaskName);
55         executor.completeCurrentTask();
56     }
57
58     /*
59      * Run the next task from the taskQ
60      */
61     void runNextTask(TaskQueue& taskQ) {
62         CheckedExecutor executor(task_executor, taskQ);
63
64         // Run the task
65         executor.runCurrentTask();
66         executor.completeCurrentTask();
67     }
68
69 protected:
70     /*
71      * Change the vbucket state and run the VBStatePeristTask
72      * On return the state will be changed and the task completed.
73      */
74     void setVBucketStateAndRunPersistTask(uint16_t vbid, vbucket_state_t newState) {
75         auto& lpWriterQ = *task_executor->getLpTaskQ()[WRITER_TASK_IDX];
76
77         // Change state - this should add 1 VBStatePersistTask to the WRITER queue.
78         EXPECT_EQ(ENGINE_SUCCESS,
79                   store->setVBucketState(vbid, newState, /*transfer*/false));
80
81         runNextTask(lpWriterQ, "Persisting a vbucket state for vbucket: "
82                                + std::to_string(vbid));
83     }
84
85     /*
86      * Set the stats isShutdown and attempt to drive all tasks to cancel
87      */
88     void shutdownAndPurgeTasks() {
89         engine->getEpStats().isShutdown = true;
90         task_executor->cancelAll();
91
92         for (task_type_t t :
93              {WRITER_TASK_IDX, READER_TASK_IDX, AUXIO_TASK_IDX, NONIO_TASK_IDX}) {
94
95             // Define a lambda to drive all tasks from the queue, if hpTaskQ
96             // is implemented then trivial to add a second call to runTasks.
97             auto runTasks = [=](TaskQueue& queue) {
98                 while (queue.getFutureQueueSize() > 0 || queue.getReadyQueueSize()> 0){
99                     runNextTask(queue);
100                 }
101             };
102             runTasks(*task_executor->getLpTaskQ()[t]);
103         }
104     }
105
106     SingleThreadedExecutorPool* task_executor;
107 };
108
109 static ENGINE_ERROR_CODE dummy_dcp_add_failover_cb(vbucket_failover_t* entry,
110                                                    size_t nentries,
111                                                    const void *cookie)
112 {
113     return ENGINE_SUCCESS;
114 }
115
116 /* Arguments for the background thread used by
117  * MB20054_onDeleteItem_during_bucket_deletion
118  */
119 typedef struct {
120     EventuallyPersistentEngine* engine;
121     CheckedExecutor& backfill;
122     SyncObject& backfill_cv;
123     SyncObject& destroy_cv;
124     TaskQueue* taskQ;
125 } mb20054_backfill_thread_params;
126
127 static void MB20054_run_backfill_task(void* arg) {
128     mb20054_backfill_thread_params* params = static_cast<mb20054_backfill_thread_params*>(arg);
129     EventuallyPersistentEngine* engine = params->engine;
130     CheckedExecutor& backfill = params->backfill;
131     SyncObject& backfill_cv = params->backfill_cv;
132     SyncObject& destroy_cv = params->destroy_cv;
133
134     TaskQueue* lpAuxioQ = params->taskQ;
135
136     ObjectRegistry::onSwitchThread(engine);
137
138     // Run the BackfillManagerTask task to push items to readyQ. In sherlock
139     // upwards this runs multiple times - so should return true.
140     backfill.runCurrentTask("Backfilling items for a DCP Connection");
141
142     // Notify the main thread that it can progress with destroying the
143     // engine [A].
144     {
145         LockHolder lh(backfill_cv);
146         backfill_cv.notifyOne();
147     }
148
149     // Now wait ourselves for destroy to be completed [B].
150     LockHolder lh(destroy_cv);
151     destroy_cv.wait();
152
153     // This is the only "hacky" part of the test - we need to somehow
154     // keep the DCPBackfill task 'running' - i.e. not call
155     // completeCurrentTask - until the main thread is in
156     // ExecutorPool::_stopTaskGroup. However we have no way from the test
157     // to properly signal that we are *inside* _stopTaskGroup -
158     // called from EVPStore's destructor.
159     // Best we can do is spin on waiting for the DCPBackfill task to be
160     // set to 'dead' - and only then completeCurrentTask; which will
161     // cancel the task.
162     while (!backfill.getCurrentTask()->isdead()) {
163         // spin.
164     }
165     backfill.completeCurrentTask();
166
167     // Cleanup - run the next (final) task -
168     // ActiveStreamCheckpointProcessorTask - so it can be cancelled
169     // and executorpool shut down.
170     CheckedExecutor executor(ExecutorPool::get(), *lpAuxioQ);
171     executor.runCurrentTask("Process checkpoint(s) for DCP producer");
172     executor.completeCurrentTask();
173 }
174
175 // Check that if onDeleteItem() is called during bucket deletion, we do not
176 // abort due to not having a valid thread-local 'engine' pointer. This
177 // has been observed when we have a DCPBackfill task which is deleted during
178 // bucket shutdown, which has a non-zero number of Items which are destructed
179 // (and call onDeleteItem).
180 TEST_F(SingleThreadedEPStoreTest, MB20054_onDeleteItem_during_bucket_deletion) {
181     // Should start with no tasks registered on any queues.
182     TaskQ& lp_task_q = task_executor->getLpTaskQ();
183     for (auto& queue : lp_task_q) {
184         ASSERT_EQ(0, queue->getFutureQueueSize());
185         ASSERT_EQ(0, queue->getReadyQueueSize());
186     }
187
188     // [[1] Set our state to active.
189     setVBucketStateAndRunPersistTask(vbid, vbucket_state_active);
190
191     // Perform one SET, then close it's checkpoint. This means that we no
192     // longer have all sequence numbers in memory checkpoints, forcing the
193     // DCP stream request to go to disk (backfill).
194     store_item(vbid, "key", "value");
195
196     // Force a new checkpoint.
197     RCPtr<VBucket> vb = store->getVbMap().getBucket(vbid);
198     CheckpointManager& ckpt_mgr = vb->checkpointManager;
199     ckpt_mgr.createNewCheckpoint();
200     auto lpWriterQ = task_executor->getLpTaskQ()[WRITER_TASK_IDX];
201     EXPECT_EQ(0, lpWriterQ->getFutureQueueSize());
202     EXPECT_EQ(0, lpWriterQ->getReadyQueueSize());
203
204     auto lpAuxioQ = task_executor->getLpTaskQ()[AUXIO_TASK_IDX];
205     EXPECT_EQ(0, lpAuxioQ->getFutureQueueSize());
206     EXPECT_EQ(0, lpAuxioQ->getReadyQueueSize());
207
208     // Directly flush the vbucket, ensuring data is on disk.
209     //  (This would normally also wake up the checkpoint remover task, but
210     //   as that task was never registered with the ExecutorPool in this test
211     //   environment, we need to manually remove the prev checkpoint).
212     EXPECT_EQ(1, store->flushVBucket(vbid));
213
214     bool new_ckpt_created;
215     EXPECT_EQ(1,
216               ckpt_mgr.removeClosedUnrefCheckpoints(vb, new_ckpt_created));
217     vb.reset();
218
219     EXPECT_EQ(0, lpAuxioQ->getFutureQueueSize());
220     EXPECT_EQ(0, lpAuxioQ->getReadyQueueSize());
221
222     // Create a DCP producer, and start a stream request.
223     std::string name("test_producer");
224     EXPECT_EQ(ENGINE_SUCCESS,
225               engine->dcpOpen(cookie, /*opaque:unused*/{}, /*seqno:unused*/{},
226                               DCP_OPEN_PRODUCER, name.data(), name.size()));
227
228     // Expect to have an ActiveStreamCheckpointProcessorTask, which is
229     // initially snoozed (so we can't run it).
230     EXPECT_EQ(1, lpAuxioQ->getFutureQueueSize());
231     EXPECT_EQ(0, lpAuxioQ->getReadyQueueSize());
232
233     uint64_t rollbackSeqno;
234     // Actual stream request method (EvpDcpStreamReq) is static, so access via
235     // the engine_interface.
236     EXPECT_EQ(ENGINE_SUCCESS,
237               engine->dcp.stream_req(&engine->interface, cookie, /*flags*/0,
238                                      /*opaque*/0, /*vbucket*/vbid,
239                                      /*start_seqno*/0, /*end_seqno*/-1,
240                                      /*vb_uuid*/0xabcd, /*snap_start*/0,
241                                      /*snap_end*/0, &rollbackSeqno,
242                                      dummy_dcp_add_failover_cb));
243
244     // FutureQ should now have an additional DCPBackfill task.
245     EXPECT_EQ(2, lpAuxioQ->getFutureQueueSize());
246     EXPECT_EQ(0, lpAuxioQ->getReadyQueueSize());
247
248     // Create an executor 'thread' to obtain shared ownership of the next
249     // AuxIO task (which should be BackfillManagerTask). As long as this
250     // object has it's currentTask set to BackfillManagerTask, the task
251     // will not be deleted.
252     // Essentially we are simulating a concurrent thread running this task.
253     CheckedExecutor backfill(task_executor, *lpAuxioQ);
254
255     // This is the one action we really need to perform 'concurrently' - delete
256     // the engine while a DCPBackfill task is still running. We spin up a
257     // separate thread which will run the DCPBackfill task
258     // concurrently with destroy - specifically DCPBackfill must start running
259     // (and add items to the readyQ) before destroy(), it must then continue
260     // running (stop after) _stopTaskGroup is invoked.
261     // To achieve this we use a couple of condition variables to synchronise
262     // between the two threads - the timeline needs to look like:
263     //
264     //  auxIO thread:  [------- DCPBackfill ----------]
265     //   main thread:          [destroy()]       [ExecutorPool::_stopTaskGroup]
266     //
267     //  --------------------------------------------------------> time
268     //
269     SyncObject backfill_cv;
270     SyncObject destroy_cv;
271
272     cb_thread_t concurrent_task_thread;
273     mb20054_backfill_thread_params params = {engine, backfill, backfill_cv,
274                                              destroy_cv, lpAuxioQ};
275
276     cb_create_thread(&concurrent_task_thread, MB20054_run_backfill_task, &params, 0);
277
278     // [A] Wait for DCPBackfill to complete.
279     LockHolder lh(backfill_cv);
280     backfill_cv.wait();
281
282     ObjectRegistry::onSwitchThread(engine);
283     // 'Destroy' the engine - this doesn't delete the object, just shuts down
284     // connections, marks streams as dead etc.
285     engine->destroy(/*force*/false);
286     destroy_mock_event_callbacks();
287
288     {
289         LockHolder lh(destroy_cv);
290         destroy_cv.notifyOne();
291     }
292
293     // Need to have the current engine valid before deleting (this is what
294     // EvpDestroy does normally; however we have a smart ptr to the engine
295     // so must delete via that).
296     delete engine;
297     engine = NULL;
298
299     cb_join_thread(concurrent_task_thread);
300 }