Merge remote-tracking branch 'couchbase/3.0.x' into sherlock 15/65615/3
author Dave Rigby <daver@couchbase.com>
Fri, 8 Jul 2016 13:44:29 +0000 (14:44 +0100)
committer Dave Rigby <daver@couchbase.com>
Fri, 8 Jul 2016 16:39:58 +0000 (17:39 +0100)
* couchbase/3.0.x:
  MB-20054: Regression test - bucket is deleted with DCPBackfill running
  MB-20054: Account for memory alloc/dealloc in unregisterBucket
  MB-20054: [BP] Add verbose (logging) output to ep_unit_tests_main

Change-Id: I5f05dd3355cc0d581350db65463c6c1dc155f3c6

src/connmap.cc
src/ep_engine.cc
src/ep_engine.h
src/executorpool.cc
src/executorthread.cc
tests/module_tests/ep_unit_tests_main.cc
tests/module_tests/evp_store_single_threaded_test.cc
tests/module_tests/evp_store_test.cc

diff --cc src/connmap.cc
Simple merge
  #include "dcp-producer.h"
  #include "warmup.h"
  
 -#include <JSON_checker.h>
 -
  static ALLOCATOR_HOOKS_API *hooksApi;
- static SERVER_LOG_API *loggerApi;
+ SERVER_LOG_API* EventuallyPersistentEngine::loggerApi;
  
  static size_t percentOf(size_t val, double percent) {
      return static_cast<size_t>(static_cast<double>(val) * percent);
diff --cc src/ep_engine.h
@@@ -785,11 -737,11 +785,13 @@@ public
       * @param add_response The method used to format the output buffer
       * @return ENGINE_SUCCESS upon success
       */
 -    ENGINE_ERROR_CODE getAllVBucketSequenceNumbers(const void *cookie,
 -                                                   ADD_RESPONSE response);
 +    ENGINE_ERROR_CODE getAllVBucketSequenceNumbers(
 +                                        const void *cookie,
 +                                        protocol_binary_request_header *request,
 +                                        ADD_RESPONSE response);
  
+     static SERVER_LOG_API *loggerApi;
  protected:
      friend class EpEngineValueChangeListener;
  
diff --cc src/executorpool.cc
@@@ -610,11 -605,11 +610,12 @@@ void ExecutorPool::_unregisterBucket(Ev
      }
  }
  
 -void ExecutorPool::unregisterBucket(EventuallyPersistentEngine *engine) {
 +void ExecutorPool::unregisterBucket(EventuallyPersistentEngine *engine,
 +                                    bool force) {
-     EventuallyPersistentEngine *epe = ObjectRegistry::onSwitchThread(NULL, true);
+     // Note: unregistering a bucket is special - any memory allocations /
+     // deallocations made while unregistering *should* be accounted to the
+     // bucket in question - hence no `onSwitchThread(NULL)` call.
 -    _unregisterBucket(engine);
 +    _unregisterBucket(engine, force);
-     ObjectRegistry::onSwitchThread(epe);
  }
  
  void ExecutorPool::doTaskQStat(EventuallyPersistentEngine *engine,
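The comment in the hunk above is the heart of MB-20054: per-bucket memory accounting follows a thread-local "current engine" pointer, so unregisterBucket must leave the dying bucket's engine installed while the bucket is torn down. A minimal sketch of that accounting pattern, with hypothetical names (the real ObjectRegistry and allocator-hook APIs differ):

    // Illustrative only - hypothetical stand-in for ObjectRegistry.
    #include <atomic>
    #include <cstddef>

    struct Engine {
        std::atomic<size_t> memUsed{0};
    };

    // Each thread charges allocations to whichever engine is "current".
    static thread_local Engine* currentEngine = nullptr;

    // Analogue of ObjectRegistry::onSwitchThread: install `next` and
    // return the previous engine so the caller can restore it later.
    Engine* switchThread(Engine* next) {
        Engine* prev = currentEngine;
        currentEngine = next;
        return prev;
    }

    // Called from the allocator's alloc/dealloc hooks.
    void accountAlloc(size_t bytes) {
        if (currentEngine != nullptr) {
            currentEngine->memUsed.fetch_add(bytes);
        }
    }

    void accountDealloc(size_t bytes) {
        if (currentEngine != nullptr) {
            currentEngine->memUsed.fetch_sub(bytes);
        }
    }

Under this model, the removed lines above (which switched the thread to NULL for the duration of _unregisterBucket and restored afterwards) meant memory freed during bucket teardown was never credited back to the bucket - the mis-accounting this change addresses.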
diff --cc src/executorthread.cc
Simple merge
  #include "configuration.h"
  #include "stored-value.h"
  
- /* static storage for environment variable set by putenv(). */
- static char allow_no_stats_env[] = "ALLOW_NO_STATS_UPDATE=yeah";
  
  int main(int argc, char **argv) {
-     putenv(allow_no_stats_env);
+     bool log_to_stderr = false;
+     // Parse command-line options.
+     int cmd;
+     bool invalid_argument = false;
+     while (!invalid_argument &&
+            (cmd = getopt(argc, argv, "v")) != EOF) {
+         switch (cmd) {
+         case 'v':
+             log_to_stderr = true;
+             break;
+         default:
+             std::cerr << "Usage: " << argv[0] << " [-v] [gtest_options...]" << std::endl
+                       << std::endl
+                       << "  -v Verbose - Print verbose output to stderr."
+                       << std::endl << std::endl;
+             invalid_argument = true;
+             break;
+         }
+     }
+     init_mock_server(log_to_stderr);
+     get_mock_server_api()->log->set_level(EXTENSION_LOG_DEBUG);
+     if (memcached_initialize_stderr_logger(get_mock_server_api) != EXTENSION_SUCCESS) {
+         std::cerr << argv[0] << ": Failed to initialize log system" << std::endl;
+         return 1;
+     }
  
-     init_mock_server(false);
 +    mock_init_alloc_hooks();
  
      // Default number of hashtable locks is too large for TSan to
      // track. Use the value in configuration.json (47 at time of
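For reference, the new flag goes before any gtest arguments, per the usage message in the code above (binary name taken from the source file; the built target's name may differ):

    ep_unit_tests_main -v [gtest_options...]

Per the option's help text, -v asks the mock server to emit verbose log output to stderr.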
diff --cc tests/module_tests/evp_store_single_threaded_test.cc
@@@ -31,3 -32,221 +32,222 @@@ class SingleThreadedEPStoreTest : publi
          EventuallyPersistentStoreTest::SetUp();
      }
  };
 -    // Run the DCPBackfill task to push items to readyQ. Should return
 -    // false (i.e. one-shot).
 -    EXPECT_FALSE(auxio_thread.getCurrentTask()->run());
+ static ENGINE_ERROR_CODE dummy_dcp_add_failover_cb(vbucket_failover_t* entry,
+                                                    size_t nentries,
+                                                    const void *cookie)
+ {
+     return ENGINE_SUCCESS;
+ }
+ /* Arguments for the background thread used by
+  * MB20054_onDeleteItem_during_bucket_deletion
+  */
+ typedef struct {
+     EventuallyPersistentEngine* engine;
+     FakeExecutorThread& fake_executor_thread;
+     SyncObject& backfill_cv;
+     SyncObject& destroy_cv;
+     TaskQueue* taskQ;
+ } mb20054_backfill_thread_params;
+ static void MB20054_run_backfill_task(void* arg) {
+     mb20054_backfill_thread_params* params = static_cast<mb20054_backfill_thread_params*>(arg);
+     EventuallyPersistentEngine* engine = params->engine;
+     FakeExecutorThread& auxio_thread = params->fake_executor_thread;
+     SyncObject& backfill_cv = params->backfill_cv;
+     SyncObject& destroy_cv = params->destroy_cv;
+     TaskQueue* lpAuxioQ = params->taskQ;
+     ObjectRegistry::onSwitchThread(engine);
 -    for (int i = 0; i < lp_task_q.size(); i++) {
 -        ASSERT_EQ(0, lp_task_q[i]->getFutureQueueSize());
 -        ASSERT_EQ(0, lp_task_q[i]->getReadyQueueSize());
++    // Run the BackfillManagerTask task to push items to readyQ. In sherlock
++    // upwards this runs multiple times - so should return true.
++    EXPECT_TRUE(auxio_thread.getCurrentTask()->run());
+     // Notify the main thread that it can progress with destroying the
+     // engine [A].
+     {
+         LockHolder lh(backfill_cv);
+         backfill_cv.notifyOne();
+     }
+     // Now wait ourselves for destroy to be completed [B].
+     LockHolder lh(destroy_cv);
+     destroy_cv.wait();
+     // This is the only "hacky" part of the test - we need to somehow
+     // keep the DCPBackfill task 'running' - i.e. not call
+     // completeCurrentTask - until the main thread is in
+     // ExecutorPool::_stopTaskGroup. However we have no way from the test
+     // to properly signal that we are *inside* _stopTaskGroup -
+     // called from EVPStore's destructor.
+     // Best we can do is spin on waiting for the DCPBackfill task to be
+     // set to 'dead' - and only then completeCurrentTask; which will
+     // cancel the task.
+     while (!auxio_thread.getCurrentTask()->isdead()) {
+         // spin.
+     }
+     auxio_thread.completeCurrentTask();
+     // Cleanup - fetch the next (final) task -
+     // ActiveStreamCheckpointProcessorTask - so it can be cancelled
+     // and executorpool shut down.
+     auxio_thread.updateCurrentTime();
+     EXPECT_TRUE(lpAuxioQ->fetchNextTask(auxio_thread, false));
+     EXPECT_EQ("Process checkpoint(s) for DCP producer",
+               auxio_thread.getTaskName());
+     auxio_thread.runCurrentTask();
+ }
+ // Check that if onDeleteItem() is called during bucket deletion, we do not
+ // abort due to not having a valid thread-local 'engine' pointer. This
+ // has been observed when we have a DCPBackfill task which is deleted during
+ // bucket shutdown, which has a non-zero number of Items which are destructed
+ // (and call onDeleteItem).
+ TEST_F(SingleThreadedEPStoreTest, MB20054_onDeleteItem_during_bucket_deletion) {
+     SingleThreadedExecutorPool* task_executor =
+             reinterpret_cast<SingleThreadedExecutorPool*>(ExecutorPool::get());
+     // Should start with no tasks registered on any queues.
+     TaskQ& lp_task_q = task_executor->getLpTaskQ();
 -    // AuxIO task (which should be DCPBackfill). As long as this
 -    // object has it's currentTask set to DCPBackfill, the DCPBackfill task
++    for (auto& queue : lp_task_q) {
++        ASSERT_EQ(0, queue->getFutureQueueSize());
++        ASSERT_EQ(0, queue->getReadyQueueSize());
+     }
+     // [1] Set our state to active. This should add a VBStatePersistTask to
+     // the WRITER queue.
+     EXPECT_EQ(ENGINE_SUCCESS,
+               store->setVBucketState(vbid, vbucket_state_active, false));
+     TaskQueue* lpWriterQ = task_executor->getLpTaskQ()[WRITER_TASK_IDX];
+     TaskQueue* lpAuxioQ = task_executor->getLpTaskQ()[AUXIO_TASK_IDX];
+     EXPECT_EQ(1, lpWriterQ->getFutureQueueSize());
+     EXPECT_EQ(0, lpWriterQ->getReadyQueueSize());
+     // Use a FakeExecutorThread to fetch and run the persistTask.
+     FakeExecutorThread writer_thread(task_executor, WRITER_TASK_IDX);
+     writer_thread.updateCurrentTime();
+     EXPECT_TRUE(lpWriterQ->fetchNextTask(writer_thread, false));
+     EXPECT_EQ("Persisting a vbucket state for vbucket: 0",
+               writer_thread.getTaskName());
+     EXPECT_EQ(0, lpWriterQ->getFutureQueueSize());
+     EXPECT_EQ(0, lpWriterQ->getReadyQueueSize());
+     writer_thread.runCurrentTask();
+     // Perform one SET, then close its checkpoint. This means that we no
+     // longer have all sequence numbers in memory checkpoints, forcing the
+     // DCP stream request to go to disk (backfill).
+     store_item(vbid, "key", "value");
+     // Force a new checkpoint.
+     RCPtr<VBucket> vb = store->getVbMap().getBucket(vbid);
+     CheckpointManager& ckpt_mgr = vb->checkpointManager;
+     ckpt_mgr.createNewCheckpoint();
+     EXPECT_EQ(0, lpWriterQ->getFutureQueueSize());
+     EXPECT_EQ(0, lpWriterQ->getReadyQueueSize());
+     EXPECT_EQ(0, lpAuxioQ->getFutureQueueSize());
+     EXPECT_EQ(0, lpAuxioQ->getReadyQueueSize());
+     // Directly flush the vbucket, ensuring data is on disk.
+     //  (This would normally also wake up the checkpoint remover task, but
+     //   as that task was never registered with the ExecutorPool in this test
+     //   environment, we need to manually remove the prev checkpoint).
+     EXPECT_EQ(1, store->flushVBucket(vbid));
+     bool new_ckpt_created;
+     EXPECT_EQ(1,
+               ckpt_mgr.removeClosedUnrefCheckpoints(vb, new_ckpt_created));
+     vb.reset();
+     EXPECT_EQ(0, lpAuxioQ->getFutureQueueSize());
+     EXPECT_EQ(0, lpAuxioQ->getReadyQueueSize());
+     // Create a DCP producer, and start a stream request.
+     std::string name("test_producer");
+     EXPECT_EQ(ENGINE_SUCCESS,
+               engine->dcpOpen(cookie, /*opaque:unused*/{}, /*seqno:unused*/{},
+                               DCP_OPEN_PRODUCER, name.data(), name.size()));
+     // Expect to have an ActiveStreamCheckpointProcessorTask, which is
+     // initially snoozed (so we can't run it).
+     EXPECT_EQ(1, lpAuxioQ->getFutureQueueSize());
+     EXPECT_EQ(0, lpAuxioQ->getReadyQueueSize());
+     uint64_t rollbackSeqno;
+     // Actual stream request method (EvpDcpStreamReq) is static, so access via
+     // the engine_interface.
+     EXPECT_EQ(ENGINE_SUCCESS,
+               engine->dcp.stream_req(&engine->interface, cookie, /*flags*/0,
+                                      /*opaque*/0, /*vbucket*/vbid,
+                                      /*start_seqno*/0, /*end_seqno*/-1,
+                                      /*vb_uuid*/0xabcd, /*snap_start*/0,
+                                      /*snap_end*/0, &rollbackSeqno,
+                                      dummy_dcp_add_failover_cb));
+     // FutureQ should now have an additional DCPBackfill task.
+     EXPECT_EQ(2, lpAuxioQ->getFutureQueueSize());
+     EXPECT_EQ(0, lpAuxioQ->getReadyQueueSize());
+     // Create an executor 'thread' to obtain shared ownership of the next
 -    EXPECT_EQ("DCP backfill for vbucket 0", auxio_thread.getTaskName());
++    // AuxIO task (which should be BackfillManagerTask). As long as this
++    // object has its currentTask set to BackfillManagerTask, the task
+     // will not be deleted.
+     // Essentially we are simulating a concurrent thread running this task.
+     FakeExecutorThread auxio_thread(task_executor, AUXIO_TASK_IDX);
+     auxio_thread.updateCurrentTime();
+     EXPECT_TRUE(lpAuxioQ->fetchNextTask(auxio_thread, false));
++    EXPECT_EQ("Backfilling items for a DCP Connection",
++              auxio_thread.getTaskName());
+     // This is the one action we really need to perform 'concurrently' - delete
+     // the engine while a DCPBackfill task is still running. We spin up a
+     // separate thread which will run the DCPBackfill task
+     // concurrently with destroy - specifically DCPBackfill must start running
+     // (and add items to the readyQ) before destroy(), and it must continue
+     // running until after _stopTaskGroup is invoked.
+     // To achieve this we use a couple of condition variables to synchronise
+     // between the two threads - the timeline needs to look like:
+     //
+     //  auxIO thread:  [------- DCPBackfill ----------]
+     //   main thread:          [destroy()]       [ExecutorPool::_stopTaskGroup]
+     //
+     //  --------------------------------------------------------> time
+     //
+     SyncObject backfill_cv;
+     SyncObject destroy_cv;
+     cb_thread_t concurrent_task_thread;
+     mb20054_backfill_thread_params params = {engine, auxio_thread, backfill_cv,
+                                      destroy_cv, lpAuxioQ};
+     cb_create_thread(&concurrent_task_thread, MB20054_run_backfill_task, &params, 0);
+     // [A] Wait for DCPBackfill to complete.
+     LockHolder lh(backfill_cv);
+     backfill_cv.wait();
+     // 'Destroy' the engine - this doesn't delete the object, just shuts down
+     // connections, marks streams as dead etc.
+     engine->destroy(/*force*/false);
+     destroy_mock_event_callbacks();
+     {
+         LockHolder lh(destroy_cv);
+         destroy_cv.notifyOne();
+     }
+     // Need to have the current engine valid before deleting (this is what
+     // EvpDestroy does normally; however we have a smart ptr to the engine
+     // so must delete via that).
+     ObjectRegistry::onSwitchThread(engine);
+     delete engine;
+     engine = NULL;
+     cb_join_thread(concurrent_task_thread);
+ }
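The timeline sketched in the comments above is a two-step condition-variable handshake between the test's main thread and the background backfill thread. A distilled, standalone version of the same pattern, using std::condition_variable/std::mutex in place of Couchbase's SyncObject/LockHolder, and with wait predicates added to guard against lost wakeups (illustrative only):

    #include <condition_variable>
    #include <mutex>
    #include <thread>

    std::mutex m;
    std::condition_variable cv;
    bool backfill_ran = false;  // step [A]
    bool destroyed = false;     // step [B]

    void backgroundThread() {
        // ... run the backfill task ...
        {
            std::lock_guard<std::mutex> lg(m);
            backfill_ran = true;                      // [A] unblock main
        }
        cv.notify_one();
        std::unique_lock<std::mutex> ul(m);
        cv.wait(ul, [] { return destroyed; });        // [B] wait for destroy()
        // ... keep the task 'running' until marked dead, then complete it ...
    }

    int main() {
        std::thread bg(backgroundThread);
        {
            std::unique_lock<std::mutex> ul(m);
            cv.wait(ul, [] { return backfill_ran; }); // [A]
        }
        // ... destroy the engine ...
        {
            std::lock_guard<std::mutex> lg(m);
            destroyed = true;                         // [B] unblock background
        }
        cv.notify_one();
        bg.join();
    }

Note the test itself waits on each SyncObject without a predicate, relying on the ordering it establishes between the two threads; the predicates here make the sketch robust when reused in isolation.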
diff --cc tests/module_tests/evp_store_test.cc
Simple merge