MB-20054: Regression test - bucket is deleted with DCPBackfill running
[ep-engine.git] / tests / module_tests / evp_store_single_threaded_test.cc
1 /* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2 /*
3  *     Copyright 2013 Couchbase, Inc.
4  *
5  *   Licensed under the Apache License, Version 2.0 (the "License");
6  *   you may not use this file except in compliance with the License.
7  *   You may obtain a copy of the License at
8  *
9  *       http://www.apache.org/licenses/LICENSE-2.0
10  *
11  *   Unless required by applicable law or agreed to in writing, software
12  *   distributed under the License is distributed on an "AS IS" BASIS,
13  *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  *   See the License for the specific language governing permissions and
15  *   limitations under the License.
16  */
17
18 #include "evp_store_test.h"
19
20 #include "fakes/fake_executorpool.h"
21 #include "programs/engine_testapp/mock_server.h"
22 #include "taskqueue.h"
23
24 /*
25  * A subclass of EventuallyPersistentStoreTest which uses a fake ExecutorPool,
26  * which will not spawn ExecutorThreads and hence not run any tasks
27  * automatically in the background. All tasks must be manually run().
28  */
29 class SingleThreadedEPStoreTest : public EventuallyPersistentStoreTest {
30     void SetUp() {
31         SingleThreadedExecutorPool::replaceExecutorPoolWithFake();
32         EventuallyPersistentStoreTest::SetUp();
33     }
34 };
35
36 static ENGINE_ERROR_CODE dummy_dcp_add_failover_cb(vbucket_failover_t* entry,
37                                                    size_t nentries,
38                                                    const void *cookie)
39 {
40     return ENGINE_SUCCESS;
41 }
42
43 /* Arguments for the background thread used by
44  * MB20054_onDeleteItem_during_bucket_deletion
45  */
46 typedef struct {
47     EventuallyPersistentEngine* engine;
48     FakeExecutorThread& fake_executor_thread;
49     SyncObject& backfill_cv;
50     SyncObject& destroy_cv;
51     TaskQueue* taskQ;
52 } mb20054_backfill_thread_params;
53
/* Body of the background thread used by
 * MB20054_onDeleteItem_during_bucket_deletion.
 * Runs the previously-fetched DCPBackfill task, then deliberately keeps the
 * task 'in flight' (i.e. does not completeCurrentTask) while the main
 * thread destroys the engine; only once the task has been marked dead is it
 * completed and the remaining AuxIO task drained.
 */
static void MB20054_run_backfill_task(void* arg) {
    mb20054_backfill_thread_params* params = static_cast<mb20054_backfill_thread_params*>(arg);
    EventuallyPersistentEngine* engine = params->engine;
    FakeExecutorThread& auxio_thread = params->fake_executor_thread;
    SyncObject& backfill_cv = params->backfill_cv;
    SyncObject& destroy_cv = params->destroy_cv;

    TaskQueue* lpAuxioQ = params->taskQ;

    // Make 'engine' the thread-local current engine on this new thread, so
    // memory accounting / onDeleteItem callbacks have a valid engine.
    ObjectRegistry::onSwitchThread(engine);

    // Run the DCPBackfill task to push items to readyQ. Should return
    // false (i.e. one-shot).
    EXPECT_FALSE(auxio_thread.getCurrentTask()->run());

    // Notify the main thread that it can progress with destroying the
    // engine [A].
    // NOTE(review): both handshakes ([A] here and [B] below) use
    // notifyOne()/wait() with no predicate flag - if a notify ever fired
    // before the matching wait the wakeup would be lost and the test would
    // hang. Confirm SyncObject latches notifications, or guard each wait
    // with a bool.
    {
        LockHolder lh(backfill_cv);
        backfill_cv.notifyOne();
    }

    // Now wait ourselves for destroy to be completed [B].
    LockHolder lh(destroy_cv);
    destroy_cv.wait();

    // This is the only "hacky" part of the test - we need to somehow
    // keep the DCPBackfill task 'running' - i.e. not call
    // completeCurrentTask - until the main thread is in
    // ExecutorPool::_stopTaskGroup. However we have no way from the test
    // to properly signal that we are *inside* _stopTaskGroup -
    // called from EVPStore's destructor.
    // Best we can do is spin on waiting for the DCPBackfill task to be
    // set to 'dead' - and only then completeCurrentTask; which will
    // cancel the task.
    while (!auxio_thread.getCurrentTask()->isdead()) {
        // spin.
    }
    auxio_thread.completeCurrentTask();

    // Cleanup - fetch the next (final) task -
    // ActiveStreamCheckpointProcessorTask - so it can be cancelled
    // and executorpool shut down.
    auxio_thread.updateCurrentTime();
    EXPECT_TRUE(lpAuxioQ->fetchNextTask(auxio_thread, false));
    EXPECT_EQ("Process checkpoint(s) for DCP producer",
              auxio_thread.getTaskName());
    auxio_thread.runCurrentTask();
}
103
104 // Check that if onDeleteItem() is called during bucket deletion, we do not
105 // abort due to not having a valid thread-local 'engine' pointer. This
106 // has been observed when we have a DCPBackfill task which is deleted during
107 // bucket shutdown, which has a non-zero number of Items which are destructed
108 // (and call onDeleteItem).
TEST_F(SingleThreadedEPStoreTest, MB20054_onDeleteItem_during_bucket_deletion) {
    // The fake pool was installed in SetUp(); downcast to reach its
    // task-queue accessors.
    SingleThreadedExecutorPool* task_executor =
            reinterpret_cast<SingleThreadedExecutorPool*>(ExecutorPool::get());

    // Should start with no tasks registered on any queues.
    TaskQ& lp_task_q = task_executor->getLpTaskQ();
    for (int i = 0; i < lp_task_q.size(); i++) {
        ASSERT_EQ(0, lp_task_q[i]->getFutureQueueSize());
        ASSERT_EQ(0, lp_task_q[i]->getReadyQueueSize());
    }

    // [[1] Set our state to active. This should add a VBStatePersistTask to
    // the WRITER queue.
    EXPECT_EQ(ENGINE_SUCCESS,
              store->setVBucketState(vbid, vbucket_state_active, false));

    TaskQueue* lpWriterQ = task_executor->getLpTaskQ()[WRITER_TASK_IDX];
    TaskQueue* lpAuxioQ = task_executor->getLpTaskQ()[AUXIO_TASK_IDX];

    EXPECT_EQ(1, lpWriterQ->getFutureQueueSize());
    EXPECT_EQ(0, lpWriterQ->getReadyQueueSize());

    // Use a FakeExecutorThread to fetch and run the persistTask.
    FakeExecutorThread writer_thread(task_executor, WRITER_TASK_IDX);
    writer_thread.updateCurrentTime();
    EXPECT_TRUE(lpWriterQ->fetchNextTask(writer_thread, false));
    EXPECT_EQ("Persisting a vbucket state for vbucket: 0",
              writer_thread.getTaskName());
    EXPECT_EQ(0, lpWriterQ->getFutureQueueSize());
    EXPECT_EQ(0, lpWriterQ->getReadyQueueSize());
    writer_thread.runCurrentTask();

    // Perform one SET, then close it's checkpoint. This means that we no
    // longer have all sequence numbers in memory checkpoints, forcing the
    // DCP stream request to go to disk (backfill).
    store_item(vbid, "key", "value");

    // Force a new checkpoint.
    RCPtr<VBucket> vb = store->getVbMap().getBucket(vbid);
    CheckpointManager& ckpt_mgr = vb->checkpointManager;
    ckpt_mgr.createNewCheckpoint();

    EXPECT_EQ(0, lpWriterQ->getFutureQueueSize());
    EXPECT_EQ(0, lpWriterQ->getReadyQueueSize());

    EXPECT_EQ(0, lpAuxioQ->getFutureQueueSize());
    EXPECT_EQ(0, lpAuxioQ->getReadyQueueSize());

    // Directly flush the vbucket, ensuring data is on disk.
    //  (This would normally also wake up the checkpoint remover task, but
    //   as that task was never registered with the ExecutorPool in this test
    //   environment, we need to manually remove the prev checkpoint).
    EXPECT_EQ(1, store->flushVBucket(vbid));

    bool new_ckpt_created;
    EXPECT_EQ(1,
              ckpt_mgr.removeClosedUnrefCheckpoints(vb, new_ckpt_created));
    // Drop our VBucket reference before engine teardown later in the test.
    vb.reset();

    EXPECT_EQ(0, lpAuxioQ->getFutureQueueSize());
    EXPECT_EQ(0, lpAuxioQ->getReadyQueueSize());

    // Create a DCP producer, and start a stream request.
    std::string name("test_producer");
    EXPECT_EQ(ENGINE_SUCCESS,
              engine->dcpOpen(cookie, /*opaque:unused*/{}, /*seqno:unused*/{},
                              DCP_OPEN_PRODUCER, name.data(), name.size()));

    // Expect to have an ActiveStreamCheckpointProcessorTask, which is
    // initially snoozed (so we can't run it).
    EXPECT_EQ(1, lpAuxioQ->getFutureQueueSize());
    EXPECT_EQ(0, lpAuxioQ->getReadyQueueSize());

    uint64_t rollbackSeqno;
    // Actual stream request method (EvpDcpStreamReq) is static, so access via
    // the engine_interface.
    EXPECT_EQ(ENGINE_SUCCESS,
              engine->dcp.stream_req(&engine->interface, cookie, /*flags*/0,
                                     /*opaque*/0, /*vbucket*/vbid,
                                     /*start_seqno*/0, /*end_seqno*/-1,
                                     /*vb_uuid*/0xabcd, /*snap_start*/0,
                                     /*snap_end*/0, &rollbackSeqno,
                                     dummy_dcp_add_failover_cb));

    // FutureQ should now have an additional DCPBackfill task.
    EXPECT_EQ(2, lpAuxioQ->getFutureQueueSize());
    EXPECT_EQ(0, lpAuxioQ->getReadyQueueSize());

    // Create an executor 'thread' to obtain shared ownership of the next
    // AuxIO task (which should be DCPBackfill). As long as this
    // object has it's currentTask set to DCPBackfill, the DCPBackfill task
    // will not be deleted.
    // Essentially we are simulating a concurrent thread running this task.
    FakeExecutorThread auxio_thread(task_executor, AUXIO_TASK_IDX);
    auxio_thread.updateCurrentTime();
    EXPECT_TRUE(lpAuxioQ->fetchNextTask(auxio_thread, false));
    EXPECT_EQ("DCP backfill for vbucket 0", auxio_thread.getTaskName());

    // This is the one action we really need to perform 'concurrently' - delete
    // the engine while a DCPBackfill task is still running. We spin up a
    // separate thread which will run the DCPBackfill task
    // concurrently with destroy - specifically DCPBackfill must start running
    // (and add items to the readyQ) before destroy(), it must then continue
    // running (stop after) _stopTaskGroup is invoked.
    // To achieve this we use a couple of condition variables to synchronise
    // between the two threads - the timeline needs to look like:
    //
    //  auxIO thread:  [------- DCPBackfill ----------]
    //   main thread:          [destroy()]       [ExecutorPool::_stopTaskGroup]
    //
    //  --------------------------------------------------------> time
    //
    SyncObject backfill_cv;
    SyncObject destroy_cv;

    cb_thread_t concurrent_task_thread;
    mb20054_backfill_thread_params params = {engine, auxio_thread, backfill_cv,
                                     destroy_cv, lpAuxioQ};

    cb_create_thread(&concurrent_task_thread, MB20054_run_backfill_task, &params, 0);

    // [A] Wait for DCPBackfill to complete.
    // NOTE(review): wait() has no predicate flag - if the backfill thread's
    // notifyOne() fired before we reach here the wakeup would be lost.
    // Confirm SyncObject latches notifications, or guard with a bool.
    LockHolder lh(backfill_cv);
    backfill_cv.wait();

    // 'Destroy' the engine - this doesn't delete the object, just shuts down
    // connections, marks streams as dead etc.
    engine->destroy(/*force*/false);
    destroy_mock_event_callbacks();

    // [B] Let the backfill thread proceed to complete its task.
    {
        LockHolder lh(destroy_cv);
        destroy_cv.notifyOne();
    }

    // Need to have the current engine valid before deleting (this is what
    // EvpDestroy does normally; however we have a smart ptr to the engine
    // so must delete via that).
    ObjectRegistry::onSwitchThread(engine);
    delete engine;
    engine = NULL;

    // Wait for the background thread to finish draining the AuxIO queue
    // before 'params' (stack-allocated) goes out of scope.
    cb_join_thread(concurrent_task_thread);
}