MB-20054: Regression test - bucket is deleted with DCPBackfill running
author Dave Rigby <daver@couchbase.com>
Thu, 7 Jul 2016 08:23:25 +0000 (09:23 +0100)
committer Dave Rigby <daver@couchbase.com>
Thu, 7 Jul 2016 15:59:46 +0000 (15:59 +0000)
Regression test for MB-20054 - the following abort is encountered if a
DCPBackfill task is still running when a bucket is deleted:

    Assertion failed: (engine), function verifyEngine, file
    ep-engine/src/objectregistry.cc, line 58.

This issue occurs because the DCPBackfill object (and associated
objects - the ActiveStream and, importantly, the ActiveStream's readyQ
of Items) is not deleted earlier in the shutdown sequence (via
EvpDestroy), as we use ref-counted pointers for it and there is still
an outstanding reference held by the AuxIO thread which is running the
task. Hence the DCPBackfill object is only deleted when we finally
unregister the deleted bucket from the shared ExecutorPool - see the
following backtrace:

    #1  0x00007f513b75a085 in abort () from /lib64/libc.so.6
    #2  0x00007f51337034e2 in ObjectRegistry::onDeleteItem (pItem=<value optimized out>) at ep-engine/src/objectregistry.cc:157
    #3  0x00007f5133652094 in ~Item (this=<value optimized out>) at ep-engine/src/item.h:352
    #4  SingleThreadedRCPtr<Item>::~SingleThreadedRCPtr (this=<value optimized out>) at ep-engine/src/atomic.h:430
    #5  0x00007f51336c7f47 in ~MutationResponse (this=0x3cd87880) at ep-engine/src/dcp-response.h:275
    #6  MutationResponse::~MutationResponse (this=0x3cd87880) at ep-engine/src/dcp-response.h:275
    #7  0x00007f51336d86aa in clear_UNLOCKED (this=0x7a3f5fa0) at ep-engine/src/dcp-stream.cc:201
    #8  ~ActiveStream (this=0x7a3f5fa0) at ep-engine/src/dcp-stream.h:178
    #9  ActiveStream::~ActiveStream (this=0x7a3f5fa0) at ep-engine/src/dcp-stream.h:179
    #10 0x00007f51336cc808 in RCPtr<Stream>::~RCPtr (this=0xb1823780) at ep-engine/src/atomic.h:348
    #11 0x00007f51336d77c7 in ~DCPBackfill (this=0xb1823740) at ep-engine/src/dcp-stream.cc:114
    #12 DCPBackfill::~DCPBackfill (this=0xb1823740) at ep-engine/src/dcp-stream.cc:114
    #13 0x00007f513368d95f in ~SingleThreadedRCPtr (this=0x5b55a20, e=0x59c4000, taskType=NO_TASK_TYPE) at ep-engine/src/atomic.h:430
    #14 ExecutorPool::_stopTaskGroup (this=0x5b55a20, e=0x59c4000, taskType=NO_TASK_TYPE) at ep-engine/src/executorpool.cc:532
    #15 0x00007f513368dad3 in ExecutorPool::_unregisterBucket (this=0x5b55a20, engine=0x59c4000) at ep-engine/src/executorpool.cc:551
    #16 0x00007f513368e143 in ExecutorPool::unregisterBucket (this=0x5b55a20, engine=0x59c4000) at ep-engine/src/executorpool.cc:602
    #17 0x00007f5133655f82 in EventuallyPersistentStore::~EventuallyPersistentStore (this=0x59e6000)
        at ep-engine/src/ep.cc:365
    #18 0x00007f5133672a25 in EventuallyPersistentEngine::~EventuallyPersistentEngine (this=0x59c4000)
        at ep-engine/src/ep_engine.cc:5791
    #19 0x00007f5133672c95 in EvpDestroy (handle=0x59c4000, force=<value optimized out>) at ep-engine/src/ep_engine.cc:143
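
For context, the abort comes from ObjectRegistry's thread-local engine
check. A minimal sketch of the code path involved (paraphrased from
objectregistry.cc - the thread-local accessor and the stats update
shown here are illustrative, not verbatim):

    static EventuallyPersistentEngine* verifyEngine(
            EventuallyPersistentEngine* engine) {
        if (engine == NULL && getenv("ALLOW_NO_STATS_UPDATE") != NULL) {
            return NULL; // test-only escape hatch (removed by this change)
        }
        cb_assert(engine); // objectregistry.cc:58 - the abort seen above
        return engine;
    }

    void ObjectRegistry::onDeleteItem(const Item* pItem) {
        // 'engine' is the thread-local set via onSwitchThread(); by the
        // time ExecutorPool::_stopTaskGroup destroys the lingering
        // DCPBackfill (and the Items in its stream's readyQ), it may no
        // longer be valid for the deleted bucket.
        EventuallyPersistentEngine* engine =
                verifyEngine(getThreadLocalEngine()); // illustrative name
        if (engine != NULL) {
            // Per-engine memory accounting for the deleted Item.
            engine->getEpStats().currentSize.fetch_sub(pItem->size());
        }
    }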

Actually reproducing the issue is somewhat involved - we need to
orchestrate the world such that we delete the engine while a
DCPBackfill task is still running. We spin up a separate thread which
will run the DCPBackfill task concurrently with destroy -
specifically, DCPBackfill must start running (and add items to the
readyQ) before destroy(), and it must then continue running until (and
stop only after) _stopTaskGroup is invoked. To achieve this we use a
couple of condition variables to synchronise between the two threads -
the timeline needs to look like:

    auxIO thread:  [------- DCPBackfill ----------]
     main thread:      [--destroy()--]       [ExecutorPool::_stopTaskGroup]

    --------------------------------------------------------> time
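
In outline this is the classic two-condition-variable handshake. A
minimal, self-contained sketch of the idea (using std::thread and
std::condition_variable where the test itself uses cb_create_thread
and ep-engine's SyncObject/LockHolder; the three placeholder functions
stand in for the real test steps):

    #include <condition_variable>
    #include <cstdio>
    #include <mutex>
    #include <thread>

    static std::mutex m;
    static std::condition_variable backfill_cv, destroy_cv;
    static bool backfill_done = false, destroy_done = false;

    // Placeholders for the real test actions:
    static void runBackfillTask() { std::puts("DCPBackfill fills readyQ"); }
    static void destroyEngine()   { std::puts("destroy(): conns shut down"); }
    static void deleteEngine()    { std::puts("~EPEngine -> _stopTaskGroup"); }

    static void auxioThread() {
        runBackfillTask();                 // must complete before destroy()
        { std::lock_guard<std::mutex> lg(m); backfill_done = true; }
        backfill_cv.notify_one();          // [A] unblock the main thread

        std::unique_lock<std::mutex> ul(m);
        destroy_cv.wait(ul, []{ return destroy_done; }); // [B]
        // ...after which the task stays 'running' until _stopTaskGroup
        // marks it dead, and only then is it completed and cancelled.
    }

    int main() {
        std::thread auxio(auxioThread);
        {
            std::unique_lock<std::mutex> ul(m);
            backfill_cv.wait(ul, []{ return backfill_done; }); // [A]
        }
        destroyEngine();  // safe: backfill has populated the readyQ
        { std::lock_guard<std::mutex> lg(m); destroy_done = true; }
        destroy_cv.notify_one();                               // [B]
        deleteEngine();   // triggers ExecutorPool::_stopTaskGroup
        auxio.join();
        return 0;
    }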

Change-Id: Ic64c419cb8e4e0af2378efba9711b121aacee15b
Reviewed-on: http://review.couchbase.org/65520
Well-Formed: buildbot <build@couchbase.com>
Tested-by: buildbot <build@couchbase.com>
Reviewed-by: Jim Walker <jim@couchbase.com>
src/connmap.cc
src/connmap.h
src/ep_engine.cc
src/ep_engine.h
src/executorthread.cc
src/fakes/fake_executorpool.h
src/warmup.cc
tests/module_tests/ep_unit_tests_main.cc
tests/module_tests/evp_store_single_threaded_test.cc
tests/module_tests/evp_store_test.cc

diff --git a/src/connmap.cc b/src/connmap.cc
index 26b493c..e59cf8b 100644
@@ -109,8 +109,9 @@ void ConnNotifier::stop() {
 void ConnNotifier::notifyMutationEvent(void) {
     bool inverse = false;
     if (pendingNotification.compare_exchange_strong(inverse, true)) {
-        cb_assert(task > 0);
-        ExecutorPool::get()->wake(task);
+        if (task > 0) {
+            ExecutorPool::get()->wake(task);
+        }
     }
 }
 
@@ -754,7 +755,9 @@ bool TapConnMap::mapped(connection_t &tc) {
 void TapConnMap::shutdownAllConnections() {
     LOG(EXTENSION_LOG_WARNING, "Shutting down tap connections!");
 
-    connNotifier_->stop();
+    if (connNotifier_ != NULL) {
+        connNotifier_->stop();
+    }
 
     // Not safe to acquire both connsLock and releaseLock at the same time
     // (can trigger deadlock), so first acquire releaseLock to release all
@@ -1023,7 +1026,9 @@ DcpProducer *DcpConnMap::newProducer(const void* cookie,
 void DcpConnMap::shutdownAllConnections() {
     LOG(EXTENSION_LOG_WARNING, "Shutting down dcp connections!");
 
-    connNotifier_->stop();
+    if (connNotifier_ != NULL) {
+        connNotifier_->stop();
+    }
 
     // Not safe to acquire both connsLock and releaseLock at the same time
     // (can trigger deadlock), so first acquire releaseLock to release all
diff --git a/src/connmap.h b/src/connmap.h
index 0726898..4ef8665 100644
@@ -254,7 +254,10 @@ protected:
 class ConnNotifier {
 public:
     ConnNotifier(conn_notifier_type ntype, ConnMap &cm)
-        : notifier_type(ntype), connMap(cm), pendingNotification(false)  { }
+        : notifier_type(ntype),
+          connMap(cm),
+          task(0),
+          pendingNotification(false)  { }
 
     void start();
 
diff --git a/src/ep_engine.cc b/src/ep_engine.cc
index 8dc4f82..9012c8e 100644
@@ -5661,7 +5661,7 @@ ENGINE_ERROR_CODE EventuallyPersistentEngine::dcpOpen(const void* cookie,
                                                        uint32_t opaque,
                                                        uint32_t seqno,
                                                        uint32_t flags,
-                                                       void *stream_name,
+                                                       const void *stream_name,
                                                        uint16_t nname)
 {
     (void) opaque;
diff --git a/src/ep_engine.h b/src/ep_engine.h
index a1fa67f..0323d5b 100644
@@ -429,7 +429,7 @@ public:
                               uint32_t opaque,
                               uint32_t seqno,
                               uint32_t flags,
-                              void *stream_name,
+                              const void *stream_name,
                               uint16_t nname);
 
     ENGINE_ERROR_CODE dcpAddStream(const void* cookie,
diff --git a/src/executorthread.cc b/src/executorthread.cc
index a7f62dd..383d199 100644
@@ -78,6 +78,7 @@ void ExecutorThread::run() {
             break;
         }
 
+        now = gethrtime();
         if (TaskQueue *q = manager->nextTask(*this, tick)) {
             EventuallyPersistentEngine *engine = currentTask->getEngine();
             ObjectRegistry::onSwitchThread(engine);
@@ -90,7 +91,6 @@ void ExecutorThread::run() {
 
             // Measure scheduling overhead as difference between the time
             // that the task wanted to wake up and the current time
-            now = gethrtime();
             hrtime_t woketime = currentTask->getWaketime();
             engine->getEpStore()->logQTime(currentTask->getTypeId(),
                                            now > woketime ? now - woketime
diff --git a/src/fakes/fake_executorpool.h b/src/fakes/fake_executorpool.h
index 63cc4ce..a8cca39 100644
@@ -82,7 +82,7 @@ public:
 };
 
 /* A fake execution 'thread', to be used with the FakeExecutorPool Allows
- * execution of tasks synchronously in the current thrad.
+ * execution of tasks synchronously in the current thread.
  */
 class FakeExecutorThread : public ExecutorThread {
 public:
@@ -93,8 +93,15 @@ public:
     void runCurrentTask() {
         // Only supports one-shot tasks
         EXPECT_FALSE(currentTask->run());
+        completeCurrentTask();
+    }
+
+    // 'completes' the current task; useful if the caller wants to separately
+    // run() the current task and then tidy up afterwards.
+    void completeCurrentTask() {
         manager->doneWork(curTaskType);
         manager->cancel(currentTask->getId(), true);
+        currentTask.reset();
     }
 
     ExTask& getCurrentTask() {
diff --git a/src/warmup.cc b/src/warmup.cc
index beb2655..ce036bf 100644
@@ -361,7 +361,7 @@ void LoadValueCallback::callback(CacheLookup &lookup)
 
 
 Warmup::Warmup(EventuallyPersistentStore *st) :
-    state(), store(st), startTime(0), metadata(0), warmup(0),
+    state(), store(st), taskId(0), startTime(0), metadata(0), warmup(0),
     threadtask_count(0),
     estimateTime(0), estimatedItemCount(std::numeric_limits<size_t>::max()),
     cleanShutdown(true), corruptAccessLog(false), warmupComplete(false),
diff --git a/tests/module_tests/ep_unit_tests_main.cc b/tests/module_tests/ep_unit_tests_main.cc
index 4923636..d34d38e 100644
@@ -28,8 +28,6 @@
 #include "configuration.h"
 #include "stored-value.h"
 
-/* static storage for environment variable set by putenv(). */
-static char allow_no_stats_env[] = "ALLOW_NO_STATS_UPDATE=yeah";
 
 int main(int argc, char **argv) {
     bool log_to_stderr = false;
@@ -52,8 +50,6 @@ int main(int argc, char **argv) {
         }
     }
 
-    putenv(allow_no_stats_env);
-
     init_mock_server(log_to_stderr);
     get_mock_server_api()->log->set_level(EXTENSION_LOG_DEBUG);
 
diff --git a/tests/module_tests/evp_store_single_threaded_test.cc b/tests/module_tests/evp_store_single_threaded_test.cc
index 0115490..426ca2d 100644
@@ -18,6 +18,7 @@
 #include "evp_store_test.h"
 
 #include "fakes/fake_executorpool.h"
+#include "programs/engine_testapp/mock_server.h"
 #include "taskqueue.h"
 
 /*
@@ -31,3 +32,221 @@ class SingleThreadedEPStoreTest : public EventuallyPersistentStoreTest {
         EventuallyPersistentStoreTest::SetUp();
     }
 };
+
+static ENGINE_ERROR_CODE dummy_dcp_add_failover_cb(vbucket_failover_t* entry,
+                                                   size_t nentries,
+                                                   const void *cookie)
+{
+    return ENGINE_SUCCESS;
+}
+
+/* Arguments for the background thread used by
+ * MB20054_onDeleteItem_during_bucket_deletion
+ */
+typedef struct {
+    EventuallyPersistentEngine* engine;
+    FakeExecutorThread& fake_executor_thread;
+    SyncObject& backfill_cv;
+    SyncObject& destroy_cv;
+    TaskQueue* taskQ;
+} mb20054_backfill_thread_params;
+
+static void MB20054_run_backfill_task(void* arg) {
+    mb20054_backfill_thread_params* params = static_cast<mb20054_backfill_thread_params*>(arg);
+    EventuallyPersistentEngine* engine = params->engine;
+    FakeExecutorThread& auxio_thread = params->fake_executor_thread;
+    SyncObject& backfill_cv = params->backfill_cv;
+    SyncObject& destroy_cv = params->destroy_cv;
+
+    TaskQueue* lpAuxioQ = params->taskQ;
+
+    ObjectRegistry::onSwitchThread(engine);
+
+    // Run the DCPBackfill task to push items to readyQ. Should return
+    // false (i.e. one-shot).
+    EXPECT_FALSE(auxio_thread.getCurrentTask()->run());
+
+    // Notify the main thread that it can progress with destroying the
+    // engine [A].
+    {
+        LockHolder lh(backfill_cv);
+        backfill_cv.notifyOne();
+    }
+
+    // Now wait ourselves for destroy to be completed [B].
+    LockHolder lh(destroy_cv);
+    destroy_cv.wait();
+
+    // This is the only "hacky" part of the test - we need to somehow
+    // keep the DCPBackfill task 'running' - i.e. not call
+    // completeCurrentTask - until the main thread is in
+    // ExecutorPool::_stopTaskGroup. However we have no way from the test
+    // to properly signal that we are *inside* _stopTaskGroup -
+    // called from EVPStore's destructor.
+    // Best we can do is spin on waiting for the DCPBackfill task to be
+    // set to 'dead' - and only then completeCurrentTask; which will
+    // cancel the task.
+    while (!auxio_thread.getCurrentTask()->isdead()) {
+        // spin.
+    }
+    auxio_thread.completeCurrentTask();
+
+    // Cleanup - fetch the next (final) task -
+    // ActiveStreamCheckpointProcessorTask - so it can be cancelled
+    // and executorpool shut down.
+    auxio_thread.updateCurrentTime();
+    EXPECT_TRUE(lpAuxioQ->fetchNextTask(auxio_thread, false));
+    EXPECT_EQ("Process checkpoint(s) for DCP producer",
+              auxio_thread.getTaskName());
+    auxio_thread.runCurrentTask();
+}
+
+// Check that if onDeleteItem() is called during bucket deletion, we do not
+// abort due to not having a valid thread-local 'engine' pointer. This
+// has been observed when we have a DCPBackfill task which is deleted during
+// bucket shutdown, which has a non-zero number of Items which are destructed
+// (and call onDeleteItem).
+TEST_F(SingleThreadedEPStoreTest, MB20054_onDeleteItem_during_bucket_deletion) {
+    SingleThreadedExecutorPool* task_executor =
+            reinterpret_cast<SingleThreadedExecutorPool*>(ExecutorPool::get());
+
+    // Should start with no tasks registered on any queues.
+    TaskQ& lp_task_q = task_executor->getLpTaskQ();
+    for (int i = 0; i < lp_task_q.size(); i++) {
+        ASSERT_EQ(0, lp_task_q[i]->getFutureQueueSize());
+        ASSERT_EQ(0, lp_task_q[i]->getReadyQueueSize());
+    }
+
+    // [1] Set our state to active. This should add a VBStatePersistTask to
+    // the WRITER queue.
+    EXPECT_EQ(ENGINE_SUCCESS,
+              store->setVBucketState(vbid, vbucket_state_active, false));
+
+    TaskQueue* lpWriterQ = task_executor->getLpTaskQ()[WRITER_TASK_IDX];
+    TaskQueue* lpAuxioQ = task_executor->getLpTaskQ()[AUXIO_TASK_IDX];
+
+    EXPECT_EQ(1, lpWriterQ->getFutureQueueSize());
+    EXPECT_EQ(0, lpWriterQ->getReadyQueueSize());
+
+    // Use a FakeExecutorThread to fetch and run the persistTask.
+    FakeExecutorThread writer_thread(task_executor, WRITER_TASK_IDX);
+    writer_thread.updateCurrentTime();
+    EXPECT_TRUE(lpWriterQ->fetchNextTask(writer_thread, false));
+    EXPECT_EQ("Persisting a vbucket state for vbucket: 0",
+              writer_thread.getTaskName());
+    EXPECT_EQ(0, lpWriterQ->getFutureQueueSize());
+    EXPECT_EQ(0, lpWriterQ->getReadyQueueSize());
+    writer_thread.runCurrentTask();
+
+    // Perform one SET, then close its checkpoint. This means that we no
+    // longer have all sequence numbers in memory checkpoints, forcing the
+    // DCP stream request to go to disk (backfill).
+    store_item(vbid, "key", "value");
+
+    // Force a new checkpoint.
+    RCPtr<VBucket> vb = store->getVbMap().getBucket(vbid);
+    CheckpointManager& ckpt_mgr = vb->checkpointManager;
+    ckpt_mgr.createNewCheckpoint();
+
+    EXPECT_EQ(0, lpWriterQ->getFutureQueueSize());
+    EXPECT_EQ(0, lpWriterQ->getReadyQueueSize());
+
+    EXPECT_EQ(0, lpAuxioQ->getFutureQueueSize());
+    EXPECT_EQ(0, lpAuxioQ->getReadyQueueSize());
+
+    // Directly flush the vbucket, ensuring data is on disk.
+    //  (This would normally also wake up the checkpoint remover task, but
+    //   as that task was never registered with the ExecutorPool in this test
+    //   environment, we need to manually remove the prev checkpoint).
+    EXPECT_EQ(1, store->flushVBucket(vbid));
+
+    bool new_ckpt_created;
+    EXPECT_EQ(1,
+              ckpt_mgr.removeClosedUnrefCheckpoints(vb, new_ckpt_created));
+    vb.reset();
+
+    EXPECT_EQ(0, lpAuxioQ->getFutureQueueSize());
+    EXPECT_EQ(0, lpAuxioQ->getReadyQueueSize());
+
+    // Create a DCP producer, and start a stream request.
+    std::string name("test_producer");
+    EXPECT_EQ(ENGINE_SUCCESS,
+              engine->dcpOpen(cookie, /*opaque:unused*/{}, /*seqno:unused*/{},
+                              DCP_OPEN_PRODUCER, name.data(), name.size()));
+
+    // Expect to have an ActiveStreamCheckpointProcessorTask, which is
+    // initially snoozed (so we can't run it).
+    EXPECT_EQ(1, lpAuxioQ->getFutureQueueSize());
+    EXPECT_EQ(0, lpAuxioQ->getReadyQueueSize());
+
+    uint64_t rollbackSeqno;
+    // Actual stream request method (EvpDcpStreamReq) is static, so access via
+    // the engine_interface.
+    EXPECT_EQ(ENGINE_SUCCESS,
+              engine->dcp.stream_req(&engine->interface, cookie, /*flags*/0,
+                                     /*opaque*/0, /*vbucket*/vbid,
+                                     /*start_seqno*/0, /*end_seqno*/-1,
+                                     /*vb_uuid*/0xabcd, /*snap_start*/0,
+                                     /*snap_end*/0, &rollbackSeqno,
+                                     dummy_dcp_add_failover_cb));
+
+    // FutureQ should now have an additional DCPBackfill task.
+    EXPECT_EQ(2, lpAuxioQ->getFutureQueueSize());
+    EXPECT_EQ(0, lpAuxioQ->getReadyQueueSize());
+
+    // Create an executor 'thread' to obtain shared ownership of the next
+    // AuxIO task (which should be DCPBackfill). As long as this
+    // object has its currentTask set to DCPBackfill, the DCPBackfill task
+    // will not be deleted.
+    // Essentially we are simulating a concurrent thread running this task.
+    FakeExecutorThread auxio_thread(task_executor, AUXIO_TASK_IDX);
+    auxio_thread.updateCurrentTime();
+    EXPECT_TRUE(lpAuxioQ->fetchNextTask(auxio_thread, false));
+    EXPECT_EQ("DCP backfill for vbucket 0", auxio_thread.getTaskName());
+
+    // This is the one action we really need to perform 'concurrently' - delete
+    // the engine while a DCPBackfill task is still running. We spin up a
+    // separate thread which will run the DCPBackfill task
+    // concurrently with destroy - specifically DCPBackfill must start running
+    // (and add items to the readyQ) before destroy(), and it must then
+    // continue running until (and stop only after) _stopTaskGroup is invoked.
+    // To achieve this we use a couple of condition variables to synchronise
+    // between the two threads - the timeline needs to look like:
+    //
+    //  auxIO thread:  [------- DCPBackfill ----------]
+    //   main thread:          [destroy()]       [ExecutorPool::_stopTaskGroup]
+    //
+    //  --------------------------------------------------------> time
+    //
+    SyncObject backfill_cv;
+    SyncObject destroy_cv;
+
+    cb_thread_t concurrent_task_thread;
+    mb20054_backfill_thread_params params = {engine, auxio_thread, backfill_cv,
+                                     destroy_cv, lpAuxioQ};
+
+    cb_create_thread(&concurrent_task_thread, MB20054_run_backfill_task, &params, 0);
+
+    // [A] Wait for DCPBackfill to complete.
+    LockHolder lh(backfill_cv);
+    backfill_cv.wait();
+
+    // 'Destroy' the engine - this doesn't delete the object, just shuts down
+    // connections, marks streams as dead etc.
+    engine->destroy(/*force*/false);
+    destroy_mock_event_callbacks();
+
+    {
+        LockHolder lh(destroy_cv);
+        destroy_cv.notifyOne();
+    }
+
+    // Need to have the current engine valid before deleting (this is what
+    // EvpDestroy does normally; however we have a smart ptr to the engine
+    // so must delete via that).
+    ObjectRegistry::onSwitchThread(engine);
+    delete engine;
+    engine = NULL;
+
+    cb_join_thread(concurrent_task_thread);
+}
diff --git a/tests/module_tests/evp_store_test.cc b/tests/module_tests/evp_store_test.cc
index 50e698a..9e1f109 100644
@@ -32,6 +32,7 @@
 #include "connmap.h"
 #include "ep_engine.h"
 #include "flusher.h"
+#include "tapthrottle.h"
 #include "../mock/mock_dcp_producer.h"
 
 #include "programs/engine_testapp/mock_server.h"
@@ -64,6 +65,12 @@ SynchronousEPEngine::SynchronousEPEngine(const std::string& extra_config)
 
     // checkpointConfig is needed by CheckpointManager (via EPStore).
     checkpointConfig = new CheckpointConfig(*this);
+
+    // tapConfig is needed by doTapStats().
+    tapConfig = new TapConfig(*this);
+
+    // tapThrottle is needed by doEngineStats().
+    tapThrottle = new TapThrottle(configuration, stats);
 }
 
 void SynchronousEPEngine::setEPStore(EventuallyPersistentStore* store) {
@@ -132,7 +139,9 @@ void EventuallyPersistentStoreTest::SetUp() {
 void EventuallyPersistentStoreTest::TearDown() {
     destroy_mock_cookie(cookie);
     destroy_mock_event_callbacks();
-    engine->getDcpConnMap().manageConnections();
+    if (engine) {
+        engine->getDcpConnMap().manageConnections();
+    }
 
     // Need to have the current engine valid before deleting (this is what
     // EvpDestroy does normally; however we have a smart ptr to the engine