MB-20054: Backport ep-engine_unit_tests from watson to 3.0.x 79/64979/11
authorDave Rigby <daver@couchbase.com>
Thu, 16 Jun 2016 11:19:34 +0000 (12:19 +0100)
committerDave Rigby <daver@couchbase.com>
Thu, 7 Jul 2016 15:59:27 +0000 (15:59 +0000)
In Watson we have created a set of 'unit' (i.e. class-level) tests for
ep-engine. To assist in backporting bug fixes, and specifically their
unit tests (to demonstrate they are correct), this patch backports the
test infrastructure itself.

Note these tests require GTest, so the CMake changes necessary for it
have also been included.

Tests are a backport from couchbase/watson as of commit feda304.
Modified to handle changes in APIs etc, and to remove tests
which fail on 3.0.x as we never chose to fix them in the 3.0.x
branch.

Change-Id: Iaaf59b0d8d6ba0a2211b630ba00fd837ca01614a
Reviewed-on: http://review.couchbase.org/64979
Well-Formed: buildbot <build@couchbase.com>
Tested-by: buildbot <build@couchbase.com>
Reviewed-by: Jim Walker <jim@couchbase.com>
30 files changed:
CMakeLists.txt
src/atomicqueue.h
src/bgfetcher.cc
src/connmap.cc
src/connmap.h
src/dcp-consumer.h
src/dcp-producer.h
src/dcp-stream.h
src/ep.h
src/ep_engine.cc
src/ep_engine.h
src/ep_time.c
src/ep_time.h
src/executorpool.cc
src/executorpool.h
src/executorthread.h
src/fakes/fake_executorpool.h [new file with mode: 0644]
src/flusher.cc
src/memory_tracker.cc
src/tasks.h
tests/mock/mock_dcp_consumer.h [new file with mode: 0644]
tests/mock/mock_dcp_producer.h [new file with mode: 0644]
tests/module_tests/dcp_test.cc [new file with mode: 0644]
tests/module_tests/ep_unit_tests_main.cc [new file with mode: 0644]
tests/module_tests/evp_engine_test.cc [new file with mode: 0644]
tests/module_tests/evp_engine_test.h [new file with mode: 0644]
tests/module_tests/evp_store_single_threaded_test.cc [new file with mode: 0644]
tests/module_tests/evp_store_test.cc [new file with mode: 0644]
tests/module_tests/evp_store_test.h [new file with mode: 0644]
tests/module_tests/stream_test.cc [deleted file]

index c90b5d6..b39982d 100644 (file)
@@ -15,10 +15,14 @@ INCLUDE_DIRECTORIES(BEFORE ${CMAKE_INSTALL_PREFIX}/include
                            ${CMAKE_CURRENT_BINARY_DIR}/src
                            ${SNAPPY_INCLUDE_DIR}
                            ${Platform_SOURCE_DIR}/include
+                           ${Memcached_SOURCE_DIR}
                            ${Memcached_SOURCE_DIR}/include
                            ${Couchstore_SOURCE_DIR}/include
                            ${CMAKE_CURRENT_BINARY_DIR})
 
+INCLUDE_DIRECTORIES(AFTER
+                    ${gtest_SOURCE_DIR}/include)
+
 CHECK_INCLUDE_FILES("arpa/inet.h" HAVE_ARPA_INET_H)
 CHECK_INCLUDE_FILES("unistd.h" HAVE_UNISTD_H)
 CHECK_INCLUDE_FILES("netdb.h" HAVE_NETDB_H)
@@ -137,8 +141,17 @@ ADD_LIBRARY(ep SHARED
 SET_TARGET_PROPERTIES(ep PROPERTIES PREFIX "")
 TARGET_LINK_LIBRARIES(ep cJSON JSON_checker couchstore dirutils platform ${LIBEVENT_LIBRARIES})
 
-ADD_EXECUTABLE(ep-engine_stream_test
-  tests/module_tests/stream_test.cc
+# Single executable containing all class-level unit tests involving
+# EventuallyPersistentEngine driven by GoogleTest.
+# (We end up compiling most of the src/ files of ep-engine for these unit tests,
+# so simpler / quicker just to link them into a single executable).
+ADD_EXECUTABLE(ep-engine_ep_unit_tests
+  tests/mock/mock_dcp.cc
+  tests/module_tests/ep_unit_tests_main.cc
+  tests/module_tests/dcp_test.cc
+  tests/module_tests/evp_engine_test.cc
+  tests/module_tests/evp_store_test.cc
+  tests/module_tests/evp_store_single_threaded_test.cc
   src/access_scanner.cc
   src/atomic.cc
   src/backfill.cc
@@ -180,7 +193,7 @@ ADD_EXECUTABLE(ep-engine_stream_test
   ${KVSTORE_SOURCE}
   ${COUCH_KVSTORE_SOURCE}
   ${Memcached_SOURCE_DIR}/programs/engine_testapp/mock_server.c)
-TARGET_LINK_LIBRARIES(ep-engine_stream_test couchstore cJSON dirutils JSON_checker mcd_util platform)
+TARGET_LINK_LIBRARIES(ep-engine_ep_unit_tests couchstore cJSON dirutils gtest JSON_checker mcd_util platform)
 
 ADD_EXECUTABLE(ep-engine_atomic_ptr_test
   tests/module_tests/atomic_ptr_test.cc
@@ -239,6 +252,7 @@ ADD_TEST(ep-engine_atomic_ptr_test ep-engine_atomic_ptr_test)
 ADD_TEST(ep-engine_atomic_test ep-engine_atomic_test)
 ADD_TEST(ep-engine_checkpoint_test ep-engine_checkpoint_test)
 ADD_TEST(ep-engine_chunk_creation_test ep-engine_chunk_creation_test)
+ADD_TEST(ep-engine_ep_unit_tests ep-engine_ep_unit_tests)
 ADD_TEST(ep-engine_failover_table_test ep-engine_failover_table_test)
 ADD_TEST(ep-engine_hash_table_test ep-engine_hash_table_test)
 ADD_TEST(ep-engine_histo_test ep-engine_histo_test)
@@ -247,7 +261,6 @@ ADD_TEST(ep-engine_misc_test ep-engine_misc_test)
 ADD_TEST(ep-engine_mutex_test ep-engine_mutex_test)
 ADD_TEST(ep-engine_priority_test ep-engine_priority_test)
 ADD_TEST(ep-engine_ringbuffer_test ep-engine_ringbuffer_test)
-ADD_TEST(ep-engine_stream_test ep-engine_stream_test)
 
 ADD_LIBRARY(timing_tests SHARED tests/module_tests/timing_tests.cc)
 SET_TARGET_PROPERTIES(timing_tests PROPERTIES PREFIX "")
index a8644dc..531ac32 100644 (file)
@@ -18,6 +18,8 @@
 #ifndef SRC_ATOMICQUEUE_H_
 #define SRC_ATOMICQUEUE_H_ 1
 
+#include "atomic.h"
+
 #ifdef _MSC_VER
 
 #include <queue>
index 1c57eba..50f96db 100644 (file)
@@ -41,7 +41,6 @@ void BgFetcher::start() {
 void BgFetcher::stop() {
     bool inverse = true;
     pendingFetch.compare_exchange_strong(inverse, false);
-    cb_assert(taskId > 0);
     ExecutorPool::get()->cancel(taskId);
 }
 
index 09d8b35..26b493c 100644 (file)
@@ -179,7 +179,8 @@ private:
 };
 
 ConnMap::ConnMap(EventuallyPersistentEngine &theEngine)
-    :  engine(theEngine) {
+    :  engine(theEngine),
+       connNotifier_(NULL) {
 
     Configuration &config = engine.getConfiguration();
     vbConnLocks = new SpinLock[vbConnLockNum];
@@ -198,7 +199,9 @@ void ConnMap::initialize(conn_notifier_type ntype) {
 
 ConnMap::~ConnMap() {
     delete [] vbConnLocks;
-    connNotifier_->stop();
+    if (connNotifier_ != NULL) {
+        connNotifier_->stop();
+    }
     delete connNotifier_;
 }
 
index a4e0dc2..0726898 100644 (file)
@@ -484,7 +484,7 @@ public:
 
     void addStats(ADD_STAT add_stat, const void *c);
 
-private:
+protected:
 
     bool isPassiveStreamConnected_UNLOCKED(uint16_t vbucket);
 
index 0f4d4eb..df3fdf8 100644 (file)
@@ -95,7 +95,7 @@ public:
 
     bool isStreamPresent(uint16_t vbucket);
 
-private:
+protected:
 
     DcpResponse* getNextItem();
 
index b88c5cc..24df502 100644 (file)
@@ -194,7 +194,7 @@ public:
     */
     void cancelCheckpointProcessorTask();
 
-private:
+protected:
 
     /**
      * DcpProducerReadyQueue is a std::queue wrapper for managing a
index ad8b29b..1910866 100644 (file)
@@ -226,8 +226,6 @@ protected:
 
     bool nextCheckpointItem();
 
-private:
-
     void transitionState(stream_state_t newState);
 
     DcpResponse* backfillPhase();
index 1fa757b..5ed0b96 100644 (file)
--- a/src/ep.h
+++ b/src/ep.h
@@ -747,8 +747,6 @@ protected:
     void warmupCompleted();
     void stopWarmup(void);
 
-private:
-
     void scheduleVBDeletion(RCPtr<VBucket> &vb,
                             const void* cookie,
                             double delay = 0);
index a26236d..fd19d9b 100644 (file)
@@ -1745,15 +1745,21 @@ extern "C" {
         }
         delete inital_tracking;
 
-        ep_current_time = api->core->get_current_time;
-        ep_abs_time = api->core->abstime;
-        ep_reltime = api->core->realtime;
+        initialize_time_functions(api->core);
 
         *handle = reinterpret_cast<ENGINE_HANDLE*> (engine);
 
         return ENGINE_SUCCESS;
     }
 
+    /*
+        This method is called prior to unloading of the shared-object.
+        Global clean-up should be performed from this method.
+     */
+    void destroy_engine() {
+        ExecutorPool::shutdown();
+    }
+
     static bool EvpGetItemInfo(ENGINE_HANDLE *, const void *,
                                const item* itm, item_info *itm_info)
     {
@@ -2020,8 +2026,7 @@ ENGINE_ERROR_CODE EventuallyPersistentEngine::initialize(const char* config) {
         return ENGINE_ENOMEM;
     }
 
-    // Register the callback
-    registerEngineCallback(ON_DISCONNECT, EvpHandleDisconnect, this);
+    initializeEngineCallbacks();
 
     // Complete the initialization of the ep-store
     if (!epstore->initialize()) {
@@ -2843,6 +2848,11 @@ TapProducer* EventuallyPersistentEngine::getTapProducer(const void *cookie) {
     return rv;
 }
 
+void EventuallyPersistentEngine::initializeEngineCallbacks() {
+    // Register the callback
+    registerEngineCallback(ON_DISCONNECT, EvpHandleDisconnect, this);
+}
+
 ENGINE_ERROR_CODE EventuallyPersistentEngine::processTapAck(const void *cookie,
                                                             uint32_t seqno,
                                                             uint16_t status,
index 0eb5aa1..08ea991 100644 (file)
@@ -49,6 +49,10 @@ extern "C" {
     ENGINE_ERROR_CODE create_instance(uint64_t interface,
                                       GET_SERVER_API get_server_api,
                                       ENGINE_HANDLE **handle);
+
+    EXPORT_FUNCTION
+    void destroy_engine(void);
+
     void EvpNotifyPendingConns(void*arg);
 }
 
@@ -751,7 +755,6 @@ protected:
         getlMaxTimeout = value;
     }
 
-private:
     EventuallyPersistentEngine(GET_SERVER_API get_server_api);
     friend ENGINE_ERROR_CODE create_instance(uint64_t interface,
                                              GET_SERVER_API get_server_api,
@@ -884,6 +887,10 @@ private:
     // If this method returns NULL, you should return TAP_DISCONNECT
     TapProducer* getTapProducer(const void *cookie);
 
+    // Initialize all required callbacks of this engine with the underlying
+    // server.
+    void initializeEngineCallbacks();
+
     SERVER_HANDLE_V1 *serverApi;
     EventuallyPersistentStore *epstore;
     WorkLoadPolicy *workload;
index 104b627..1400f78 100644 (file)
@@ -36,6 +36,19 @@ static rel_time_t default_reltime(time_t notused) {
     return 0;
 }
 
+
+void initialize_time_functions(const SERVER_CORE_API* core_api) {
+    if (ep_current_time == uninitialized_current_time) {
+        ep_current_time = core_api->get_current_time;
+    }
+    if (ep_abs_time == default_abs_time) {
+        ep_abs_time = core_api->abstime;
+    }
+    if (ep_reltime == default_reltime) {
+        ep_reltime = core_api->realtime;
+    }
+}
+
 rel_time_t (*ep_current_time)(void) = uninitialized_current_time;
 time_t (*ep_abs_time)(rel_time_t) = default_abs_time;
 rel_time_t (*ep_reltime)(time_t) = default_reltime;
index 72fe27f..4a99bbd 100644 (file)
@@ -19,6 +19,7 @@
 
 #include "config.h"
 
+#include <memcached/server_api.h>
 #include <memcached/types.h>
 #include <time.h>
 
 extern "C" {
 #endif
 
+/* Initializes the below time functions using the function pointers
+ * provided by the specified SERVER_CORE_API. This function should be
+ * called before attempting to use them, typically by the first engine
+ * loaded.
+ * Note: Only the first call to this function will have any effect,
+ * i.e.  once initialized the functions should not be modified to
+ * prevent data races between the different threads which use them.
+ */
+void initialize_time_functions(const SERVER_CORE_API* core_api);
+
 extern rel_time_t (*ep_current_time)(void);
 extern time_t (*ep_abs_time)(rel_time_t);
 extern rel_time_t (*ep_reltime)(time_t);
index cfc0dcd..401c52e 100644 (file)
@@ -146,6 +146,13 @@ ExecutorPool *ExecutorPool::get(void) {
     return instance;
 }
 
+void ExecutorPool::shutdown(void) {
+    if (instance) {
+        delete instance;
+        instance = NULL;
+    }
+}
+
 ExecutorPool::ExecutorPool(size_t maxThreads, size_t nTaskSets,
                            size_t maxReaders, size_t maxWriters,
                            size_t maxAuxIO,   size_t maxNonIO) :
@@ -172,7 +179,8 @@ ExecutorPool::ExecutorPool(size_t maxThreads, size_t nTaskSets,
 
 ExecutorPool::~ExecutorPool(void) {
     delete [] curWorkers;
-    free(maxWorkers);
+    delete[] maxWorkers;
+    delete[] numReadyTasks;
     if (isHiPrioQset) {
         for (size_t i = 0; i < numTaskSets; i++) {
             delete hpTaskQ[i];
index 1ea16c5..cf839c7 100644 (file)
@@ -123,16 +123,18 @@ public:
 
     static ExecutorPool *get(void);
 
-private:
+    static void shutdown(void);
+
+protected:
 
     ExecutorPool(size_t t, size_t nTaskSets, size_t r, size_t w, size_t a,
                  size_t n);
-    ~ExecutorPool(void);
+    virtual ~ExecutorPool(void);
 
     TaskQueue* _nextTask(ExecutorThread &t, uint8_t tick);
     bool _cancel(size_t taskId, bool eraseTask=false);
     bool _wake(size_t taskId);
-    bool _startWorkers(void);
+    virtual bool _startWorkers(void);
     bool _snooze(size_t taskId, double tosleep);
     size_t _schedule(ExTask task, task_type_t qidx);
     void _registerBucket(EventuallyPersistentEngine *engine);
index c21957a..955bf3d 100644 (file)
@@ -123,7 +123,7 @@ public:
 
     const hrtime_t getCurTime(void) { return now; }
 
-private:
+protected:
 
     cb_thread_t thread;
     ExecutorPool *manager;
diff --git a/src/fakes/fake_executorpool.h b/src/fakes/fake_executorpool.h
new file mode 100644 (file)
index 0000000..63cc4ce
--- /dev/null
@@ -0,0 +1,107 @@
+/* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
+/*
+ *     Copyright 2016 Couchbase, Inc.
+ *
+ *   Licensed under the Apache License, Version 2.0 (the "License");
+ *   you may not use this file except in compliance with the License.
+ *   You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *   Unless required by applicable law or agreed to in writing, software
+ *   distributed under the License is distributed on an "AS IS" BASIS,
+ *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *   See the License for the specific language governing permissions and
+ *   limitations under the License.
+ */
+
+/*
+ * FakeExecutorPool / FakeExecutorThread
+ *
+ * A pair of classes which act as a fake ExecutorPool for testing purposes.
+ * Only executes tasks when explicitly told, and only on the main thread.
+ *
+ * See SingleThreadedEPStoreTest for basic usage.
+ *
+ * TODO: Improve usage documentation.
+ */
+
+#pragma once
+
+#include "executorpool.h"
+#include "executorthread.h"
+
+#include <gtest/gtest.h>
+
+class SingleThreadedExecutorPool : public ExecutorPool {
+public:
+
+    /* Registers an instance of this class as "the" executorpool (i.e. what
+     * you get when you call ExecutorPool::get()).
+     *
+     * This *must* be called before the normal ExecutorPool is created.
+     */
+    static void replaceExecutorPoolWithFake() {
+        LockHolder lh(initGuard);
+        ExecutorPool* tmp = ExecutorPool::instance;
+        if (tmp != NULL) {
+            throw std::runtime_error("replaceExecutorPoolWithFake: "
+                    "ExecutorPool instance already created - cowardly refusing to continue!");
+        }
+
+        EventuallyPersistentEngine *epe =
+                ObjectRegistry::onSwitchThread(NULL, true);
+        tmp = new SingleThreadedExecutorPool(NUM_TASK_GROUPS);
+        ObjectRegistry::onSwitchThread(epe);
+        instance = tmp;
+    }
+
+    SingleThreadedExecutorPool(size_t nTaskSets)
+        : ExecutorPool(/*threads*/0, nTaskSets, 0, 0, 0, 0) {
+    }
+
+    bool _startWorkers() {
+        // Don't actually start any worker threads (all work will be done
+        // synchronously in the same thread) - but we do need to set
+        // maxWorkers to at least 1 otherwise ExecutorPool::tryNewWork() will
+        // never return any work.
+
+        maxWorkers[WRITER_TASK_IDX] = 1;
+        maxWorkers[READER_TASK_IDX] = 1;
+        maxWorkers[AUXIO_TASK_IDX]  = 1;
+        maxWorkers[NONIO_TASK_IDX]  = 1;
+
+        return true;
+    }
+
+    // Helper methods to access normally protected state of ExecutorPool
+
+    TaskQ& getLpTaskQ() {
+        return lpTaskQ;
+    }
+};
+
+/* A fake execution 'thread', to be used with the FakeExecutorPool Allows
+ * execution of tasks synchronously in the current thrad.
+ */
+class FakeExecutorThread : public ExecutorThread {
+public:
+    FakeExecutorThread(ExecutorPool* manager_, int startingQueue)
+        : ExecutorThread(manager_, startingQueue, "mock_executor") {
+    }
+
+    void runCurrentTask() {
+        // Only supports one-shot tasks
+        EXPECT_FALSE(currentTask->run());
+        manager->doneWork(curTaskType);
+        manager->cancel(currentTask->getId(), true);
+    }
+
+    ExTask& getCurrentTask() {
+        return currentTask;
+    }
+
+    void updateCurrentTime() {
+        now = gethrtime();
+    }
+};
index 40da8ca..d01e69b 100644 (file)
@@ -150,8 +150,10 @@ void Flusher::start() {
 }
 
 void Flusher::wake(void) {
-    cb_assert(taskId > 0);
-    ExecutorPool::get()->wake(taskId);
+    // taskId becomes zero if the flusher were stopped
+    if (taskId > 0) {
+        ExecutorPool::get()->wake(taskId);
+    }
 }
 
 bool Flusher::step(GlobalTask *task) {
index 4685584..419f16a 100644 (file)
@@ -98,6 +98,7 @@ MemoryTracker::~MemoryTracker() {
         tracking = false;
         cb_join_thread(statsThreadId);
     }
+    free(stats.ext_stats);
     instance = NULL;
 }
 
index d74322c..aa8c561 100644 (file)
@@ -113,7 +113,7 @@ public:
     /**
      * Puts the task to sleep for a given duration.
      */
-    void snooze(const double secs);
+    virtual void snooze(const double secs);
 
     /**
      * Returns the id of this task.
diff --git a/tests/mock/mock_dcp_consumer.h b/tests/mock/mock_dcp_consumer.h
new file mode 100644 (file)
index 0000000..1c50960
--- /dev/null
@@ -0,0 +1,44 @@
+/* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
+/*
+ *     Copyright 2016 Couchbase, Inc.
+ *
+ *   Licensed under the Apache License, Version 2.0 (the "License");
+ *   you may not use this file except in compliance with the License.
+ *   You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *   Unless required by applicable law or agreed to in writing, software
+ *   distributed under the License is distributed on an "AS IS" BASIS,
+ *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *   See the License for the specific language governing permissions and
+ *   limitations under the License.
+ */
+
+#pragma once
+
+#include "dcp-consumer.h"
+
+/*
+ * Mock of the DcpConsumer class.  Wraps the real DcpConsumer class
+ * and provides get/set access to lastMessageTime.
+ */
+class MockDcpConsumer: public DcpConsumer {
+public:
+    MockDcpConsumer(EventuallyPersistentEngine &theEngine, const void *cookie,
+                    const std::string &name)
+    : DcpConsumer(theEngine, cookie, name)
+    {}
+
+    void setLastMessageTime(const rel_time_t timeValue) {
+        lastMessageTime = timeValue;
+    }
+
+    rel_time_t getLastMessageTime() {
+        return lastMessageTime;
+    }
+
+    passive_stream_t getVbucketStream(uint16_t vbid) {
+        return streams[vbid];
+    }
+};
diff --git a/tests/mock/mock_dcp_producer.h b/tests/mock/mock_dcp_producer.h
new file mode 100644 (file)
index 0000000..72887ca
--- /dev/null
@@ -0,0 +1,57 @@
+/* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
+/*
+ *     Copyright 2016 Couchbase, Inc.
+ *
+ *   Licensed under the Apache License, Version 2.0 (the "License");
+ *   you may not use this file except in compliance with the License.
+ *   You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *   Unless required by applicable law or agreed to in writing, software
+ *   distributed under the License is distributed on an "AS IS" BASIS,
+ *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *   See the License for the specific language governing permissions and
+ *   limitations under the License.
+ */
+
+#pragma once
+
+#include "dcp-producer.h"
+
+/*
+ * Mock of the DcpProducer class.  Wraps the real DcpProducer, but exposes
+ * normally protected methods publically for test purposes.
+ */
+class MockDcpProducer: public DcpProducer {
+public:
+    MockDcpProducer(EventuallyPersistentEngine &theEngine, const void *cookie,
+                    const std::string &name, bool isNotifier)
+    : DcpProducer(theEngine, cookie, name, isNotifier)
+    {}
+
+    ENGINE_ERROR_CODE maybeSendNoop(struct dcp_message_producers* producers)
+    {
+        return DcpProducer::maybeSendNoop(producers);
+    }
+
+    void setNoopSendTime(const rel_time_t timeValue) {
+        noopCtx.sendTime = timeValue;
+    }
+
+    rel_time_t getNoopSendTime() {
+        return noopCtx.sendTime;
+    }
+
+    bool getNoopPendingRecv() {
+        return noopCtx.pendingRecv;
+    }
+
+    void setNoopEnabled(const bool booleanValue) {
+        noopCtx.enabled = booleanValue;
+    }
+
+    bool getNoopEnabled() {
+        return noopCtx.enabled;
+    }
+};
diff --git a/tests/module_tests/dcp_test.cc b/tests/module_tests/dcp_test.cc
new file mode 100644 (file)
index 0000000..4cca8e5
--- /dev/null
@@ -0,0 +1,558 @@
+/* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
+/*
+ *     Copyright 2016 Couchbase, Inc
+ *
+ *   Licensed under the Apache License, Version 2.0 (the "License");
+ *   you may not use this file except in compliance with the License.
+ *   You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *   Unless required by applicable law or agreed to in writing, software
+ *   distributed under the License is distributed on an "AS IS" BASIS,
+ *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *   See the License for the specific language governing permissions and
+ *   limitations under the License.
+ */
+
+/*
+ * Unit test for DCP-related classes.
+ *
+ * Due to the way our classes are structured, most of the different DCP classes
+ * need an instance of EventuallyPersistentStore & other related objects.
+ */
+
+#include "connmap.h"
+#include "dcp-stream.h"
+#include "dcp-response.h"
+#include "evp_engine_test.h"
+#include "programs/engine_testapp/mock_server.h"
+#include "../mock/mock_dcp.h"
+#include "../mock/mock_dcp_producer.h"
+#include "../mock/mock_dcp_consumer.h"
+
+#include <gtest/gtest.h>
+
+// Mock of the ActiveStream class. Wraps the real ActiveStream, but exposes
+// normally protected methods publically for test purposes.
+class MockActiveStream : public ActiveStream {
+public:
+    MockActiveStream(EventuallyPersistentEngine* e, dcp_producer_t p,
+                     const std::string &name, uint32_t flags, uint32_t opaque,
+                     uint16_t vb, uint64_t st_seqno, uint64_t en_seqno,
+                     uint64_t vb_uuid, uint64_t snap_start_seqno,
+                     uint64_t snap_end_seqno)
+    : ActiveStream(e, p, name, flags, opaque, vb, st_seqno, en_seqno, vb_uuid,
+                   snap_start_seqno, snap_end_seqno) {}
+
+    // Expose underlying protected ActiveStream methods as public
+    void public_getOutstandingItems(RCPtr<VBucket> &vb,
+                                    std::deque<queued_item> &items) {
+        getOutstandingItems(vb, items);
+    }
+
+    void public_processItems(std::deque<queued_item>& items) {
+        processItems(items);
+    }
+
+    bool public_nextCheckpointItem() {
+        return nextCheckpointItem();
+    }
+
+    const std::queue<DcpResponse*>& public_readyQ() {
+        return readyQ;
+    }
+
+    DcpResponse* public_nextQueuedItem() {
+        return nextQueuedItem();
+    }
+};
+
+/*
+ * Mock of the DcpConnMap class.  Wraps the real DcpConnMap, but exposes
+ * normally protected methods publically for test purposes.
+ */
+class MockDcpConnMap: public DcpConnMap {
+public:
+    MockDcpConnMap(EventuallyPersistentEngine &theEngine)
+    : DcpConnMap(theEngine)
+    {}
+
+    size_t getNumberOfDeadConnections() {
+        return deadConnections.size();
+    }
+
+    AtomicQueue<connection_t>& getPendingNotifications() {
+        return pendingNotifications;
+    }
+
+    void initialize(conn_notifier_type ntype) {
+        connNotifier_ = new ConnNotifier(ntype, *this);
+        // We do not create a ConnNotifierCallback task
+        // We do not create a ConnManager task
+        // The ConnNotifier is deleted in the DcpConnMap
+        // destructor
+    }
+};
+
+class DCPTest : public EventuallyPersistentEngineTest {
+protected:
+    void SetUp() {
+        EventuallyPersistentEngineTest::SetUp();
+
+        // Set AuxIO threads to zero, so that the producer's
+        // ActiveStreamCheckpointProcesserTask doesn't run.
+        ExecutorPool::get()->setMaxAuxIO(0);
+        // Set NonIO threads to zero, so the connManager
+        // task does not run.
+        ExecutorPool::get()->setMaxNonIO(0);
+    }
+
+    // Use TearDown from parent class
+};
+
+class StreamTest : public DCPTest {
+protected:
+    void TearDown() {
+        producer->cancelCheckpointProcessorTask();
+        // Destroy various engine objects
+        vb0.reset();
+        stream.reset();
+        producer.reset();
+        DCPTest::TearDown();
+    }
+
+    // Setup a DCP producer and attach a stream and cursor to it.
+    void setup_dcp_stream() {
+        producer = new DcpProducer(*engine, /*cookie*/NULL,
+                                   "test_producer", /*notifyOnly*/false);
+        stream = new MockActiveStream(engine, producer,
+                                      producer->getName(), /*flags*/0,
+                                      /*opaque*/0, vbid,
+                                      /*st_seqno*/0,
+                                      /*en_seqno*/~0,
+                                      /*vb_uuid*/0xabcd,
+                                      /*snap_start_seqno*/0,
+                                      /*snap_end_seqno*/~0);
+        vb0 = engine->getVBucket(0);
+        EXPECT_TRUE(vb0) << "Failed to get valid VBucket object for id 0";
+        EXPECT_FALSE(vb0->checkpointManager.registerTAPCursor(
+                producer->getName()))
+            << "Found an existing TAP cursor when attempting to register ours";
+    }
+
+    dcp_producer_t producer;
+    stream_t stream;
+    RCPtr<VBucket> vb0;
+};
+
+/* Regression test for MB-17766 - ensure that when an ActiveStream is preparing
+ * queued items to be sent out via a DCP consumer, that nextCheckpointItem()
+ * doesn't incorrectly return false (meaning that there are no more checkpoint
+ * items to send).
+ */
+TEST_F(StreamTest, test_mb17766) {
+
+    // Add an item.
+    store_item(vbid, "key", "value");
+
+    setup_dcp_stream();
+
+    // Should start with nextCheckpointItem() returning true.
+    MockActiveStream* mock_stream = static_cast<MockActiveStream*>(stream.get());
+    EXPECT_TRUE(mock_stream->public_nextCheckpointItem())
+        << "nextCheckpointItem() should initially be true.";
+
+    std::deque<queued_item> items;
+
+    // Get the set of outstanding items
+    mock_stream->public_getOutstandingItems(vb0, items);
+
+    // REGRESSION CHECK: nextCheckpointItem() should still return true
+    EXPECT_TRUE(mock_stream->public_nextCheckpointItem())
+        << "nextCheckpointItem() after getting outstanding items should be true.";
+
+    // Process the set of items
+    mock_stream->public_processItems(items);
+
+    // Should finish with nextCheckpointItem() returning false.
+    EXPECT_FALSE(mock_stream->public_nextCheckpointItem())
+        << "nextCheckpointItem() after processing items should be false.";
+}
+
+//
+// MB17653 test removed in 3.0.x backport, as MB not fixed in 3.0.x branch.
+//
+
+//
+// test_mb18625 test removed in 3.0.x backport, as MB not fixed in 3.0.x branch.
+//
+
+class ConnectionTest : public DCPTest {};
+
+ENGINE_ERROR_CODE mock_noop_return_engine_e2big(const void* cookie,uint32_t opaque) {
+    return ENGINE_E2BIG;
+}
+
+TEST_F(ConnectionTest, test_maybesendnoop_buffer_full) {
+    const void* cookie = create_mock_cookie();
+    // Create a Mock Dcp producer
+    MockDcpProducer producer(*engine, cookie, "test_producer", /*notifyOnly*/false);
+
+    struct dcp_message_producers producers = {NULL, NULL, NULL, NULL,
+        NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL,
+        mock_noop_return_engine_e2big, NULL, NULL};
+
+    producer.setNoopEnabled(true);
+    producer.setNoopSendTime(21);
+    ENGINE_ERROR_CODE ret = producer.maybeSendNoop(&producers);
+    EXPECT_EQ(ENGINE_E2BIG, ret)
+    << "maybeSendNoop not returning ENGINE_E2BIG";
+    EXPECT_FALSE(producer.getNoopPendingRecv())
+    << "Waiting for noop acknowledgement";
+    EXPECT_EQ(21, producer.getNoopSendTime())
+    << "SendTime has been updated";
+    destroy_mock_cookie(cookie);
+}
+
+TEST_F(ConnectionTest, test_maybesendnoop_send_noop) {
+    const void* cookie = create_mock_cookie();
+    // Create a Mock Dcp producer
+    MockDcpProducer producer(*engine, cookie, "test_producer", /*notifyOnly*/false);
+
+    dcp_message_producers* producers = get_dcp_producers();
+    producer.setNoopEnabled(true);
+    producer.setNoopSendTime(21);
+    ENGINE_ERROR_CODE ret = producer.maybeSendNoop(producers);
+    EXPECT_EQ(ENGINE_WANT_MORE, ret)
+    << "maybeSendNoop not returning ENGINE_WANT_MORE";
+    EXPECT_TRUE(producer.getNoopPendingRecv())
+    << "Not waiting for noop acknowledgement";
+    EXPECT_NE(21, producer.getNoopSendTime())
+    << "SendTime has not been updated";
+    destroy_mock_cookie(cookie);
+
+    free(producers);
+}
+
+TEST_F(ConnectionTest, test_maybesendnoop_noop_already_pending) {
+    const void* cookie = create_mock_cookie();
+    // Create a Mock Dcp producer
+    MockDcpProducer producer(*engine, cookie, "test_producer", /*notifyOnly*/false);
+
+    dcp_message_producers* producers = get_dcp_producers();
+    producer.setNoopEnabled(true);
+    producer.setNoopSendTime(21);
+    ENGINE_ERROR_CODE ret = producer.maybeSendNoop(producers);
+    EXPECT_EQ(ENGINE_WANT_MORE, ret)
+    << "maybeSendNoop not returning ENGINE_WANT_MORE";
+    EXPECT_TRUE(producer.getNoopPendingRecv())
+    << "Not awaiting noop acknowledgement";
+    EXPECT_NE(21, producer.getNoopSendTime())
+    << "SendTime has not been updated";
+    producer.setNoopSendTime(21);
+    ENGINE_ERROR_CODE ret2 = producer.maybeSendNoop(producers);
+    EXPECT_EQ(ENGINE_DISCONNECT, ret2)
+     << "maybeSendNoop not returning ENGINE_DISCONNECT";
+    EXPECT_TRUE(producer.getNoopPendingRecv())
+    << "Not waiting for noop acknowledgement";
+    EXPECT_EQ(21, producer.getNoopSendTime())
+    << "SendTime has been updated";
+    destroy_mock_cookie(cookie);
+
+    free(producers);
+}
+
+TEST_F(ConnectionTest, test_maybesendnoop_not_enabled) {
+    const void* cookie = create_mock_cookie();
+    // Create a Mock Dcp producer
+    MockDcpProducer producer(*engine, cookie, "test_producer", /*notifyOnly*/false);
+
+    dcp_message_producers* producers = get_dcp_producers();
+    producer.setNoopEnabled(false);
+    producer.setNoopSendTime(21);
+    ENGINE_ERROR_CODE ret = producer.maybeSendNoop(producers);
+    EXPECT_EQ(ENGINE_FAILED, ret)
+    << "maybeSendNoop not returning ENGINE_FAILED";
+    EXPECT_FALSE(producer.getNoopPendingRecv())
+    << "Waiting for noop acknowledgement";
+    EXPECT_EQ(21, producer.getNoopSendTime())
+    << "SendTime has been updated";
+    destroy_mock_cookie(cookie);
+
+    free(producers);
+}
+
+TEST_F(ConnectionTest, test_maybesendnoop_not_sufficient_time_passed) {
+    const void* cookie = create_mock_cookie();
+    // Create a Mock Dcp producer
+    MockDcpProducer producer(*engine, cookie, "test_producer", /*notifyOnly*/false);
+
+    dcp_message_producers* producers = get_dcp_producers();
+    producer.setNoopEnabled(true);
+    rel_time_t current_time = ep_current_time();
+    producer.setNoopSendTime(current_time);
+    ENGINE_ERROR_CODE ret = producer.maybeSendNoop(producers);
+    EXPECT_EQ(ENGINE_FAILED, ret)
+    << "maybeSendNoop not returning ENGINE_FAILED";
+    EXPECT_FALSE(producer.getNoopPendingRecv())
+    << "Waiting for noop acknowledgement";
+    EXPECT_EQ(current_time, producer.getNoopSendTime())
+    << "SendTime has been incremented";
+    destroy_mock_cookie(cookie);
+
+    free(producers);
+}
+
+TEST_F(ConnectionTest, test_deadConnections) {
+    MockDcpConnMap connMap(*engine);
+    connMap.initialize(DCP_CONN_NOTIFIER);
+    const void *cookie = create_mock_cookie();
+    // Create a new Dcp producer
+    dcp_producer_t producer = connMap.newProducer(cookie, "test_producer",
+                                    /*notifyOnly*/false);
+
+    // Disconnect the producer connection
+    connMap.disconnect(cookie);
+    EXPECT_EQ(1, connMap.getNumberOfDeadConnections())
+        << "Unexpected number of dead connections";
+    connMap.manageConnections();
+    // Should be zero deadConnections
+    EXPECT_EQ(0, connMap.getNumberOfDeadConnections())
+        << "Dead connections still remain";
+}
+
+//
+// test_mb17042* removed in 3.0.x backport, as MB not fixed in 3.0.x
+//
+
+TEST_F(ConnectionTest, test_update_of_last_message_time_in_consumer) {
+    const void* cookie = create_mock_cookie();
+    // Create a Mock Dcp consumer
+    MockDcpConsumer *consumer = new MockDcpConsumer(*engine, cookie, "test_consumer");
+    consumer->setLastMessageTime(1234);
+    consumer->addStream(/*opaque*/0, /*vbucket*/0, /*flags*/0);
+    EXPECT_NE(1234, consumer->getLastMessageTime())
+        << "lastMessagerTime not updated for addStream";
+    consumer->setLastMessageTime(1234);
+    consumer->closeStream(/*opaque*/0, /*vbucket*/0);
+    EXPECT_NE(1234, consumer->getLastMessageTime())
+        << "lastMessagerTime not updated for closeStream";
+    consumer->setLastMessageTime(1234);
+    consumer->streamEnd(/*opaque*/0, /*vbucket*/0, /*flags*/0);
+    EXPECT_NE(1234, consumer->getLastMessageTime())
+        << "lastMessagerTime not updated for streamEnd";
+    consumer->mutation(/*opaque*/0,
+                       /*key*/NULL,
+                       /*nkey*/0,
+                       /*value*/NULL,
+                       /*nvalue*/0,
+                       /*cas*/0,
+                       /*vbucket*/0,
+                       /*flags*/0,
+                       /*datatype*/0,
+                       /*locktime*/0,
+                       /*bySeqno*/0,
+                       /*revSeqno*/0,
+                       /*exprtime*/0,
+                       /*nru*/0,
+                       /*meta*/NULL,
+                       /*nmeta*/0);
+    EXPECT_NE(1234, consumer->getLastMessageTime())
+        << "lastMessagerTime not updated for mutation";
+    consumer->setLastMessageTime(1234);
+    consumer->deletion(/*opaque*/0,
+                       /*key*/NULL,
+                       /*nkey*/0,
+                       /*cas*/0,
+                       /*vbucket*/0,
+                       /*bySeqno*/0,
+                       /*revSeqno*/0,
+                       /*meta*/NULL,
+                       /*nmeta*/0);
+    EXPECT_NE(1234, consumer->getLastMessageTime())
+        << "lastMessagerTime not updated for deletion";
+    consumer->setLastMessageTime(1234);
+    consumer->expiration(/*opaque*/0,
+                         /*key*/NULL,
+                         /*nkey*/0,
+                         /*cas*/0,
+                         /*vbucket*/0,
+                         /*bySeqno*/0,
+                         /*revSeqno*/0,
+                         /*meta*/NULL,
+                         /*nmeta*/0);
+    EXPECT_NE(1234, consumer->getLastMessageTime())
+        << "lastMessagerTime not updated for expiration";
+    consumer->setLastMessageTime(1234);
+    consumer->snapshotMarker(/*opaque*/0,
+                             /*vbucket*/0,
+                             /*start_seqno*/0,
+                             /*end_seqno*/0,
+                             /*flags*/0);
+    EXPECT_NE(1234, consumer->getLastMessageTime())
+        << "lastMessagerTime not updated for snapshotMarker";
+    consumer->setLastMessageTime(1234);
+    consumer->noop(/*opaque*/0);
+    EXPECT_NE(1234, consumer->getLastMessageTime())
+        << "lastMessagerTime not updated for noop";
+    consumer->setLastMessageTime(1234);
+    consumer->flush(/*opaque*/0, /*vbucket*/0);
+    EXPECT_NE(1234, consumer->getLastMessageTime())
+        << "lastMessagerTime not updated for flush";
+    consumer->setLastMessageTime(1234);
+    consumer->setVBucketState(/*opaque*/0,
+                              /*vbucket*/0,
+                              /*state*/vbucket_state_active);
+    EXPECT_NE(1234, consumer->getLastMessageTime())
+        << "lastMessagerTime not updated for setVBucketState";
+    destroy_mock_cookie(cookie);
+}
+
+class NotifyTest : public DCPTest {
+protected:
+    void SetUp() {
+        // The test is going to replace a server API method, we must
+        // be able to undo that
+        sapi = *get_mock_server_api();
+        scookie_api = *get_mock_server_api()->cookie;
+        DCPTest::SetUp();
+    }
+
+    void TearDown() {
+        // Reset the server_api for other tests
+        *get_mock_server_api() = sapi;
+        *get_mock_server_api()->cookie = scookie_api;
+        DCPTest::TearDown();
+    }
+
+    SERVER_HANDLE_V1 sapi;
+    SERVER_COOKIE_API scookie_api;
+    dcp_producer_t producer;
+    int callbacks;
+};
+
+class ConnMapNotifyTest {
+public:
+    ConnMapNotifyTest(EventuallyPersistentEngine& engine)
+        : connMap(new MockDcpConnMap(engine)),
+          callbacks(0) {
+        connMap->initialize(DCP_CONN_NOTIFIER);
+
+        // Use 'this' instead of a mock cookie
+        producer = connMap->newProducer(static_cast<void*>(this),
+                                        "test_producer",
+                                        /*notifyOnly*/false);
+    }
+
+    ~ConnMapNotifyTest() {
+        delete connMap;
+    }
+
+    void notify() {
+        callbacks++;
+        connMap->notifyPausedConnection(producer.get(), /*schedule*/true);
+    }
+
+    int getCallbacks() {
+        return callbacks;
+    }
+
+
+    static void dcp_test_notify_io_complete(const void *cookie,
+                                            ENGINE_ERROR_CODE status) {
+        const ConnMapNotifyTest* notifyTest = reinterpret_cast<const ConnMapNotifyTest*>(cookie);
+        // 3. Call notifyPausedConnection again. We're now interleaved inside
+        //    of notifyAllPausedConnections, a second notification should occur.
+        const_cast<ConnMapNotifyTest*>(notifyTest)->notify();
+    }
+
+    MockDcpConnMap* connMap;
+    dcp_producer_t producer;
+
+private:
+    int callbacks;
+
+};
+
+
+TEST_F(NotifyTest, test_mb19503_connmap_notify) {
+    ConnMapNotifyTest notifyTest(*engine);
+
+    // Hook into notify_io_complete
+    SERVER_COOKIE_API* scapi = get_mock_server_api()->cookie;
+    scapi->notify_io_complete = ConnMapNotifyTest::dcp_test_notify_io_complete;
+
+    // Should be 0 when we begin
+    ASSERT_EQ(0, notifyTest.getCallbacks());
+    ASSERT_TRUE(notifyTest.producer->isPaused());
+    ASSERT_EQ(0, notifyTest.connMap->getPendingNotifications().size());
+
+    // 1. Call notifyPausedConnection with schedule = true
+    //    this will queue the producer
+    notifyTest.connMap->notifyPausedConnection(notifyTest.producer.get(),
+                                               /*schedule*/true);
+    EXPECT_EQ(1, notifyTest.connMap->getPendingNotifications().size());
+
+    // 2. Call notifyAllPausedConnections this will invoke notifyIOComplete
+    //    which we've hooked into. For step 3 go to dcp_test_notify_io_complete
+    notifyTest.connMap->notifyAllPausedConnections();
+
+    // 2.1 One callback should of occurred, and we should still have one
+    //     notification pending (see dcp_test_notify_io_complete).
+    EXPECT_EQ(1, notifyTest.getCallbacks());
+    EXPECT_EQ(1, notifyTest.connMap->getPendingNotifications().size());
+
+    // 4. Call notifyAllPausedConnections again, is there a new connection?
+    notifyTest.connMap->notifyAllPausedConnections();
+
+    // 5. There should of been 2 callbacks
+    EXPECT_EQ(2, notifyTest.getCallbacks());
+}
+
+// Variation on test_mb19503_connmap_notify - check that notification is correct
+// when notifiable is not paused.
+TEST_F(NotifyTest, test_mb19503_connmap_notify_paused) {
+    ConnMapNotifyTest notifyTest(*engine);
+
+    // Hook into notify_io_complete
+    SERVER_COOKIE_API* scapi = get_mock_server_api()->cookie;
+    scapi->notify_io_complete = ConnMapNotifyTest::dcp_test_notify_io_complete;
+
+    // Should be 0 when we begin
+    ASSERT_EQ(notifyTest.getCallbacks(), 0);
+    ASSERT_TRUE(notifyTest.producer->isPaused());
+    ASSERT_EQ(0, notifyTest.connMap->getPendingNotifications().size());
+
+    // 1. Call notifyPausedConnection with schedule = true
+    //    this will queue the producer
+    notifyTest.connMap->notifyPausedConnection(notifyTest.producer.get(),
+                                               /*schedule*/true);
+    EXPECT_EQ(1, notifyTest.connMap->getPendingNotifications().size());
+
+    // 2. Mark connection as not paused.
+    notifyTest.producer->setPaused(false);
+
+    // 3. Call notifyAllPausedConnections - as the connection is not paused
+    // this should *not* invoke notifyIOComplete.
+    notifyTest.connMap->notifyAllPausedConnections();
+
+    // 3.1 Should have not had any callbacks.
+    EXPECT_EQ(0, notifyTest.getCallbacks());
+    // 3.2 Should have no pending notifications.
+    EXPECT_EQ(0, notifyTest.connMap->getPendingNotifications().size());
+
+    // 4. Now mark the connection as paused.
+    ASSERT_FALSE(notifyTest.producer->isPaused());
+    notifyTest.producer->setPaused(true);
+
+    // 4. Add another notification - should queue the producer again.
+    notifyTest.connMap->notifyPausedConnection(notifyTest.producer.get(),
+                                               /*schedule*/true);
+    EXPECT_EQ(1, notifyTest.connMap->getPendingNotifications().size());
+
+    // 5. Call notifyAllPausedConnections a second time - as connection is
+    //    paused this time we *should* get a callback.
+    notifyTest.connMap->notifyAllPausedConnections();
+    EXPECT_EQ(1, notifyTest.getCallbacks());
+}
diff --git a/tests/module_tests/ep_unit_tests_main.cc b/tests/module_tests/ep_unit_tests_main.cc
new file mode 100644 (file)
index 0000000..2f299d6
--- /dev/null
@@ -0,0 +1,46 @@
+/* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
+/*
+ *     Copyright 2016 Couchbase, Inc
+ *
+ *   Licensed under the Apache License, Version 2.0 (the "License");
+ *   you may not use this file except in compliance with the License.
+ *   You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *   Unless required by applicable law or agreed to in writing, software
+ *   distributed under the License is distributed on an "AS IS" BASIS,
+ *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *   See the License for the specific language governing permissions and
+ *   limitations under the License.
+ */
+
+/*
+ * Main function & globals for the ep_unit_test target.
+ */
+
+#include <memcached/extension_loggers.h>
+#include "programs/engine_testapp/mock_server.h"
+
+#include <getopt.h>
+#include <gtest/gtest.h>
+
+#include "configuration.h"
+#include "stored-value.h"
+
+/* static storage for environment variable set by putenv(). */
+static char allow_no_stats_env[] = "ALLOW_NO_STATS_UPDATE=yeah";
+
+int main(int argc, char **argv) {
+    putenv(allow_no_stats_env);
+
+    init_mock_server(false);
+
+    // Default number of hashtable locks is too large for TSan to
+    // track. Use the value in configuration.json (47 at time of
+    // writing).
+    HashTable::setDefaultNumLocks(Configuration().getHtLocks());
+
+    ::testing::InitGoogleTest(&argc, argv);
+    return RUN_ALL_TESTS();
+}
diff --git a/tests/module_tests/evp_engine_test.cc b/tests/module_tests/evp_engine_test.cc
new file mode 100644 (file)
index 0000000..b365a0f
--- /dev/null
@@ -0,0 +1,76 @@
+/* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
+/*
+ *     Copyright 2016 Couchbase, Inc
+ *
+ *   Licensed under the Apache License, Version 2.0 (the "License");
+ *   you may not use this file except in compliance with the License.
+ *   You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *   Unless required by applicable law or agreed to in writing, software
+ *   distributed under the License is distributed on an "AS IS" BASIS,
+ *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *   See the License for the specific language governing permissions and
+ *   limitations under the License.
+ */
+
+/*
+ * Unit tests for the EventuallyPersistentEngine class.
+ */
+
+#include "evp_engine_test.h"
+
+#include "ep_engine.h"
+#include "programs/engine_testapp/mock_server.h"
+#include <platform/dirutils.h>
+
+void EventuallyPersistentEngineTest::SetUp() {
+    // Paranoia - kill any existing files in case they are left over
+    // from a previous run.
+    CouchbaseDirectoryUtilities::rmrf(test_dbname);
+
+    vbid = 0;
+
+    // Setup an engine with a single active vBucket.
+    EXPECT_EQ(ENGINE_SUCCESS,
+              create_instance(1, get_mock_server_api, &handle))
+        << "Failed to create ep engine instance";
+    EXPECT_EQ(1, handle->interface) << "Unexpected engine handle version";
+    engine_v1 = reinterpret_cast<ENGINE_HANDLE_V1*>(handle);
+
+    engine = reinterpret_cast<EventuallyPersistentEngine*>(handle);
+    ObjectRegistry::onSwitchThread(engine);
+    std::string config = "dbname=" + std::string(test_dbname);
+    EXPECT_EQ(ENGINE_SUCCESS, engine->initialize(config.c_str()))
+        << "Failed to initialize engine.";
+
+    engine->setVBucketState(vbid, vbucket_state_active, false);
+
+    // Wait for warmup to complete.
+    while (engine->getEpStore()->isWarmingUp()) {
+        usleep(10);
+    }
+}
+
+void EventuallyPersistentEngineTest::TearDown() {
+    // Need to force the destroy (i.e. pass true) because
+    // NonIO threads may have been disabled (see DCPTest subclass).
+    engine_v1->destroy(handle, true);
+    destroy_mock_event_callbacks();
+    destroy_engine();
+    // Cleanup any files we created.
+    CouchbaseDirectoryUtilities::rmrf(test_dbname);
+}
+
+void EventuallyPersistentEngineTest::store_item(uint16_t vbid,
+                                                const std::string& key,
+                                                const std::string& value) {
+    Item item(key.c_str(), key.size(), /*flags*/0, /*exp*/0, value.c_str(),
+              value.size());
+    uint64_t cas;
+    EXPECT_EQ(ENGINE_SUCCESS,
+              engine->store(NULL, &item, &cas, OPERATION_SET, vbid));
+}
+
+const char EventuallyPersistentEngineTest::test_dbname[] = "ep_engine_ep_unit_tests_db";
diff --git a/tests/module_tests/evp_engine_test.h b/tests/module_tests/evp_engine_test.h
new file mode 100644 (file)
index 0000000..4bc1247
--- /dev/null
@@ -0,0 +1,54 @@
+/* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
+/*
+ *     Copyright 2016 Couchbase, Inc
+ *
+ *   Licensed under the Apache License, Version 2.0 (the "License");
+ *   you may not use this file except in compliance with the License.
+ *   You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *   Unless required by applicable law or agreed to in writing, software
+ *   distributed under the License is distributed on an "AS IS" BASIS,
+ *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *   See the License for the specific language governing permissions and
+ *   limitations under the License.
+ */
+
+/*
+ * Unit tests for the EventuallyPersistentEngine class.
+ */
+
+#pragma once
+
+#include "config.h"
+
+#include <memcached/engine.h>
+
+#include <gtest/gtest.h>
+
+class EventuallyPersistentEngine;
+
+class EventuallyPersistentEngineTest : public ::testing::Test {
+protected:
+
+    void SetUp();
+
+    void TearDown();
+
+    /* Helper methods for tests */
+
+    /* Stores an item into the given vbucket. */
+    void store_item(uint16_t vbid, const std::string& key,
+                    const std::string& value);
+
+    std::string config_string;
+
+    static const char test_dbname[];
+
+    uint16_t vbid;
+
+    ENGINE_HANDLE* handle;
+    ENGINE_HANDLE_V1* engine_v1;
+    EventuallyPersistentEngine* engine;
+};
diff --git a/tests/module_tests/evp_store_single_threaded_test.cc b/tests/module_tests/evp_store_single_threaded_test.cc
new file mode 100644 (file)
index 0000000..0115490
--- /dev/null
@@ -0,0 +1,33 @@
+/* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
+/*
+ *     Copyright 2013 Couchbase, Inc.
+ *
+ *   Licensed under the Apache License, Version 2.0 (the "License");
+ *   you may not use this file except in compliance with the License.
+ *   You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *   Unless required by applicable law or agreed to in writing, software
+ *   distributed under the License is distributed on an "AS IS" BASIS,
+ *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *   See the License for the specific language governing permissions and
+ *   limitations under the License.
+ */
+
+#include "evp_store_test.h"
+
+#include "fakes/fake_executorpool.h"
+#include "taskqueue.h"
+
+/*
+ * A subclass of EventuallyPersistentStoreTest which uses a fake ExecutorPool,
+ * which will not spawn ExecutorThreads and hence not run any tasks
+ * automatically in the background. All tasks must be manually run().
+ */
+class SingleThreadedEPStoreTest : public EventuallyPersistentStoreTest {
+    void SetUp() {
+        SingleThreadedExecutorPool::replaceExecutorPoolWithFake();
+        EventuallyPersistentStoreTest::SetUp();
+    }
+};
diff --git a/tests/module_tests/evp_store_test.cc b/tests/module_tests/evp_store_test.cc
new file mode 100644 (file)
index 0000000..ef1408f
--- /dev/null
@@ -0,0 +1,165 @@
+/* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
+/*
+ *     Copyright 2016 Couchbase, Inc
+ *
+ *   Licensed under the Apache License, Version 2.0 (the "License");
+ *   you may not use this file except in compliance with the License.
+ *   You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *   Unless required by applicable law or agreed to in writing, software
+ *   distributed under the License is distributed on an "AS IS" BASIS,
+ *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *   See the License for the specific language governing permissions and
+ *   limitations under the License.
+ */
+
+/*
+ * Unit tests for the EventuallyPersistentStore class.
+ *
+ * Note that these test do *not* have the normal Tasks running (BGFetcher,
+ * flusher etc) as we do not initialise EPEngine. This means that such tasks
+ * need to be manually run. This can be very helpful as it essentially gives us
+ * synchronous control of EPStore.
+ */
+
+#include "evp_store_test.h"
+
+#include "bgfetcher.h"
+#include "checkpoint.h"
+#include "checkpoint_remover.h"
+#include "connmap.h"
+#include "ep_engine.h"
+#include "flusher.h"
+#include "../mock/mock_dcp_producer.h"
+
+#include "programs/engine_testapp/mock_server.h"
+#include <platform/dirutils.h>
+
+SynchronousEPEngine::SynchronousEPEngine(const std::string& extra_config)
+    : EventuallyPersistentEngine(get_mock_server_api) {
+    maxFailoverEntries = 1;
+
+    // Merge any extra config into the main configuration.
+    if (extra_config.size() > 0) {
+        if (!configuration.parseConfiguration(extra_config.c_str(),
+                                              serverApi)) {
+            throw std::invalid_argument("Unable to parse config string: " +
+                                        extra_config);
+        }
+    }
+
+    // workload is needed by EPStore's constructor (to construct the
+    // VBucketMap).
+    workload = new WorkLoadPolicy(/*workers*/1, /*shards*/1);
+
+    // dcpConnMap_ is needed by EPStore's constructor.
+    dcpConnMap_ = new DcpConnMap(*this);
+
+    // tapConnMap is needed by queueDirty.
+    tapConnMap = new TapConnMap(*this);
+
+    // checkpointConfig is needed by CheckpointManager (via EPStore).
+    checkpointConfig = new CheckpointConfig(*this);
+}
+
+void SynchronousEPEngine::setEPStore(EventuallyPersistentStore* store) {
+    cb_assert(epstore == NULL);
+    epstore = store;
+}
+
+MockEPStore::MockEPStore(EventuallyPersistentEngine &theEngine)
+    : EventuallyPersistentStore(theEngine) {
+    // Perform a limited set of setup (normally done by EPStore::initialize) -
+    // enough such that objects which are assumed to exist are present.
+
+    // Create the closed checkpoint removed task. Note we do _not_ schedule
+    // it, unlike EPStore::initialize
+    chkTask = new ClosedUnrefCheckpointRemoverTask
+            (&engine, stats, theEngine.getConfiguration().getChkRemoverStime());
+}
+
+VBucketMap& MockEPStore::getVbMap() {
+    return vbMap;
+}
+
+/* Mock Task class. Doesn't actually run() or snooze() - they both do nothing.
+ */
+class MockGlobalTask : public GlobalTask {
+public:
+    MockGlobalTask(EventuallyPersistentEngine* e, const Priority &p)
+        : GlobalTask(e, p) {}
+
+    bool run() { return false; }
+    std::string getDescription() { return "MockGlobalTask"; }
+
+    void snooze(const double secs) {}
+};
+
+void EventuallyPersistentStoreTest::SetUp() {
+    // Paranoia - kill any existing files in case they are left over
+    // from a previous run.
+    CouchbaseDirectoryUtilities::rmrf(test_dbname);
+
+    // Add dbname to config string.
+    std::string config = config_string;
+    if (config.size() > 0) {
+        config += ";";
+    }
+    config += "dbname=" + std::string(test_dbname);
+
+    vbid = 0;
+
+    engine = new SynchronousEPEngine(config);
+    ObjectRegistry::onSwitchThread(engine);
+
+    store = new MockEPStore(*engine);
+    engine->setEPStore(store);
+
+    // Ensure that EPEngine is hold about necessary server callbacks
+    // (client disconnect, bucket delete).
+    engine->public_initializeEngineCallbacks();
+
+    // Need to initialize ep_real_time and friends.
+    initialize_time_functions(get_mock_server_api()->core);
+
+    cookie = create_mock_cookie();
+}
+
+void EventuallyPersistentStoreTest::TearDown() {
+    destroy_mock_cookie(cookie);
+    destroy_mock_event_callbacks();
+    engine->getDcpConnMap().manageConnections();
+
+    // Need to have the current engine valid before deleting (this is what
+    // EvpDestroy does normally; however we have a smart ptr to the engine
+    // so must delete via that).
+    ObjectRegistry::onSwitchThread(engine);
+    delete engine;
+
+    // Shutdown the ExecutorPool singleton (initialized when we create
+    // an EventuallyPersistentStore object). Must happen after engine
+    // has been destroyed (to allow the tasks the engine has
+    // registered a chance to be unregistered).
+    ExecutorPool::shutdown();
+}
+
+void EventuallyPersistentStoreTest::store_item(uint16_t vbid,
+                                               const std::string& key,
+                                               const std::string& value) {
+    Item item(key.c_str(), key.size(), /*flags*/0, /*exp*/0, value.c_str(),
+              value.size());
+    item.setVBucketId(vbid);
+    EXPECT_EQ(ENGINE_SUCCESS, store->set(item, NULL));
+}
+
+
+//
+// EPStoreEvictionTest disabled in 3.0.x backport - there's an unknown
+// bug where onSwitchThread() ends up NULL, meaning that we eventually hit
+// an assert and crash.
+//
+
+
+const char EventuallyPersistentStoreTest::test_dbname[] = "ep_engine_ep_unit_tests_db";
diff --git a/tests/module_tests/evp_store_test.h b/tests/module_tests/evp_store_test.h
new file mode 100644 (file)
index 0000000..aa5a396
--- /dev/null
@@ -0,0 +1,113 @@
+/* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
+/*
+ *     Copyright 2016 Couchbase, Inc
+ *
+ *   Licensed under the Apache License, Version 2.0 (the "License");
+ *   you may not use this file except in compliance with the License.
+ *   You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *   Unless required by applicable law or agreed to in writing, software
+ *   distributed under the License is distributed on an "AS IS" BASIS,
+ *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *   See the License for the specific language governing permissions and
+ *   limitations under the License.
+ */
+
+/*
+ * Unit tests for the EventuallyPersistentStore class.
+ */
+
+#pragma once
+
+#include "config.h"
+
+#include "ep.h"
+#include "ep_engine.h"
+
+#include <memcached/engine.h>
+
+#include <gtest/gtest.h>
+#include <memory>
+
+class MockEPStore;
+
+/* A class which subclasses the real EPEngine. Its main purpose is to allow
+ * us to construct and setup an EPStore without starting all the various
+ * background tasks which are normally started by EPEngine as part of creating
+ * EPStore (in the initialize() method).
+ *
+ * The net result is a (mostly) synchronous environment - while the
+ * ExecutorPool's threads exist, none of the normally-created background Tasks
+ * should be running. Note however that /if/ any new tasks are created, they
+ * will be scheduled on the ExecutorPools' threads asynchronously.
+ */
+class SynchronousEPEngine : public EventuallyPersistentEngine {
+public:
+    SynchronousEPEngine(const std::string& extra_config);
+
+    void setEPStore(EventuallyPersistentStore* store);
+
+    /* Allow us to call normally protected methods */
+
+    ENGINE_ERROR_CODE public_doTapVbTakeoverStats(const void *cookie,
+                                                  ADD_STAT add_stat,
+                                                  std::string& key,
+                                                  uint16_t vbid) {
+        return doTapVbTakeoverStats(cookie, add_stat, key, vbid);
+    }
+
+    ENGINE_ERROR_CODE public_doDcpVbTakeoverStats(const void *cookie,
+                                                  ADD_STAT add_stat,
+                                                  std::string& key,
+                                                  uint16_t vbid) {
+        return doDcpVbTakeoverStats(cookie, add_stat, key, vbid);
+    }
+
+    void public_initializeEngineCallbacks() {
+        return initializeEngineCallbacks();
+    }
+};
+
+/* Subclass of EPStore to expose normally non-public members for test
+ * purposes.
+ */
+class MockEPStore : public EventuallyPersistentStore {
+public:
+    MockEPStore(EventuallyPersistentEngine &theEngine);
+
+    VBucketMap& getVbMap();
+
+    void public_stopWarmup() {
+        stopWarmup();
+    }
+};
+
+/* Actual test fixture class */
+class EventuallyPersistentStoreTest : public ::testing::Test {
+protected:
+    void SetUp();
+
+    void TearDown();
+
+    /* Stores an item into the given vbucket. */
+    void store_item(uint16_t vbid, const std::string& key,
+                    const std::string& value);
+
+    static const char test_dbname[];
+
+    std::string config_string;
+
+    uint16_t vbid;
+
+    // The mock engine (needed to construct the store).
+    SynchronousEPEngine* engine;
+
+    // The store under test. Wrapped in a mock to expose some normally
+    // protected members. Uses a raw pointer as this is owned by the engine.
+    MockEPStore* store;
+
+    // The (mock) server cookie.
+    const void* cookie;
+};
diff --git a/tests/module_tests/stream_test.cc b/tests/module_tests/stream_test.cc
deleted file mode 100644 (file)
index b25a555..0000000
+++ /dev/null
@@ -1,165 +0,0 @@
-/* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
-/*
- *     Copyright 2016 Couchbase, Inc
- *
- *   Licensed under the Apache License, Version 2.0 (the "License");
- *   you may not use this file except in compliance with the License.
- *   You may obtain a copy of the License at
- *
- *       http://www.apache.org/licenses/LICENSE-2.0
- *
- *   Unless required by applicable law or agreed to in writing, software
- *   distributed under the License is distributed on an "AS IS" BASIS,
- *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *   See the License for the specific language governing permissions and
- *   limitations under the License.
- */
-
-#include "dcp-producer.h"
-#include "dcp-stream.h"
-#include "../programs/engine_testapp/mock_server.h"
-
-#include <platform/dirutils.h>
-
-// Simple backport of the GTest-style EXPECT_EQ macro.
-#define EXPECT_EQ(a, b, msg) \
-    do { \
-        if ((a) != (b)) { \
-            throw std::runtime_error(msg); \
-        } \
-    } while(0)
-
-// Mock of the ActiveStream class. Wraps the real ActiveStream, but exposes
-// normally protected methods publically for test purposes.
-class MockActiveStream : public ActiveStream {
-public:
-    MockActiveStream(EventuallyPersistentEngine* e, dcp_producer_t p,
-                     const std::string &name, uint32_t flags, uint32_t opaque,
-                     uint16_t vb, uint64_t st_seqno, uint64_t en_seqno,
-                     uint64_t vb_uuid, uint64_t snap_start_seqno,
-                     uint64_t snap_end_seqno)
-    : ActiveStream(e, p, name, flags, opaque, vb, st_seqno, en_seqno, vb_uuid,
-                   snap_start_seqno, snap_end_seqno) {}
-
-    // Expose underlying protected ActiveStream methods as public
-    void public_getOutstandingItems(RCPtr<VBucket> &vb,
-                                    std::deque<queued_item> &items) {
-        getOutstandingItems(vb, items);
-    }
-
-    void public_processItems(std::deque<queued_item>& items) {
-        processItems(items);
-    }
-
-    bool public_nextCheckpointItem() {
-        return nextCheckpointItem();
-    }
-};
-
-/* Regression test for MB-17766 - ensure that when an ActiveStream is preparing
- * queued items to be sent out via a DCP consumer, that nextCheckpointItem()
- * doesn't incorrectly return false (meaning that there are no more checkpoint
- * items to send).
- */
-static void test_mb17766(const std::string& test_dbname) {
-    // Setup an engine with a single active vBucket.
-    ENGINE_HANDLE* handle;
-    EXPECT_EQ(ENGINE_SUCCESS,
-              create_instance(1, get_mock_server_api, &handle),
-              "Failed to created ep engine instance");
-
-    // Init mock server to initialize time_mutex used in
-    // mock_get_current_time & mock_time_travel apis.
-    init_mock_server(handle);
-
-    EventuallyPersistentEngine* engine =
-            reinterpret_cast<EventuallyPersistentEngine*>(handle);
-    ObjectRegistry::onSwitchThread(engine);
-    std::string config = "dbname=" + test_dbname;
-    EXPECT_EQ(ENGINE_SUCCESS,
-              engine->initialize(config.c_str()),
-              "Failed to initialize engine.");
-
-    const uint16_t vbid = 0;
-    engine->setVBucketState(vbid, vbucket_state_active, false);
-
-    // Wait for warmup to complete.
-    while (engine->getEpStore()->isWarmingUp()) {
-        usleep(10);
-    }
-
-    // Set AuxIO threads to zero, so that the producer's
-    // ActiveStreamCheckpointProcesserTask doesn't run.
-    ExecutorPool::get()->setMaxAuxIO(0);
-
-    // Add an item.
-    std::string value("value");
-    Item item("key", /*flags*/0, /*exp*/0, value.c_str(), value.size());
-
-    uint64_t cas;
-    EXPECT_EQ(ENGINE_SUCCESS,
-              engine->store(NULL, &item, &cas, OPERATION_SET, vbid),
-              "Store failed");
-
-    // Create a DCP producer and register with checkpoint Manager.
-    dcp_producer_t producer = new DcpProducer(*engine, /*cookie*/NULL,
-                                              "test_mb_17766_producer",
-                                              /*notifyOnly*/false);
-    stream_t stream = new MockActiveStream(engine, producer,
-                                           producer->getName(),
-                                           /*flags*/0,
-                                            /*opaque*/0, vbid,
-                                            /*st_seqno*/0,
-                                            /*en_seqno*/~0,
-                                            /*vb_uuid*/0xabcd,
-                                            /*snap_start_seqno*/0,
-                                            /*snap_end_seqno*/~0);
-
-    RCPtr<VBucket> vb0 = engine->getVBucket(0);
-    EXPECT_EQ(true, vb0, "Failed to get valid VBucket object for id 0");
-    vb0->checkpointManager.registerTAPCursor(producer->getName());
-
-    // Should start with nextCheckpointItem() returning true.
-    MockActiveStream* mock_stream = static_cast<MockActiveStream*>(stream.get());
-    EXPECT_EQ(true,
-              mock_stream->public_nextCheckpointItem(),
-              "nextCheckpointItem() should initially be true.");
-
-    std::deque<queued_item> items;
-
-    // Get the set of outstanding items
-    mock_stream->public_getOutstandingItems(vb0, items);
-
-    // REGRESSION CHECK: nextCheckpointItem() should still return true
-    EXPECT_EQ(true,
-              mock_stream->public_nextCheckpointItem(),
-              "nextCheckpointItem() after getting outstanding items should be true.");
-
-    // Process the set of items
-    mock_stream->public_processItems(items);
-
-    // Should finish with nextCheckpointItem() returning false.
-    EXPECT_EQ(false,
-              mock_stream->public_nextCheckpointItem(),
-              "nextCheckpointItem() after processing items should be false.");
-
-    producer->cancelCheckpointProcessorTask();
-}
-
-int main(int argc, char **argv) {
-    (void)argc; (void)argv;
-    putenv(strdup("ALLOW_NO_STATS_UPDATE=yeah"));
-
-    bool success = true;
-    try {
-        test_mb17766("stream_test_db");
-    } catch (std::exception& e) {
-        std::cerr << "FAILED: " << e.what() << std::endl;
-        success = false;
-    }
-
-    // Cleanup any files we created.
-    CouchbaseDirectoryUtilities::rmrf("stream_test_db");
-
-    return success ? 0 : 1;
-}