${CMAKE_CURRENT_BINARY_DIR}/src
${SNAPPY_INCLUDE_DIR}
${Platform_SOURCE_DIR}/include
+ ${Memcached_SOURCE_DIR}
${Memcached_SOURCE_DIR}/include
${Couchstore_SOURCE_DIR}/include
${CMAKE_CURRENT_BINARY_DIR})
+INCLUDE_DIRECTORIES(AFTER
+ ${gtest_SOURCE_DIR}/include)
+
CHECK_INCLUDE_FILES("arpa/inet.h" HAVE_ARPA_INET_H)
CHECK_INCLUDE_FILES("unistd.h" HAVE_UNISTD_H)
CHECK_INCLUDE_FILES("netdb.h" HAVE_NETDB_H)
SET_TARGET_PROPERTIES(ep PROPERTIES PREFIX "")
TARGET_LINK_LIBRARIES(ep cJSON JSON_checker couchstore dirutils platform ${LIBEVENT_LIBRARIES})
-ADD_EXECUTABLE(ep-engine_stream_test
- tests/module_tests/stream_test.cc
+# Single executable containing all class-level unit tests involving
+# EventuallyPersistentEngine driven by GoogleTest.
+# (We end up compiling most of the src/ files of ep-engine for these unit tests,
+# so simpler / quicker just to link them into a single executable).
+ADD_EXECUTABLE(ep-engine_ep_unit_tests
+ tests/mock/mock_dcp.cc
+ tests/module_tests/ep_unit_tests_main.cc
+ tests/module_tests/dcp_test.cc
+ tests/module_tests/evp_engine_test.cc
+ tests/module_tests/evp_store_test.cc
+ tests/module_tests/evp_store_single_threaded_test.cc
src/access_scanner.cc
src/atomic.cc
src/backfill.cc
${KVSTORE_SOURCE}
${COUCH_KVSTORE_SOURCE}
${Memcached_SOURCE_DIR}/programs/engine_testapp/mock_server.c)
-TARGET_LINK_LIBRARIES(ep-engine_stream_test couchstore cJSON dirutils JSON_checker mcd_util platform)
+TARGET_LINK_LIBRARIES(ep-engine_ep_unit_tests couchstore cJSON dirutils gtest JSON_checker mcd_util platform)
ADD_EXECUTABLE(ep-engine_atomic_ptr_test
tests/module_tests/atomic_ptr_test.cc
ADD_TEST(ep-engine_atomic_test ep-engine_atomic_test)
ADD_TEST(ep-engine_checkpoint_test ep-engine_checkpoint_test)
ADD_TEST(ep-engine_chunk_creation_test ep-engine_chunk_creation_test)
+ADD_TEST(ep-engine_ep_unit_tests ep-engine_ep_unit_tests)
ADD_TEST(ep-engine_failover_table_test ep-engine_failover_table_test)
ADD_TEST(ep-engine_hash_table_test ep-engine_hash_table_test)
ADD_TEST(ep-engine_histo_test ep-engine_histo_test)
ADD_TEST(ep-engine_mutex_test ep-engine_mutex_test)
ADD_TEST(ep-engine_priority_test ep-engine_priority_test)
ADD_TEST(ep-engine_ringbuffer_test ep-engine_ringbuffer_test)
-ADD_TEST(ep-engine_stream_test ep-engine_stream_test)
ADD_LIBRARY(timing_tests SHARED tests/module_tests/timing_tests.cc)
SET_TARGET_PROPERTIES(timing_tests PROPERTIES PREFIX "")
#ifndef SRC_ATOMICQUEUE_H_
#define SRC_ATOMICQUEUE_H_ 1
+#include "atomic.h"
+
#ifdef _MSC_VER
#include <queue>
void BgFetcher::stop() {
bool inverse = true;
pendingFetch.compare_exchange_strong(inverse, false);
- cb_assert(taskId > 0);
ExecutorPool::get()->cancel(taskId);
}
};
ConnMap::ConnMap(EventuallyPersistentEngine &theEngine)
- : engine(theEngine) {
+ : engine(theEngine),
+ connNotifier_(NULL) {
Configuration &config = engine.getConfiguration();
vbConnLocks = new SpinLock[vbConnLockNum];
ConnMap::~ConnMap() {
delete [] vbConnLocks;
- connNotifier_->stop();
+ if (connNotifier_ != NULL) {
+ connNotifier_->stop();
+ }
delete connNotifier_;
}
void addStats(ADD_STAT add_stat, const void *c);
-private:
+protected:
bool isPassiveStreamConnected_UNLOCKED(uint16_t vbucket);
bool isStreamPresent(uint16_t vbucket);
-private:
+protected:
DcpResponse* getNextItem();
*/
void cancelCheckpointProcessorTask();
-private:
+protected:
/**
* DcpProducerReadyQueue is a std::queue wrapper for managing a
bool nextCheckpointItem();
-private:
-
void transitionState(stream_state_t newState);
DcpResponse* backfillPhase();
void warmupCompleted();
void stopWarmup(void);
-private:
-
void scheduleVBDeletion(RCPtr<VBucket> &vb,
const void* cookie,
double delay = 0);
}
delete inital_tracking;
- ep_current_time = api->core->get_current_time;
- ep_abs_time = api->core->abstime;
- ep_reltime = api->core->realtime;
+ initialize_time_functions(api->core);
*handle = reinterpret_cast<ENGINE_HANDLE*> (engine);
return ENGINE_SUCCESS;
}
+ /*
+ This method is called prior to unloading of the shared-object.
+ Global clean-up should be performed from this method.
+ */
+ void destroy_engine() {
+ ExecutorPool::shutdown();
+ }
+
static bool EvpGetItemInfo(ENGINE_HANDLE *, const void *,
const item* itm, item_info *itm_info)
{
return ENGINE_ENOMEM;
}
- // Register the callback
- registerEngineCallback(ON_DISCONNECT, EvpHandleDisconnect, this);
+ initializeEngineCallbacks();
// Complete the initialization of the ep-store
if (!epstore->initialize()) {
return rv;
}
+void EventuallyPersistentEngine::initializeEngineCallbacks() {
+ // Register the callback
+ registerEngineCallback(ON_DISCONNECT, EvpHandleDisconnect, this);
+}
+
ENGINE_ERROR_CODE EventuallyPersistentEngine::processTapAck(const void *cookie,
uint32_t seqno,
uint16_t status,
ENGINE_ERROR_CODE create_instance(uint64_t interface,
GET_SERVER_API get_server_api,
ENGINE_HANDLE **handle);
+
+ EXPORT_FUNCTION
+ void destroy_engine(void);
+
void EvpNotifyPendingConns(void*arg);
}
getlMaxTimeout = value;
}
-private:
EventuallyPersistentEngine(GET_SERVER_API get_server_api);
friend ENGINE_ERROR_CODE create_instance(uint64_t interface,
GET_SERVER_API get_server_api,
// If this method returns NULL, you should return TAP_DISCONNECT
TapProducer* getTapProducer(const void *cookie);
+ // Initialize all required callbacks of this engine with the underlying
+ // server.
+ void initializeEngineCallbacks();
+
SERVER_HANDLE_V1 *serverApi;
EventuallyPersistentStore *epstore;
WorkLoadPolicy *workload;
return 0;
}
+
+void initialize_time_functions(const SERVER_CORE_API* core_api) {
+ if (ep_current_time == uninitialized_current_time) {
+ ep_current_time = core_api->get_current_time;
+ }
+ if (ep_abs_time == default_abs_time) {
+ ep_abs_time = core_api->abstime;
+ }
+ if (ep_reltime == default_reltime) {
+ ep_reltime = core_api->realtime;
+ }
+}
+
rel_time_t (*ep_current_time)(void) = uninitialized_current_time;
time_t (*ep_abs_time)(rel_time_t) = default_abs_time;
rel_time_t (*ep_reltime)(time_t) = default_reltime;
#include "config.h"
+#include <memcached/server_api.h>
#include <memcached/types.h>
#include <time.h>
extern "C" {
#endif
+/* Initializes the below time functions using the function pointers
+ * provided by the specified SERVER_CORE_API. This function should be
+ * called before attempting to use them, typically by the first engine
+ * loaded.
+ * Note: Only the first call to this function will have any effect,
+ * i.e. once initialized the functions should not be modified to
+ * prevent data races between the different threads which use them.
+ */
+void initialize_time_functions(const SERVER_CORE_API* core_api);
+
extern rel_time_t (*ep_current_time)(void);
extern time_t (*ep_abs_time)(rel_time_t);
extern rel_time_t (*ep_reltime)(time_t);
return instance;
}
+void ExecutorPool::shutdown(void) {
+ if (instance) {
+ delete instance;
+ instance = NULL;
+ }
+}
+
ExecutorPool::ExecutorPool(size_t maxThreads, size_t nTaskSets,
size_t maxReaders, size_t maxWriters,
size_t maxAuxIO, size_t maxNonIO) :
ExecutorPool::~ExecutorPool(void) {
delete [] curWorkers;
- free(maxWorkers);
+ delete[] maxWorkers;
+ delete[] numReadyTasks;
if (isHiPrioQset) {
for (size_t i = 0; i < numTaskSets; i++) {
delete hpTaskQ[i];
static ExecutorPool *get(void);
-private:
+ static void shutdown(void);
+
+protected:
ExecutorPool(size_t t, size_t nTaskSets, size_t r, size_t w, size_t a,
size_t n);
- ~ExecutorPool(void);
+ virtual ~ExecutorPool(void);
TaskQueue* _nextTask(ExecutorThread &t, uint8_t tick);
bool _cancel(size_t taskId, bool eraseTask=false);
bool _wake(size_t taskId);
- bool _startWorkers(void);
+ virtual bool _startWorkers(void);
bool _snooze(size_t taskId, double tosleep);
size_t _schedule(ExTask task, task_type_t qidx);
void _registerBucket(EventuallyPersistentEngine *engine);
const hrtime_t getCurTime(void) { return now; }
-private:
+protected:
cb_thread_t thread;
ExecutorPool *manager;
--- /dev/null
+/* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
+/*
+ * Copyright 2016 Couchbase, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/*
+ * FakeExecutorPool / FakeExecutorThread
+ *
+ * A pair of classes which act as a fake ExecutorPool for testing purposes.
+ * Only executes tasks when explicitly told, and only on the main thread.
+ *
+ * See SingleThreadedEPStoreTest for basic usage.
+ *
+ * TODO: Improve usage documentation.
+ */
+
+#pragma once
+
+#include "executorpool.h"
+#include "executorthread.h"
+
+#include <gtest/gtest.h>
+
+class SingleThreadedExecutorPool : public ExecutorPool {
+public:
+
+ /* Registers an instance of this class as "the" executorpool (i.e. what
+ * you get when you call ExecutorPool::get()).
+ *
+ * This *must* be called before the normal ExecutorPool is created.
+ */
+ static void replaceExecutorPoolWithFake() {
+ LockHolder lh(initGuard);
+ ExecutorPool* tmp = ExecutorPool::instance;
+ if (tmp != NULL) {
+ throw std::runtime_error("replaceExecutorPoolWithFake: "
+ "ExecutorPool instance already created - cowardly refusing to continue!");
+ }
+
+ EventuallyPersistentEngine *epe =
+ ObjectRegistry::onSwitchThread(NULL, true);
+ tmp = new SingleThreadedExecutorPool(NUM_TASK_GROUPS);
+ ObjectRegistry::onSwitchThread(epe);
+ instance = tmp;
+ }
+
+ SingleThreadedExecutorPool(size_t nTaskSets)
+ : ExecutorPool(/*threads*/0, nTaskSets, 0, 0, 0, 0) {
+ }
+
+ bool _startWorkers() {
+ // Don't actually start any worker threads (all work will be done
+ // synchronously in the same thread) - but we do need to set
+ // maxWorkers to at least 1 otherwise ExecutorPool::tryNewWork() will
+ // never return any work.
+
+ maxWorkers[WRITER_TASK_IDX] = 1;
+ maxWorkers[READER_TASK_IDX] = 1;
+ maxWorkers[AUXIO_TASK_IDX] = 1;
+ maxWorkers[NONIO_TASK_IDX] = 1;
+
+ return true;
+ }
+
+ // Helper methods to access normally protected state of ExecutorPool
+
+ TaskQ& getLpTaskQ() {
+ return lpTaskQ;
+ }
+};
+
+/* A fake execution 'thread', to be used with the FakeExecutorPool Allows
+ * execution of tasks synchronously in the current thrad.
+ */
+class FakeExecutorThread : public ExecutorThread {
+public:
+ FakeExecutorThread(ExecutorPool* manager_, int startingQueue)
+ : ExecutorThread(manager_, startingQueue, "mock_executor") {
+ }
+
+ void runCurrentTask() {
+ // Only supports one-shot tasks
+ EXPECT_FALSE(currentTask->run());
+ manager->doneWork(curTaskType);
+ manager->cancel(currentTask->getId(), true);
+ }
+
+ ExTask& getCurrentTask() {
+ return currentTask;
+ }
+
+ void updateCurrentTime() {
+ now = gethrtime();
+ }
+};
}
void Flusher::wake(void) {
- cb_assert(taskId > 0);
- ExecutorPool::get()->wake(taskId);
+ // taskId becomes zero if the flusher were stopped
+ if (taskId > 0) {
+ ExecutorPool::get()->wake(taskId);
+ }
}
bool Flusher::step(GlobalTask *task) {
tracking = false;
cb_join_thread(statsThreadId);
}
+ free(stats.ext_stats);
instance = NULL;
}
/**
* Puts the task to sleep for a given duration.
*/
- void snooze(const double secs);
+ virtual void snooze(const double secs);
/**
* Returns the id of this task.
--- /dev/null
+/* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
+/*
+ * Copyright 2016 Couchbase, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include "dcp-consumer.h"
+
+/*
+ * Mock of the DcpConsumer class. Wraps the real DcpConsumer class
+ * and provides get/set access to lastMessageTime.
+ */
+class MockDcpConsumer: public DcpConsumer {
+public:
+ MockDcpConsumer(EventuallyPersistentEngine &theEngine, const void *cookie,
+ const std::string &name)
+ : DcpConsumer(theEngine, cookie, name)
+ {}
+
+ void setLastMessageTime(const rel_time_t timeValue) {
+ lastMessageTime = timeValue;
+ }
+
+ rel_time_t getLastMessageTime() {
+ return lastMessageTime;
+ }
+
+ passive_stream_t getVbucketStream(uint16_t vbid) {
+ return streams[vbid];
+ }
+};
--- /dev/null
+/* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
+/*
+ * Copyright 2016 Couchbase, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include "dcp-producer.h"
+
+/*
+ * Mock of the DcpProducer class. Wraps the real DcpProducer, but exposes
+ * normally protected methods publically for test purposes.
+ */
+class MockDcpProducer: public DcpProducer {
+public:
+ MockDcpProducer(EventuallyPersistentEngine &theEngine, const void *cookie,
+ const std::string &name, bool isNotifier)
+ : DcpProducer(theEngine, cookie, name, isNotifier)
+ {}
+
+ ENGINE_ERROR_CODE maybeSendNoop(struct dcp_message_producers* producers)
+ {
+ return DcpProducer::maybeSendNoop(producers);
+ }
+
+ void setNoopSendTime(const rel_time_t timeValue) {
+ noopCtx.sendTime = timeValue;
+ }
+
+ rel_time_t getNoopSendTime() {
+ return noopCtx.sendTime;
+ }
+
+ bool getNoopPendingRecv() {
+ return noopCtx.pendingRecv;
+ }
+
+ void setNoopEnabled(const bool booleanValue) {
+ noopCtx.enabled = booleanValue;
+ }
+
+ bool getNoopEnabled() {
+ return noopCtx.enabled;
+ }
+};
--- /dev/null
+/* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
+/*
+ * Copyright 2016 Couchbase, Inc
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/*
+ * Unit test for DCP-related classes.
+ *
+ * Due to the way our classes are structured, most of the different DCP classes
+ * need an instance of EventuallyPersistentStore & other related objects.
+ */
+
+#include "connmap.h"
+#include "dcp-stream.h"
+#include "dcp-response.h"
+#include "evp_engine_test.h"
+#include "programs/engine_testapp/mock_server.h"
+#include "../mock/mock_dcp.h"
+#include "../mock/mock_dcp_producer.h"
+#include "../mock/mock_dcp_consumer.h"
+
+#include <gtest/gtest.h>
+
+// Mock of the ActiveStream class. Wraps the real ActiveStream, but exposes
+// normally protected methods publically for test purposes.
+class MockActiveStream : public ActiveStream {
+public:
+ MockActiveStream(EventuallyPersistentEngine* e, dcp_producer_t p,
+ const std::string &name, uint32_t flags, uint32_t opaque,
+ uint16_t vb, uint64_t st_seqno, uint64_t en_seqno,
+ uint64_t vb_uuid, uint64_t snap_start_seqno,
+ uint64_t snap_end_seqno)
+ : ActiveStream(e, p, name, flags, opaque, vb, st_seqno, en_seqno, vb_uuid,
+ snap_start_seqno, snap_end_seqno) {}
+
+ // Expose underlying protected ActiveStream methods as public
+ void public_getOutstandingItems(RCPtr<VBucket> &vb,
+ std::deque<queued_item> &items) {
+ getOutstandingItems(vb, items);
+ }
+
+ void public_processItems(std::deque<queued_item>& items) {
+ processItems(items);
+ }
+
+ bool public_nextCheckpointItem() {
+ return nextCheckpointItem();
+ }
+
+ const std::queue<DcpResponse*>& public_readyQ() {
+ return readyQ;
+ }
+
+ DcpResponse* public_nextQueuedItem() {
+ return nextQueuedItem();
+ }
+};
+
+/*
+ * Mock of the DcpConnMap class. Wraps the real DcpConnMap, but exposes
+ * normally protected methods publically for test purposes.
+ */
+class MockDcpConnMap: public DcpConnMap {
+public:
+ MockDcpConnMap(EventuallyPersistentEngine &theEngine)
+ : DcpConnMap(theEngine)
+ {}
+
+ size_t getNumberOfDeadConnections() {
+ return deadConnections.size();
+ }
+
+ AtomicQueue<connection_t>& getPendingNotifications() {
+ return pendingNotifications;
+ }
+
+ void initialize(conn_notifier_type ntype) {
+ connNotifier_ = new ConnNotifier(ntype, *this);
+ // We do not create a ConnNotifierCallback task
+ // We do not create a ConnManager task
+ // The ConnNotifier is deleted in the DcpConnMap
+ // destructor
+ }
+};
+
+class DCPTest : public EventuallyPersistentEngineTest {
+protected:
+ void SetUp() {
+ EventuallyPersistentEngineTest::SetUp();
+
+ // Set AuxIO threads to zero, so that the producer's
+ // ActiveStreamCheckpointProcesserTask doesn't run.
+ ExecutorPool::get()->setMaxAuxIO(0);
+ // Set NonIO threads to zero, so the connManager
+ // task does not run.
+ ExecutorPool::get()->setMaxNonIO(0);
+ }
+
+ // Use TearDown from parent class
+};
+
+class StreamTest : public DCPTest {
+protected:
+ void TearDown() {
+ producer->cancelCheckpointProcessorTask();
+ // Destroy various engine objects
+ vb0.reset();
+ stream.reset();
+ producer.reset();
+ DCPTest::TearDown();
+ }
+
+ // Setup a DCP producer and attach a stream and cursor to it.
+ void setup_dcp_stream() {
+ producer = new DcpProducer(*engine, /*cookie*/NULL,
+ "test_producer", /*notifyOnly*/false);
+ stream = new MockActiveStream(engine, producer,
+ producer->getName(), /*flags*/0,
+ /*opaque*/0, vbid,
+ /*st_seqno*/0,
+ /*en_seqno*/~0,
+ /*vb_uuid*/0xabcd,
+ /*snap_start_seqno*/0,
+ /*snap_end_seqno*/~0);
+ vb0 = engine->getVBucket(0);
+ EXPECT_TRUE(vb0) << "Failed to get valid VBucket object for id 0";
+ EXPECT_FALSE(vb0->checkpointManager.registerTAPCursor(
+ producer->getName()))
+ << "Found an existing TAP cursor when attempting to register ours";
+ }
+
+ dcp_producer_t producer;
+ stream_t stream;
+ RCPtr<VBucket> vb0;
+};
+
+/* Regression test for MB-17766 - ensure that when an ActiveStream is preparing
+ * queued items to be sent out via a DCP consumer, that nextCheckpointItem()
+ * doesn't incorrectly return false (meaning that there are no more checkpoint
+ * items to send).
+ */
+TEST_F(StreamTest, test_mb17766) {
+
+ // Add an item.
+ store_item(vbid, "key", "value");
+
+ setup_dcp_stream();
+
+ // Should start with nextCheckpointItem() returning true.
+ MockActiveStream* mock_stream = static_cast<MockActiveStream*>(stream.get());
+ EXPECT_TRUE(mock_stream->public_nextCheckpointItem())
+ << "nextCheckpointItem() should initially be true.";
+
+ std::deque<queued_item> items;
+
+ // Get the set of outstanding items
+ mock_stream->public_getOutstandingItems(vb0, items);
+
+ // REGRESSION CHECK: nextCheckpointItem() should still return true
+ EXPECT_TRUE(mock_stream->public_nextCheckpointItem())
+ << "nextCheckpointItem() after getting outstanding items should be true.";
+
+ // Process the set of items
+ mock_stream->public_processItems(items);
+
+ // Should finish with nextCheckpointItem() returning false.
+ EXPECT_FALSE(mock_stream->public_nextCheckpointItem())
+ << "nextCheckpointItem() after processing items should be false.";
+}
+
+//
+// MB17653 test removed in 3.0.x backport, as MB not fixed in 3.0.x branch.
+//
+
+//
+// test_mb18625 test removed in 3.0.x backport, as MB not fixed in 3.0.x branch.
+//
+
+class ConnectionTest : public DCPTest {};
+
+ENGINE_ERROR_CODE mock_noop_return_engine_e2big(const void* cookie,uint32_t opaque) {
+ return ENGINE_E2BIG;
+}
+
+TEST_F(ConnectionTest, test_maybesendnoop_buffer_full) {
+ const void* cookie = create_mock_cookie();
+ // Create a Mock Dcp producer
+ MockDcpProducer producer(*engine, cookie, "test_producer", /*notifyOnly*/false);
+
+ struct dcp_message_producers producers = {NULL, NULL, NULL, NULL,
+ NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL,
+ mock_noop_return_engine_e2big, NULL, NULL};
+
+ producer.setNoopEnabled(true);
+ producer.setNoopSendTime(21);
+ ENGINE_ERROR_CODE ret = producer.maybeSendNoop(&producers);
+ EXPECT_EQ(ENGINE_E2BIG, ret)
+ << "maybeSendNoop not returning ENGINE_E2BIG";
+ EXPECT_FALSE(producer.getNoopPendingRecv())
+ << "Waiting for noop acknowledgement";
+ EXPECT_EQ(21, producer.getNoopSendTime())
+ << "SendTime has been updated";
+ destroy_mock_cookie(cookie);
+}
+
+TEST_F(ConnectionTest, test_maybesendnoop_send_noop) {
+ const void* cookie = create_mock_cookie();
+ // Create a Mock Dcp producer
+ MockDcpProducer producer(*engine, cookie, "test_producer", /*notifyOnly*/false);
+
+ dcp_message_producers* producers = get_dcp_producers();
+ producer.setNoopEnabled(true);
+ producer.setNoopSendTime(21);
+ ENGINE_ERROR_CODE ret = producer.maybeSendNoop(producers);
+ EXPECT_EQ(ENGINE_WANT_MORE, ret)
+ << "maybeSendNoop not returning ENGINE_WANT_MORE";
+ EXPECT_TRUE(producer.getNoopPendingRecv())
+ << "Not waiting for noop acknowledgement";
+ EXPECT_NE(21, producer.getNoopSendTime())
+ << "SendTime has not been updated";
+ destroy_mock_cookie(cookie);
+
+ free(producers);
+}
+
+TEST_F(ConnectionTest, test_maybesendnoop_noop_already_pending) {
+ const void* cookie = create_mock_cookie();
+ // Create a Mock Dcp producer
+ MockDcpProducer producer(*engine, cookie, "test_producer", /*notifyOnly*/false);
+
+ dcp_message_producers* producers = get_dcp_producers();
+ producer.setNoopEnabled(true);
+ producer.setNoopSendTime(21);
+ ENGINE_ERROR_CODE ret = producer.maybeSendNoop(producers);
+ EXPECT_EQ(ENGINE_WANT_MORE, ret)
+ << "maybeSendNoop not returning ENGINE_WANT_MORE";
+ EXPECT_TRUE(producer.getNoopPendingRecv())
+ << "Not awaiting noop acknowledgement";
+ EXPECT_NE(21, producer.getNoopSendTime())
+ << "SendTime has not been updated";
+ producer.setNoopSendTime(21);
+ ENGINE_ERROR_CODE ret2 = producer.maybeSendNoop(producers);
+ EXPECT_EQ(ENGINE_DISCONNECT, ret2)
+ << "maybeSendNoop not returning ENGINE_DISCONNECT";
+ EXPECT_TRUE(producer.getNoopPendingRecv())
+ << "Not waiting for noop acknowledgement";
+ EXPECT_EQ(21, producer.getNoopSendTime())
+ << "SendTime has been updated";
+ destroy_mock_cookie(cookie);
+
+ free(producers);
+}
+
+TEST_F(ConnectionTest, test_maybesendnoop_not_enabled) {
+ const void* cookie = create_mock_cookie();
+ // Create a Mock Dcp producer
+ MockDcpProducer producer(*engine, cookie, "test_producer", /*notifyOnly*/false);
+
+ dcp_message_producers* producers = get_dcp_producers();
+ producer.setNoopEnabled(false);
+ producer.setNoopSendTime(21);
+ ENGINE_ERROR_CODE ret = producer.maybeSendNoop(producers);
+ EXPECT_EQ(ENGINE_FAILED, ret)
+ << "maybeSendNoop not returning ENGINE_FAILED";
+ EXPECT_FALSE(producer.getNoopPendingRecv())
+ << "Waiting for noop acknowledgement";
+ EXPECT_EQ(21, producer.getNoopSendTime())
+ << "SendTime has been updated";
+ destroy_mock_cookie(cookie);
+
+ free(producers);
+}
+
+TEST_F(ConnectionTest, test_maybesendnoop_not_sufficient_time_passed) {
+ const void* cookie = create_mock_cookie();
+ // Create a Mock Dcp producer
+ MockDcpProducer producer(*engine, cookie, "test_producer", /*notifyOnly*/false);
+
+ dcp_message_producers* producers = get_dcp_producers();
+ producer.setNoopEnabled(true);
+ rel_time_t current_time = ep_current_time();
+ producer.setNoopSendTime(current_time);
+ ENGINE_ERROR_CODE ret = producer.maybeSendNoop(producers);
+ EXPECT_EQ(ENGINE_FAILED, ret)
+ << "maybeSendNoop not returning ENGINE_FAILED";
+ EXPECT_FALSE(producer.getNoopPendingRecv())
+ << "Waiting for noop acknowledgement";
+ EXPECT_EQ(current_time, producer.getNoopSendTime())
+ << "SendTime has been incremented";
+ destroy_mock_cookie(cookie);
+
+ free(producers);
+}
+
+TEST_F(ConnectionTest, test_deadConnections) {
+ MockDcpConnMap connMap(*engine);
+ connMap.initialize(DCP_CONN_NOTIFIER);
+ const void *cookie = create_mock_cookie();
+ // Create a new Dcp producer
+ dcp_producer_t producer = connMap.newProducer(cookie, "test_producer",
+ /*notifyOnly*/false);
+
+ // Disconnect the producer connection
+ connMap.disconnect(cookie);
+ EXPECT_EQ(1, connMap.getNumberOfDeadConnections())
+ << "Unexpected number of dead connections";
+ connMap.manageConnections();
+ // Should be zero deadConnections
+ EXPECT_EQ(0, connMap.getNumberOfDeadConnections())
+ << "Dead connections still remain";
+}
+
+//
+// test_mb17042* removed in 3.0.x backport, as MB not fixed in 3.0.x
+//
+
+TEST_F(ConnectionTest, test_update_of_last_message_time_in_consumer) {
+ const void* cookie = create_mock_cookie();
+ // Create a Mock Dcp consumer
+ MockDcpConsumer *consumer = new MockDcpConsumer(*engine, cookie, "test_consumer");
+ consumer->setLastMessageTime(1234);
+ consumer->addStream(/*opaque*/0, /*vbucket*/0, /*flags*/0);
+ EXPECT_NE(1234, consumer->getLastMessageTime())
+ << "lastMessagerTime not updated for addStream";
+ consumer->setLastMessageTime(1234);
+ consumer->closeStream(/*opaque*/0, /*vbucket*/0);
+ EXPECT_NE(1234, consumer->getLastMessageTime())
+ << "lastMessagerTime not updated for closeStream";
+ consumer->setLastMessageTime(1234);
+ consumer->streamEnd(/*opaque*/0, /*vbucket*/0, /*flags*/0);
+ EXPECT_NE(1234, consumer->getLastMessageTime())
+ << "lastMessagerTime not updated for streamEnd";
+ consumer->mutation(/*opaque*/0,
+ /*key*/NULL,
+ /*nkey*/0,
+ /*value*/NULL,
+ /*nvalue*/0,
+ /*cas*/0,
+ /*vbucket*/0,
+ /*flags*/0,
+ /*datatype*/0,
+ /*locktime*/0,
+ /*bySeqno*/0,
+ /*revSeqno*/0,
+ /*exprtime*/0,
+ /*nru*/0,
+ /*meta*/NULL,
+ /*nmeta*/0);
+ EXPECT_NE(1234, consumer->getLastMessageTime())
+ << "lastMessagerTime not updated for mutation";
+ consumer->setLastMessageTime(1234);
+ consumer->deletion(/*opaque*/0,
+ /*key*/NULL,
+ /*nkey*/0,
+ /*cas*/0,
+ /*vbucket*/0,
+ /*bySeqno*/0,
+ /*revSeqno*/0,
+ /*meta*/NULL,
+ /*nmeta*/0);
+ EXPECT_NE(1234, consumer->getLastMessageTime())
+ << "lastMessagerTime not updated for deletion";
+ consumer->setLastMessageTime(1234);
+ consumer->expiration(/*opaque*/0,
+ /*key*/NULL,
+ /*nkey*/0,
+ /*cas*/0,
+ /*vbucket*/0,
+ /*bySeqno*/0,
+ /*revSeqno*/0,
+ /*meta*/NULL,
+ /*nmeta*/0);
+ EXPECT_NE(1234, consumer->getLastMessageTime())
+ << "lastMessagerTime not updated for expiration";
+ consumer->setLastMessageTime(1234);
+ consumer->snapshotMarker(/*opaque*/0,
+ /*vbucket*/0,
+ /*start_seqno*/0,
+ /*end_seqno*/0,
+ /*flags*/0);
+ EXPECT_NE(1234, consumer->getLastMessageTime())
+ << "lastMessagerTime not updated for snapshotMarker";
+ consumer->setLastMessageTime(1234);
+ consumer->noop(/*opaque*/0);
+ EXPECT_NE(1234, consumer->getLastMessageTime())
+ << "lastMessagerTime not updated for noop";
+ consumer->setLastMessageTime(1234);
+ consumer->flush(/*opaque*/0, /*vbucket*/0);
+ EXPECT_NE(1234, consumer->getLastMessageTime())
+ << "lastMessagerTime not updated for flush";
+ consumer->setLastMessageTime(1234);
+ consumer->setVBucketState(/*opaque*/0,
+ /*vbucket*/0,
+ /*state*/vbucket_state_active);
+ EXPECT_NE(1234, consumer->getLastMessageTime())
+ << "lastMessagerTime not updated for setVBucketState";
+ destroy_mock_cookie(cookie);
+}
+
+class NotifyTest : public DCPTest {
+protected:
+ void SetUp() {
+ // The test is going to replace a server API method, we must
+ // be able to undo that
+ sapi = *get_mock_server_api();
+ scookie_api = *get_mock_server_api()->cookie;
+ DCPTest::SetUp();
+ }
+
+ void TearDown() {
+ // Reset the server_api for other tests
+ *get_mock_server_api() = sapi;
+ *get_mock_server_api()->cookie = scookie_api;
+ DCPTest::TearDown();
+ }
+
+ SERVER_HANDLE_V1 sapi;
+ SERVER_COOKIE_API scookie_api;
+ dcp_producer_t producer;
+ int callbacks;
+};
+
+class ConnMapNotifyTest {
+public:
+ ConnMapNotifyTest(EventuallyPersistentEngine& engine)
+ : connMap(new MockDcpConnMap(engine)),
+ callbacks(0) {
+ connMap->initialize(DCP_CONN_NOTIFIER);
+
+ // Use 'this' instead of a mock cookie
+ producer = connMap->newProducer(static_cast<void*>(this),
+ "test_producer",
+ /*notifyOnly*/false);
+ }
+
+ ~ConnMapNotifyTest() {
+ delete connMap;
+ }
+
+ void notify() {
+ callbacks++;
+ connMap->notifyPausedConnection(producer.get(), /*schedule*/true);
+ }
+
+ int getCallbacks() {
+ return callbacks;
+ }
+
+
+ static void dcp_test_notify_io_complete(const void *cookie,
+ ENGINE_ERROR_CODE status) {
+ const ConnMapNotifyTest* notifyTest = reinterpret_cast<const ConnMapNotifyTest*>(cookie);
+ // 3. Call notifyPausedConnection again. We're now interleaved inside
+ // of notifyAllPausedConnections, a second notification should occur.
+ const_cast<ConnMapNotifyTest*>(notifyTest)->notify();
+ }
+
+ MockDcpConnMap* connMap;
+ dcp_producer_t producer;
+
+private:
+ int callbacks;
+
+};
+
+
+TEST_F(NotifyTest, test_mb19503_connmap_notify) {
+ ConnMapNotifyTest notifyTest(*engine);
+
+ // Hook into notify_io_complete
+ SERVER_COOKIE_API* scapi = get_mock_server_api()->cookie;
+ scapi->notify_io_complete = ConnMapNotifyTest::dcp_test_notify_io_complete;
+
+ // Should be 0 when we begin
+ ASSERT_EQ(0, notifyTest.getCallbacks());
+ ASSERT_TRUE(notifyTest.producer->isPaused());
+ ASSERT_EQ(0, notifyTest.connMap->getPendingNotifications().size());
+
+ // 1. Call notifyPausedConnection with schedule = true
+ // this will queue the producer
+ notifyTest.connMap->notifyPausedConnection(notifyTest.producer.get(),
+ /*schedule*/true);
+ EXPECT_EQ(1, notifyTest.connMap->getPendingNotifications().size());
+
+ // 2. Call notifyAllPausedConnections this will invoke notifyIOComplete
+ // which we've hooked into. For step 3 go to dcp_test_notify_io_complete
+ notifyTest.connMap->notifyAllPausedConnections();
+
+ // 2.1 One callback should of occurred, and we should still have one
+ // notification pending (see dcp_test_notify_io_complete).
+ EXPECT_EQ(1, notifyTest.getCallbacks());
+ EXPECT_EQ(1, notifyTest.connMap->getPendingNotifications().size());
+
+ // 4. Call notifyAllPausedConnections again, is there a new connection?
+ notifyTest.connMap->notifyAllPausedConnections();
+
+ // 5. There should of been 2 callbacks
+ EXPECT_EQ(2, notifyTest.getCallbacks());
+}
+
+// Variation on test_mb19503_connmap_notify - check that notification is correct
+// when notifiable is not paused.
+TEST_F(NotifyTest, test_mb19503_connmap_notify_paused) {
+ ConnMapNotifyTest notifyTest(*engine);
+
+ // Hook into notify_io_complete
+ SERVER_COOKIE_API* scapi = get_mock_server_api()->cookie;
+ scapi->notify_io_complete = ConnMapNotifyTest::dcp_test_notify_io_complete;
+
+ // Should be 0 when we begin
+ ASSERT_EQ(notifyTest.getCallbacks(), 0);
+ ASSERT_TRUE(notifyTest.producer->isPaused());
+ ASSERT_EQ(0, notifyTest.connMap->getPendingNotifications().size());
+
+ // 1. Call notifyPausedConnection with schedule = true
+ // this will queue the producer
+ notifyTest.connMap->notifyPausedConnection(notifyTest.producer.get(),
+ /*schedule*/true);
+ EXPECT_EQ(1, notifyTest.connMap->getPendingNotifications().size());
+
+ // 2. Mark connection as not paused.
+ notifyTest.producer->setPaused(false);
+
+ // 3. Call notifyAllPausedConnections - as the connection is not paused
+ // this should *not* invoke notifyIOComplete.
+ notifyTest.connMap->notifyAllPausedConnections();
+
+ // 3.1 Should have not had any callbacks.
+ EXPECT_EQ(0, notifyTest.getCallbacks());
+ // 3.2 Should have no pending notifications.
+ EXPECT_EQ(0, notifyTest.connMap->getPendingNotifications().size());
+
+ // 4. Now mark the connection as paused.
+ ASSERT_FALSE(notifyTest.producer->isPaused());
+ notifyTest.producer->setPaused(true);
+
+ // 4. Add another notification - should queue the producer again.
+ notifyTest.connMap->notifyPausedConnection(notifyTest.producer.get(),
+ /*schedule*/true);
+ EXPECT_EQ(1, notifyTest.connMap->getPendingNotifications().size());
+
+ // 5. Call notifyAllPausedConnections a second time - as connection is
+ // paused this time we *should* get a callback.
+ notifyTest.connMap->notifyAllPausedConnections();
+ EXPECT_EQ(1, notifyTest.getCallbacks());
+}
--- /dev/null
+/* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
+/*
+ * Copyright 2016 Couchbase, Inc
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/*
+ * Main function & globals for the ep_unit_test target.
+ */
+
+#include <memcached/extension_loggers.h>
+#include "programs/engine_testapp/mock_server.h"
+
+#include <getopt.h>
+#include <gtest/gtest.h>
+
+#include "configuration.h"
+#include "stored-value.h"
+
+/* static storage for environment variable set by putenv(). */
+static char allow_no_stats_env[] = "ALLOW_NO_STATS_UPDATE=yeah";
+
+int main(int argc, char **argv) {
+ putenv(allow_no_stats_env);
+
+ init_mock_server(false);
+
+ // Default number of hashtable locks is too large for TSan to
+ // track. Use the value in configuration.json (47 at time of
+ // writing).
+ HashTable::setDefaultNumLocks(Configuration().getHtLocks());
+
+ ::testing::InitGoogleTest(&argc, argv);
+ return RUN_ALL_TESTS();
+}
--- /dev/null
+/* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
+/*
+ * Copyright 2016 Couchbase, Inc
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/*
+ * Unit tests for the EventuallyPersistentEngine class.
+ */
+
+#include "evp_engine_test.h"
+
+#include "ep_engine.h"
+#include "programs/engine_testapp/mock_server.h"
+#include <platform/dirutils.h>
+
+void EventuallyPersistentEngineTest::SetUp() {
+ // Paranoia - kill any existing files in case they are left over
+ // from a previous run.
+ CouchbaseDirectoryUtilities::rmrf(test_dbname);
+
+ vbid = 0;
+
+ // Setup an engine with a single active vBucket.
+ EXPECT_EQ(ENGINE_SUCCESS,
+ create_instance(1, get_mock_server_api, &handle))
+ << "Failed to create ep engine instance";
+ EXPECT_EQ(1, handle->interface) << "Unexpected engine handle version";
+ engine_v1 = reinterpret_cast<ENGINE_HANDLE_V1*>(handle);
+
+ engine = reinterpret_cast<EventuallyPersistentEngine*>(handle);
+ ObjectRegistry::onSwitchThread(engine);
+ std::string config = "dbname=" + std::string(test_dbname);
+ EXPECT_EQ(ENGINE_SUCCESS, engine->initialize(config.c_str()))
+ << "Failed to initialize engine.";
+
+ engine->setVBucketState(vbid, vbucket_state_active, false);
+
+ // Wait for warmup to complete.
+ while (engine->getEpStore()->isWarmingUp()) {
+ usleep(10);
+ }
+}
+
+void EventuallyPersistentEngineTest::TearDown() {
+ // Need to force the destroy (i.e. pass true) because
+ // NonIO threads may have been disabled (see DCPTest subclass).
+ engine_v1->destroy(handle, true);
+ destroy_mock_event_callbacks();
+ destroy_engine();
+ // Cleanup any files we created.
+ CouchbaseDirectoryUtilities::rmrf(test_dbname);
+}
+
+void EventuallyPersistentEngineTest::store_item(uint16_t vbid,
+ const std::string& key,
+ const std::string& value) {
+ Item item(key.c_str(), key.size(), /*flags*/0, /*exp*/0, value.c_str(),
+ value.size());
+ uint64_t cas;
+ EXPECT_EQ(ENGINE_SUCCESS,
+ engine->store(NULL, &item, &cas, OPERATION_SET, vbid));
+}
+
+const char EventuallyPersistentEngineTest::test_dbname[] = "ep_engine_ep_unit_tests_db";
--- /dev/null
+/* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
+/*
+ * Copyright 2016 Couchbase, Inc
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/*
+ * Unit tests for the EventuallyPersistentEngine class.
+ */
+
+#pragma once
+
+#include "config.h"
+
+#include <memcached/engine.h>
+
+#include <gtest/gtest.h>
+
+class EventuallyPersistentEngine;
+
+class EventuallyPersistentEngineTest : public ::testing::Test {
+protected:
+
+ void SetUp();
+
+ void TearDown();
+
+ /* Helper methods for tests */
+
+ /* Stores an item into the given vbucket. */
+ void store_item(uint16_t vbid, const std::string& key,
+ const std::string& value);
+
+ std::string config_string;
+
+ static const char test_dbname[];
+
+ uint16_t vbid;
+
+ ENGINE_HANDLE* handle;
+ ENGINE_HANDLE_V1* engine_v1;
+ EventuallyPersistentEngine* engine;
+};
--- /dev/null
+/* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
+/*
+ * Copyright 2013 Couchbase, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "evp_store_test.h"
+
+#include "fakes/fake_executorpool.h"
+#include "taskqueue.h"
+
+/*
+ * A subclass of EventuallyPersistentStoreTest which uses a fake ExecutorPool,
+ * which will not spawn ExecutorThreads and hence not run any tasks
+ * automatically in the background. All tasks must be manually run().
+ */
+class SingleThreadedEPStoreTest : public EventuallyPersistentStoreTest {
+ void SetUp() {
+ SingleThreadedExecutorPool::replaceExecutorPoolWithFake();
+ EventuallyPersistentStoreTest::SetUp();
+ }
+};
--- /dev/null
+/* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
+/*
+ * Copyright 2016 Couchbase, Inc
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/*
+ * Unit tests for the EventuallyPersistentStore class.
+ *
+ * Note that these test do *not* have the normal Tasks running (BGFetcher,
+ * flusher etc) as we do not initialise EPEngine. This means that such tasks
+ * need to be manually run. This can be very helpful as it essentially gives us
+ * synchronous control of EPStore.
+ */
+
+#include "evp_store_test.h"
+
+#include "bgfetcher.h"
+#include "checkpoint.h"
+#include "checkpoint_remover.h"
+#include "connmap.h"
+#include "ep_engine.h"
+#include "flusher.h"
+#include "../mock/mock_dcp_producer.h"
+
+#include "programs/engine_testapp/mock_server.h"
+#include <platform/dirutils.h>
+
+SynchronousEPEngine::SynchronousEPEngine(const std::string& extra_config)
+ : EventuallyPersistentEngine(get_mock_server_api) {
+ maxFailoverEntries = 1;
+
+ // Merge any extra config into the main configuration.
+ if (extra_config.size() > 0) {
+ if (!configuration.parseConfiguration(extra_config.c_str(),
+ serverApi)) {
+ throw std::invalid_argument("Unable to parse config string: " +
+ extra_config);
+ }
+ }
+
+ // workload is needed by EPStore's constructor (to construct the
+ // VBucketMap).
+ workload = new WorkLoadPolicy(/*workers*/1, /*shards*/1);
+
+ // dcpConnMap_ is needed by EPStore's constructor.
+ dcpConnMap_ = new DcpConnMap(*this);
+
+ // tapConnMap is needed by queueDirty.
+ tapConnMap = new TapConnMap(*this);
+
+ // checkpointConfig is needed by CheckpointManager (via EPStore).
+ checkpointConfig = new CheckpointConfig(*this);
+}
+
+void SynchronousEPEngine::setEPStore(EventuallyPersistentStore* store) {
+ cb_assert(epstore == NULL);
+ epstore = store;
+}
+
+MockEPStore::MockEPStore(EventuallyPersistentEngine &theEngine)
+ : EventuallyPersistentStore(theEngine) {
+ // Perform a limited set of setup (normally done by EPStore::initialize) -
+ // enough such that objects which are assumed to exist are present.
+
+ // Create the closed checkpoint removed task. Note we do _not_ schedule
+ // it, unlike EPStore::initialize
+ chkTask = new ClosedUnrefCheckpointRemoverTask
+ (&engine, stats, theEngine.getConfiguration().getChkRemoverStime());
+}
+
+VBucketMap& MockEPStore::getVbMap() {
+ return vbMap;
+}
+
+/* Mock Task class. Doesn't actually run() or snooze() - they both do nothing.
+ */
+class MockGlobalTask : public GlobalTask {
+public:
+ MockGlobalTask(EventuallyPersistentEngine* e, const Priority &p)
+ : GlobalTask(e, p) {}
+
+ bool run() { return false; }
+ std::string getDescription() { return "MockGlobalTask"; }
+
+ void snooze(const double secs) {}
+};
+
+void EventuallyPersistentStoreTest::SetUp() {
+ // Paranoia - kill any existing files in case they are left over
+ // from a previous run.
+ CouchbaseDirectoryUtilities::rmrf(test_dbname);
+
+ // Add dbname to config string.
+ std::string config = config_string;
+ if (config.size() > 0) {
+ config += ";";
+ }
+ config += "dbname=" + std::string(test_dbname);
+
+ vbid = 0;
+
+ engine = new SynchronousEPEngine(config);
+ ObjectRegistry::onSwitchThread(engine);
+
+ store = new MockEPStore(*engine);
+ engine->setEPStore(store);
+
+ // Ensure that EPEngine is hold about necessary server callbacks
+ // (client disconnect, bucket delete).
+ engine->public_initializeEngineCallbacks();
+
+ // Need to initialize ep_real_time and friends.
+ initialize_time_functions(get_mock_server_api()->core);
+
+ cookie = create_mock_cookie();
+}
+
+void EventuallyPersistentStoreTest::TearDown() {
+ destroy_mock_cookie(cookie);
+ destroy_mock_event_callbacks();
+ engine->getDcpConnMap().manageConnections();
+
+ // Need to have the current engine valid before deleting (this is what
+ // EvpDestroy does normally; however we have a smart ptr to the engine
+ // so must delete via that).
+ ObjectRegistry::onSwitchThread(engine);
+ delete engine;
+
+ // Shutdown the ExecutorPool singleton (initialized when we create
+ // an EventuallyPersistentStore object). Must happen after engine
+ // has been destroyed (to allow the tasks the engine has
+ // registered a chance to be unregistered).
+ ExecutorPool::shutdown();
+}
+
+void EventuallyPersistentStoreTest::store_item(uint16_t vbid,
+ const std::string& key,
+ const std::string& value) {
+ Item item(key.c_str(), key.size(), /*flags*/0, /*exp*/0, value.c_str(),
+ value.size());
+ item.setVBucketId(vbid);
+ EXPECT_EQ(ENGINE_SUCCESS, store->set(item, NULL));
+}
+
+
+//
+// EPStoreEvictionTest disabled in 3.0.x backport - there's an unknown
+// bug where onSwitchThread() ends up NULL, meaning that we eventually hit
+// an assert and crash.
+//
+
+
+const char EventuallyPersistentStoreTest::test_dbname[] = "ep_engine_ep_unit_tests_db";
--- /dev/null
+/* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
+/*
+ * Copyright 2016 Couchbase, Inc
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/*
+ * Unit tests for the EventuallyPersistentStore class.
+ */
+
+#pragma once
+
+#include "config.h"
+
+#include "ep.h"
+#include "ep_engine.h"
+
+#include <memcached/engine.h>
+
+#include <gtest/gtest.h>
+#include <memory>
+
+class MockEPStore;
+
+/* A class which subclasses the real EPEngine. Its main purpose is to allow
+ * us to construct and setup an EPStore without starting all the various
+ * background tasks which are normally started by EPEngine as part of creating
+ * EPStore (in the initialize() method).
+ *
+ * The net result is a (mostly) synchronous environment - while the
+ * ExecutorPool's threads exist, none of the normally-created background Tasks
+ * should be running. Note however that /if/ any new tasks are created, they
+ * will be scheduled on the ExecutorPools' threads asynchronously.
+ */
+class SynchronousEPEngine : public EventuallyPersistentEngine {
+public:
+ SynchronousEPEngine(const std::string& extra_config);
+
+ void setEPStore(EventuallyPersistentStore* store);
+
+ /* Allow us to call normally protected methods */
+
+ ENGINE_ERROR_CODE public_doTapVbTakeoverStats(const void *cookie,
+ ADD_STAT add_stat,
+ std::string& key,
+ uint16_t vbid) {
+ return doTapVbTakeoverStats(cookie, add_stat, key, vbid);
+ }
+
+ ENGINE_ERROR_CODE public_doDcpVbTakeoverStats(const void *cookie,
+ ADD_STAT add_stat,
+ std::string& key,
+ uint16_t vbid) {
+ return doDcpVbTakeoverStats(cookie, add_stat, key, vbid);
+ }
+
+ void public_initializeEngineCallbacks() {
+ return initializeEngineCallbacks();
+ }
+};
+
+/* Subclass of EPStore to expose normally non-public members for test
+ * purposes.
+ */
+class MockEPStore : public EventuallyPersistentStore {
+public:
+ MockEPStore(EventuallyPersistentEngine &theEngine);
+
+ VBucketMap& getVbMap();
+
+ void public_stopWarmup() {
+ stopWarmup();
+ }
+};
+
+/* Actual test fixture class */
+class EventuallyPersistentStoreTest : public ::testing::Test {
+protected:
+ void SetUp();
+
+ void TearDown();
+
+ /* Stores an item into the given vbucket. */
+ void store_item(uint16_t vbid, const std::string& key,
+ const std::string& value);
+
+ static const char test_dbname[];
+
+ std::string config_string;
+
+ uint16_t vbid;
+
+ // The mock engine (needed to construct the store).
+ SynchronousEPEngine* engine;
+
+ // The store under test. Wrapped in a mock to expose some normally
+ // protected members. Uses a raw pointer as this is owned by the engine.
+ MockEPStore* store;
+
+ // The (mock) server cookie.
+ const void* cookie;
+};
+++ /dev/null
-/* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
-/*
- * Copyright 2016 Couchbase, Inc
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#include "dcp-producer.h"
-#include "dcp-stream.h"
-#include "../programs/engine_testapp/mock_server.h"
-
-#include <platform/dirutils.h>
-
-// Simple backport of the GTest-style EXPECT_EQ macro.
-#define EXPECT_EQ(a, b, msg) \
- do { \
- if ((a) != (b)) { \
- throw std::runtime_error(msg); \
- } \
- } while(0)
-
-// Mock of the ActiveStream class. Wraps the real ActiveStream, but exposes
-// normally protected methods publically for test purposes.
-class MockActiveStream : public ActiveStream {
-public:
- MockActiveStream(EventuallyPersistentEngine* e, dcp_producer_t p,
- const std::string &name, uint32_t flags, uint32_t opaque,
- uint16_t vb, uint64_t st_seqno, uint64_t en_seqno,
- uint64_t vb_uuid, uint64_t snap_start_seqno,
- uint64_t snap_end_seqno)
- : ActiveStream(e, p, name, flags, opaque, vb, st_seqno, en_seqno, vb_uuid,
- snap_start_seqno, snap_end_seqno) {}
-
- // Expose underlying protected ActiveStream methods as public
- void public_getOutstandingItems(RCPtr<VBucket> &vb,
- std::deque<queued_item> &items) {
- getOutstandingItems(vb, items);
- }
-
- void public_processItems(std::deque<queued_item>& items) {
- processItems(items);
- }
-
- bool public_nextCheckpointItem() {
- return nextCheckpointItem();
- }
-};
-
-/* Regression test for MB-17766 - ensure that when an ActiveStream is preparing
- * queued items to be sent out via a DCP consumer, that nextCheckpointItem()
- * doesn't incorrectly return false (meaning that there are no more checkpoint
- * items to send).
- */
-static void test_mb17766(const std::string& test_dbname) {
- // Setup an engine with a single active vBucket.
- ENGINE_HANDLE* handle;
- EXPECT_EQ(ENGINE_SUCCESS,
- create_instance(1, get_mock_server_api, &handle),
- "Failed to created ep engine instance");
-
- // Init mock server to initialize time_mutex used in
- // mock_get_current_time & mock_time_travel apis.
- init_mock_server(handle);
-
- EventuallyPersistentEngine* engine =
- reinterpret_cast<EventuallyPersistentEngine*>(handle);
- ObjectRegistry::onSwitchThread(engine);
- std::string config = "dbname=" + test_dbname;
- EXPECT_EQ(ENGINE_SUCCESS,
- engine->initialize(config.c_str()),
- "Failed to initialize engine.");
-
- const uint16_t vbid = 0;
- engine->setVBucketState(vbid, vbucket_state_active, false);
-
- // Wait for warmup to complete.
- while (engine->getEpStore()->isWarmingUp()) {
- usleep(10);
- }
-
- // Set AuxIO threads to zero, so that the producer's
- // ActiveStreamCheckpointProcesserTask doesn't run.
- ExecutorPool::get()->setMaxAuxIO(0);
-
- // Add an item.
- std::string value("value");
- Item item("key", /*flags*/0, /*exp*/0, value.c_str(), value.size());
-
- uint64_t cas;
- EXPECT_EQ(ENGINE_SUCCESS,
- engine->store(NULL, &item, &cas, OPERATION_SET, vbid),
- "Store failed");
-
- // Create a DCP producer and register with checkpoint Manager.
- dcp_producer_t producer = new DcpProducer(*engine, /*cookie*/NULL,
- "test_mb_17766_producer",
- /*notifyOnly*/false);
- stream_t stream = new MockActiveStream(engine, producer,
- producer->getName(),
- /*flags*/0,
- /*opaque*/0, vbid,
- /*st_seqno*/0,
- /*en_seqno*/~0,
- /*vb_uuid*/0xabcd,
- /*snap_start_seqno*/0,
- /*snap_end_seqno*/~0);
-
- RCPtr<VBucket> vb0 = engine->getVBucket(0);
- EXPECT_EQ(true, vb0, "Failed to get valid VBucket object for id 0");
- vb0->checkpointManager.registerTAPCursor(producer->getName());
-
- // Should start with nextCheckpointItem() returning true.
- MockActiveStream* mock_stream = static_cast<MockActiveStream*>(stream.get());
- EXPECT_EQ(true,
- mock_stream->public_nextCheckpointItem(),
- "nextCheckpointItem() should initially be true.");
-
- std::deque<queued_item> items;
-
- // Get the set of outstanding items
- mock_stream->public_getOutstandingItems(vb0, items);
-
- // REGRESSION CHECK: nextCheckpointItem() should still return true
- EXPECT_EQ(true,
- mock_stream->public_nextCheckpointItem(),
- "nextCheckpointItem() after getting outstanding items should be true.");
-
- // Process the set of items
- mock_stream->public_processItems(items);
-
- // Should finish with nextCheckpointItem() returning false.
- EXPECT_EQ(false,
- mock_stream->public_nextCheckpointItem(),
- "nextCheckpointItem() after processing items should be false.");
-
- producer->cancelCheckpointProcessorTask();
-}
-
-int main(int argc, char **argv) {
- (void)argc; (void)argv;
- putenv(strdup("ALLOW_NO_STATS_UPDATE=yeah"));
-
- bool success = true;
- try {
- test_mb17766("stream_test_db");
- } catch (std::exception& e) {
- std::cerr << "FAILED: " << e.what() << std::endl;
- success = false;
- }
-
- // Cleanup any files we created.
- CouchbaseDirectoryUtilities::rmrf("stream_test_db");
-
- return success ? 0 : 1;
-}