Merge remote-tracking branch 'couchbase/3.0.x' into sherlock 13/65613/3
authorDave Rigby <daver@couchbase.com>
Fri, 8 Jul 2016 09:49:51 +0000 (10:49 +0100)
committerDave Rigby <daver@couchbase.com>
Fri, 8 Jul 2016 14:27:10 +0000 (15:27 +0100)
* couchbase/3.0.x:
  MB-20054: Backport ep-engine_unit_tests from watson to 3.0.x

Change-Id: I811e0b796be8611a4a574ab6b6a488ef50219bbf

19 files changed:
1  2 
CMakeLists.txt
src/atomicqueue.h
src/bgfetcher.cc
src/connmap.cc
src/connmap.h
src/dcp-consumer.h
src/dcp-producer.h
src/dcp-stream.h
src/ep.h
src/ep_engine.cc
src/ep_engine.h
src/executorpool.cc
src/executorpool.h
src/executorthread.h
src/memory_tracker.cc
src/tasks.h
tests/module_tests/dcp_test.cc
tests/module_tests/ep_unit_tests_main.cc
tests/module_tests/evp_store_test.cc

diff --cc CMakeLists.txt
@@@ -145,21 -141,17 +145,30 @@@ ADD_LIBRARY(ep SHARE
  SET_TARGET_PROPERTIES(ep PROPERTIES PREFIX "")
  TARGET_LINK_LIBRARIES(ep cJSON JSON_checker couchstore dirutils platform ${LIBEVENT_LIBRARIES})
  
- ADD_EXECUTABLE(ep-engine_stream_test
-   tests/module_tests/stream_test.cc
 +IF (APPLE)
 +    SET(MEMORY_TRACKING_SRCS ${Memcached_SOURCE_DIR}/daemon/alloc_hooks.c
 +                             ${Memcached_SOURCE_DIR}/daemon/darwin_zone.c)
 +ELSE (APPLE)
 +    SET(MEMORY_TRACKING_SRCS ${Memcached_SOURCE_DIR}/daemon/alloc_hooks.c)
 +ENDIF (APPLE)
 +
 +IF (MEMORY_ALLOCATOR)
 +    INCLUDE_DIRECTORIES(AFTER ${MALLOC_INCLUDE_DIR})
 +ELSE (MEMORY_ALLOCATOR)
 +    SET(MALLOC_LIBRARIES "")
 +ENDIF (MEMORY_ALLOCATOR)
 +
+ # Single executable containing all class-level unit tests involving
+ # EventuallyPersistentEngine driven by GoogleTest.
+ # (We end up compiling most of the src/ files of ep-engine for these unit tests,
+ # so simpler / quicker just to link them into a single executable).
+ ADD_EXECUTABLE(ep-engine_ep_unit_tests
+   tests/mock/mock_dcp.cc
+   tests/module_tests/ep_unit_tests_main.cc
+   tests/module_tests/dcp_test.cc
+   tests/module_tests/evp_engine_test.cc
+   tests/module_tests/evp_store_test.cc
+   tests/module_tests/evp_store_single_threaded_test.cc
    src/access_scanner.cc
    src/atomic.cc
    src/backfill.cc
    ${CONFIG_SOURCE}
    ${KVSTORE_SOURCE}
    ${COUCH_KVSTORE_SOURCE}
 +  ${MEMORY_TRACKING_SRCS}
    ${Memcached_SOURCE_DIR}/programs/engine_testapp/mock_server.c)
- TARGET_LINK_LIBRARIES(ep-engine_stream_test couchstore cJSON dirutils JSON_checker mcd_util platform ${MALLOC_LIBRARIES})
 -TARGET_LINK_LIBRARIES(ep-engine_ep_unit_tests couchstore cJSON dirutils gtest JSON_checker mcd_util platform)
++TARGET_LINK_LIBRARIES(ep-engine_ep_unit_tests couchstore cJSON dirutils gtest JSON_checker mcd_util platform ${MALLOC_LIBRARIES})
  
  ADD_EXECUTABLE(ep-engine_atomic_ptr_test
    tests/module_tests/atomic_ptr_test.cc
@@@ -329,9 -259,8 +339,8 @@@ ADD_TEST(ep-engine_histo_test ep-engine
  ADD_TEST(ep-engine_hrtime_test ep-engine_hrtime_test)
  ADD_TEST(ep-engine_misc_test ep-engine_misc_test)
  ADD_TEST(ep-engine_mutex_test ep-engine_mutex_test)
 -ADD_TEST(ep-engine_priority_test ep-engine_priority_test)
  ADD_TEST(ep-engine_ringbuffer_test ep-engine_ringbuffer_test)
- ADD_TEST(ep-engine_stream_test ep-engine_stream_test)
 +ADD_TEST(ep-engine_kvstore_test ep-engine_kvstore_test)
  
  ADD_LIBRARY(timing_tests SHARED tests/module_tests/timing_tests.cc)
  SET_TARGET_PROPERTIES(timing_tests PROPERTIES PREFIX "")
  #ifndef SRC_ATOMICQUEUE_H_
  #define SRC_ATOMICQUEUE_H_ 1
  
 -#include "atomic.h"
++#include <queue>
  #ifdef _MSC_VER
  
--#include <queue>
  #include <thread>
  #include <mutex>
  
@@@ -49,6 -51,6 +50,14 @@@ public
          return queue.empty();
      }
  
++    /**
++     * Return the number of queued items.
++     */
++    size_t size() {
++        std::lock_guard<std::mutex> lock(mutex);
++        return queue.size();
++    }
++
  private:
      std::queue<T> queue;
      std::mutex mutex;
Simple merge
diff --cc src/connmap.cc
@@@ -109,8 -109,8 +109,9 @@@ void ConnNotifier::stop() 
  void ConnNotifier::notifyMutationEvent(void) {
      bool inverse = false;
      if (pendingNotification.compare_exchange_strong(inverse, true)) {
--        cb_assert(task > 0);
--        ExecutorPool::get()->wake(task);
++        if (task > 0) {
++            ExecutorPool::get()->wake(task);
++        }
      }
  }
  
diff --cc src/connmap.h
@@@ -509,17 -482,9 +509,17 @@@ public
      ENGINE_ERROR_CODE addPassiveStream(ConnHandler* conn, uint32_t opaque,
                                         uint16_t vbucket, uint32_t flags);
  
 -    void addStats(ADD_STAT add_stat, const void *c);
 +    /*
 +     * Change the value at which a DcpConsumer::Processor task will yield
 +     */
 +    void consumerYieldConfigChanged(size_t newValue);
 +
 +    /*
 +     * Change the batchsize that the DcpConsumer::Processor operates with
 +     */
 +    void consumerBatchSizeConfigChanged(size_t newValue);
  
- private:
+ protected:
  
      bool isPassiveStreamConnected_UNLOCKED(uint16_t vbucket);
  
@@@ -95,15 -95,7 +95,15 @@@ public
  
      bool isStreamPresent(uint16_t vbucket);
  
- private:
 +    void setProcessorYieldThreshold(size_t newValue) {
 +        processBufferedMessagesYieldThreshold = newValue;
 +    }
 +
 +    void setProcessBufferedMessagesBatchSize(size_t newValue) {
 +        processBufferedMessagesBatchSize = newValue;
 +    }
 +
+ protected:
  
      DcpResponse* getNextItem();
  
@@@ -204,11 -189,12 +204,11 @@@ public
      void scheduleCheckpointProcessorTask(stream_t s);
  
      /*
 -        Clears active stream checkpoint processor task's queue,
 -        and cancels the task.
 +        Clears active stream checkpoint processor task's queue.
      */
 -    void cancelCheckpointProcessorTask();
 +    void clearCheckpointProcessorTaskQueues();
  
- private:
+ protected:
  
      /**
       * DcpProducerReadyQueue is a std::queue wrapper for managing a
Simple merge
diff --cc src/ep.h
Simple merge
@@@ -1840,7 -1752,15 +1838,15 @@@ extern "C" 
          return ENGINE_SUCCESS;
      }
  
 -    }
+     /*
+         This method is called prior to unloading of the shared-object.
+         Global clean-up should be performed from this method.
+      */
+     void destroy_engine() {
+         ExecutorPool::shutdown();
 -    static bool EvpGetItemInfo(ENGINE_HANDLE *, const void *,
++     }
 +    static bool EvpGetItemInfo(ENGINE_HANDLE *handle, const void *,
                                 const item* itm, item_info *itm_info)
      {
          const Item *it = reinterpret_cast<const Item*>(itm);
diff --cc src/ep_engine.h
Simple merge
Simple merge
Simple merge
Simple merge
Simple merge
diff --cc src/tasks.h
Simple merge
index 0000000,4cca8e5..80eb3d7
mode 000000,100644..100644
--- /dev/null
@@@ -1,0 -1,558 +1,558 @@@
 -                                    std::deque<queued_item> &items) {
+ /* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
+ /*
+  *     Copyright 2016 Couchbase, Inc
+  *
+  *   Licensed under the Apache License, Version 2.0 (the "License");
+  *   you may not use this file except in compliance with the License.
+  *   You may obtain a copy of the License at
+  *
+  *       http://www.apache.org/licenses/LICENSE-2.0
+  *
+  *   Unless required by applicable law or agreed to in writing, software
+  *   distributed under the License is distributed on an "AS IS" BASIS,
+  *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  *   See the License for the specific language governing permissions and
+  *   limitations under the License.
+  */
+ /*
+  * Unit test for DCP-related classes.
+  *
+  * Due to the way our classes are structured, most of the different DCP classes
+  * need an instance of EventuallyPersistentStore & other related objects.
+  */
+ #include "connmap.h"
+ #include "dcp-stream.h"
+ #include "dcp-response.h"
+ #include "evp_engine_test.h"
+ #include "programs/engine_testapp/mock_server.h"
+ #include "../mock/mock_dcp.h"
+ #include "../mock/mock_dcp_producer.h"
+ #include "../mock/mock_dcp_consumer.h"
+ #include <gtest/gtest.h>
+ // Mock of the ActiveStream class. Wraps the real ActiveStream, but exposes
+ // normally protected methods publically for test purposes.
+ class MockActiveStream : public ActiveStream {
+ public:
+     MockActiveStream(EventuallyPersistentEngine* e, dcp_producer_t p,
+                      const std::string &name, uint32_t flags, uint32_t opaque,
+                      uint16_t vb, uint64_t st_seqno, uint64_t en_seqno,
+                      uint64_t vb_uuid, uint64_t snap_start_seqno,
+                      uint64_t snap_end_seqno)
+     : ActiveStream(e, p, name, flags, opaque, vb, st_seqno, en_seqno, vb_uuid,
+                    snap_start_seqno, snap_end_seqno) {}
+     // Expose underlying protected ActiveStream methods as public
+     void public_getOutstandingItems(RCPtr<VBucket> &vb,
 -    void public_processItems(std::deque<queued_item>& items) {
++                                    std::vector<queued_item> &items) {
+         getOutstandingItems(vb, items);
+     }
 -        producer->cancelCheckpointProcessorTask();
++    void public_processItems(std::vector<queued_item>& items) {
+         processItems(items);
+     }
+     bool public_nextCheckpointItem() {
+         return nextCheckpointItem();
+     }
+     const std::queue<DcpResponse*>& public_readyQ() {
+         return readyQ;
+     }
+     DcpResponse* public_nextQueuedItem() {
+         return nextQueuedItem();
+     }
+ };
+ /*
+  * Mock of the DcpConnMap class.  Wraps the real DcpConnMap, but exposes
+  * normally protected methods publically for test purposes.
+  */
+ class MockDcpConnMap: public DcpConnMap {
+ public:
+     MockDcpConnMap(EventuallyPersistentEngine &theEngine)
+     : DcpConnMap(theEngine)
+     {}
+     size_t getNumberOfDeadConnections() {
+         return deadConnections.size();
+     }
+     AtomicQueue<connection_t>& getPendingNotifications() {
+         return pendingNotifications;
+     }
+     void initialize(conn_notifier_type ntype) {
+         connNotifier_ = new ConnNotifier(ntype, *this);
+         // We do not create a ConnNotifierCallback task
+         // We do not create a ConnManager task
+         // The ConnNotifier is deleted in the DcpConnMap
+         // destructor
+     }
+ };
+ class DCPTest : public EventuallyPersistentEngineTest {
+ protected:
+     void SetUp() {
+         EventuallyPersistentEngineTest::SetUp();
+         // Set AuxIO threads to zero, so that the producer's
+         // ActiveStreamCheckpointProcesserTask doesn't run.
+         ExecutorPool::get()->setMaxAuxIO(0);
+         // Set NonIO threads to zero, so the connManager
+         // task does not run.
+         ExecutorPool::get()->setMaxNonIO(0);
+     }
+     // Use TearDown from parent class
+ };
+ class StreamTest : public DCPTest {
+ protected:
+     void TearDown() {
 -        EXPECT_FALSE(vb0->checkpointManager.registerTAPCursor(
++        producer->clearCheckpointProcessorTaskQueues();
+         // Destroy various engine objects
+         vb0.reset();
+         stream.reset();
+         producer.reset();
+         DCPTest::TearDown();
+     }
+     // Setup a DCP producer and attach a stream and cursor to it.
+     void setup_dcp_stream() {
+         producer = new DcpProducer(*engine, /*cookie*/NULL,
+                                    "test_producer", /*notifyOnly*/false);
+         stream = new MockActiveStream(engine, producer,
+                                       producer->getName(), /*flags*/0,
+                                       /*opaque*/0, vbid,
+                                       /*st_seqno*/0,
+                                       /*en_seqno*/~0,
+                                       /*vb_uuid*/0xabcd,
+                                       /*snap_start_seqno*/0,
+                                       /*snap_end_seqno*/~0);
+         vb0 = engine->getVBucket(0);
+         EXPECT_TRUE(vb0) << "Failed to get valid VBucket object for id 0";
 -    std::deque<queued_item> items;
++        EXPECT_FALSE(vb0->checkpointManager.registerCursor(
+                 producer->getName()))
+             << "Found an existing TAP cursor when attempting to register ours";
+     }
+     dcp_producer_t producer;
+     stream_t stream;
+     RCPtr<VBucket> vb0;
+ };
+ /* Regression test for MB-17766 - ensure that when an ActiveStream is preparing
+  * queued items to be sent out via a DCP consumer, that nextCheckpointItem()
+  * doesn't incorrectly return false (meaning that there are no more checkpoint
+  * items to send).
+  */
+ TEST_F(StreamTest, test_mb17766) {
+     // Add an item.
+     store_item(vbid, "key", "value");
+     setup_dcp_stream();
+     // Should start with nextCheckpointItem() returning true.
+     MockActiveStream* mock_stream = static_cast<MockActiveStream*>(stream.get());
+     EXPECT_TRUE(mock_stream->public_nextCheckpointItem())
+         << "nextCheckpointItem() should initially be true.";
++    std::vector<queued_item> items;
+     // Get the set of outstanding items
+     mock_stream->public_getOutstandingItems(vb0, items);
+     // REGRESSION CHECK: nextCheckpointItem() should still return true
+     EXPECT_TRUE(mock_stream->public_nextCheckpointItem())
+         << "nextCheckpointItem() after getting outstanding items should be true.";
+     // Process the set of items
+     mock_stream->public_processItems(items);
+     // Should finish with nextCheckpointItem() returning false.
+     EXPECT_FALSE(mock_stream->public_nextCheckpointItem())
+         << "nextCheckpointItem() after processing items should be false.";
+ }
+ //
+ // MB17653 test removed in 3.0.x backport, as MB not fixed in 3.0.x branch.
+ //
+ //
+ // test_mb18625 test removed in 3.0.x backport, as MB not fixed in 3.0.x branch.
+ //
+ class ConnectionTest : public DCPTest {};
+ ENGINE_ERROR_CODE mock_noop_return_engine_e2big(const void* cookie,uint32_t opaque) {
+     return ENGINE_E2BIG;
+ }
+ TEST_F(ConnectionTest, test_maybesendnoop_buffer_full) {
+     const void* cookie = create_mock_cookie();
+     // Create a Mock Dcp producer
+     MockDcpProducer producer(*engine, cookie, "test_producer", /*notifyOnly*/false);
+     struct dcp_message_producers producers = {NULL, NULL, NULL, NULL,
+         NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL,
+         mock_noop_return_engine_e2big, NULL, NULL};
+     producer.setNoopEnabled(true);
+     producer.setNoopSendTime(21);
+     ENGINE_ERROR_CODE ret = producer.maybeSendNoop(&producers);
+     EXPECT_EQ(ENGINE_E2BIG, ret)
+     << "maybeSendNoop not returning ENGINE_E2BIG";
+     EXPECT_FALSE(producer.getNoopPendingRecv())
+     << "Waiting for noop acknowledgement";
+     EXPECT_EQ(21, producer.getNoopSendTime())
+     << "SendTime has been updated";
+     destroy_mock_cookie(cookie);
+ }
+ TEST_F(ConnectionTest, test_maybesendnoop_send_noop) {
+     const void* cookie = create_mock_cookie();
+     // Create a Mock Dcp producer
+     MockDcpProducer producer(*engine, cookie, "test_producer", /*notifyOnly*/false);
+     dcp_message_producers* producers = get_dcp_producers();
+     producer.setNoopEnabled(true);
+     producer.setNoopSendTime(21);
+     ENGINE_ERROR_CODE ret = producer.maybeSendNoop(producers);
+     EXPECT_EQ(ENGINE_WANT_MORE, ret)
+     << "maybeSendNoop not returning ENGINE_WANT_MORE";
+     EXPECT_TRUE(producer.getNoopPendingRecv())
+     << "Not waiting for noop acknowledgement";
+     EXPECT_NE(21, producer.getNoopSendTime())
+     << "SendTime has not been updated";
+     destroy_mock_cookie(cookie);
+     free(producers);
+ }
+ TEST_F(ConnectionTest, test_maybesendnoop_noop_already_pending) {
+     const void* cookie = create_mock_cookie();
+     // Create a Mock Dcp producer
+     MockDcpProducer producer(*engine, cookie, "test_producer", /*notifyOnly*/false);
+     dcp_message_producers* producers = get_dcp_producers();
+     producer.setNoopEnabled(true);
+     producer.setNoopSendTime(21);
+     ENGINE_ERROR_CODE ret = producer.maybeSendNoop(producers);
+     EXPECT_EQ(ENGINE_WANT_MORE, ret)
+     << "maybeSendNoop not returning ENGINE_WANT_MORE";
+     EXPECT_TRUE(producer.getNoopPendingRecv())
+     << "Not awaiting noop acknowledgement";
+     EXPECT_NE(21, producer.getNoopSendTime())
+     << "SendTime has not been updated";
+     producer.setNoopSendTime(21);
+     ENGINE_ERROR_CODE ret2 = producer.maybeSendNoop(producers);
+     EXPECT_EQ(ENGINE_DISCONNECT, ret2)
+      << "maybeSendNoop not returning ENGINE_DISCONNECT";
+     EXPECT_TRUE(producer.getNoopPendingRecv())
+     << "Not waiting for noop acknowledgement";
+     EXPECT_EQ(21, producer.getNoopSendTime())
+     << "SendTime has been updated";
+     destroy_mock_cookie(cookie);
+     free(producers);
+ }
+ TEST_F(ConnectionTest, test_maybesendnoop_not_enabled) {
+     const void* cookie = create_mock_cookie();
+     // Create a Mock Dcp producer
+     MockDcpProducer producer(*engine, cookie, "test_producer", /*notifyOnly*/false);
+     dcp_message_producers* producers = get_dcp_producers();
+     producer.setNoopEnabled(false);
+     producer.setNoopSendTime(21);
+     ENGINE_ERROR_CODE ret = producer.maybeSendNoop(producers);
+     EXPECT_EQ(ENGINE_FAILED, ret)
+     << "maybeSendNoop not returning ENGINE_FAILED";
+     EXPECT_FALSE(producer.getNoopPendingRecv())
+     << "Waiting for noop acknowledgement";
+     EXPECT_EQ(21, producer.getNoopSendTime())
+     << "SendTime has been updated";
+     destroy_mock_cookie(cookie);
+     free(producers);
+ }
+ TEST_F(ConnectionTest, test_maybesendnoop_not_sufficient_time_passed) {
+     const void* cookie = create_mock_cookie();
+     // Create a Mock Dcp producer
+     MockDcpProducer producer(*engine, cookie, "test_producer", /*notifyOnly*/false);
+     dcp_message_producers* producers = get_dcp_producers();
+     producer.setNoopEnabled(true);
+     rel_time_t current_time = ep_current_time();
+     producer.setNoopSendTime(current_time);
+     ENGINE_ERROR_CODE ret = producer.maybeSendNoop(producers);
+     EXPECT_EQ(ENGINE_FAILED, ret)
+     << "maybeSendNoop not returning ENGINE_FAILED";
+     EXPECT_FALSE(producer.getNoopPendingRecv())
+     << "Waiting for noop acknowledgement";
+     EXPECT_EQ(current_time, producer.getNoopSendTime())
+     << "SendTime has been incremented";
+     destroy_mock_cookie(cookie);
+     free(producers);
+ }
+ TEST_F(ConnectionTest, test_deadConnections) {
+     MockDcpConnMap connMap(*engine);
+     connMap.initialize(DCP_CONN_NOTIFIER);
+     const void *cookie = create_mock_cookie();
+     // Create a new Dcp producer
+     dcp_producer_t producer = connMap.newProducer(cookie, "test_producer",
+                                     /*notifyOnly*/false);
+     // Disconnect the producer connection
+     connMap.disconnect(cookie);
+     EXPECT_EQ(1, connMap.getNumberOfDeadConnections())
+         << "Unexpected number of dead connections";
+     connMap.manageConnections();
+     // Should be zero deadConnections
+     EXPECT_EQ(0, connMap.getNumberOfDeadConnections())
+         << "Dead connections still remain";
+ }
+ //
+ // test_mb17042* removed in 3.0.x backport, as MB not fixed in 3.0.x
+ //
+ TEST_F(ConnectionTest, test_update_of_last_message_time_in_consumer) {
+     const void* cookie = create_mock_cookie();
+     // Create a Mock Dcp consumer
+     MockDcpConsumer *consumer = new MockDcpConsumer(*engine, cookie, "test_consumer");
+     consumer->setLastMessageTime(1234);
+     consumer->addStream(/*opaque*/0, /*vbucket*/0, /*flags*/0);
+     EXPECT_NE(1234, consumer->getLastMessageTime())
+         << "lastMessagerTime not updated for addStream";
+     consumer->setLastMessageTime(1234);
+     consumer->closeStream(/*opaque*/0, /*vbucket*/0);
+     EXPECT_NE(1234, consumer->getLastMessageTime())
+         << "lastMessagerTime not updated for closeStream";
+     consumer->setLastMessageTime(1234);
+     consumer->streamEnd(/*opaque*/0, /*vbucket*/0, /*flags*/0);
+     EXPECT_NE(1234, consumer->getLastMessageTime())
+         << "lastMessagerTime not updated for streamEnd";
+     consumer->mutation(/*opaque*/0,
+                        /*key*/NULL,
+                        /*nkey*/0,
+                        /*value*/NULL,
+                        /*nvalue*/0,
+                        /*cas*/0,
+                        /*vbucket*/0,
+                        /*flags*/0,
+                        /*datatype*/0,
+                        /*locktime*/0,
+                        /*bySeqno*/0,
+                        /*revSeqno*/0,
+                        /*exprtime*/0,
+                        /*nru*/0,
+                        /*meta*/NULL,
+                        /*nmeta*/0);
+     EXPECT_NE(1234, consumer->getLastMessageTime())
+         << "lastMessagerTime not updated for mutation";
+     consumer->setLastMessageTime(1234);
+     consumer->deletion(/*opaque*/0,
+                        /*key*/NULL,
+                        /*nkey*/0,
+                        /*cas*/0,
+                        /*vbucket*/0,
+                        /*bySeqno*/0,
+                        /*revSeqno*/0,
+                        /*meta*/NULL,
+                        /*nmeta*/0);
+     EXPECT_NE(1234, consumer->getLastMessageTime())
+         << "lastMessagerTime not updated for deletion";
+     consumer->setLastMessageTime(1234);
+     consumer->expiration(/*opaque*/0,
+                          /*key*/NULL,
+                          /*nkey*/0,
+                          /*cas*/0,
+                          /*vbucket*/0,
+                          /*bySeqno*/0,
+                          /*revSeqno*/0,
+                          /*meta*/NULL,
+                          /*nmeta*/0);
+     EXPECT_NE(1234, consumer->getLastMessageTime())
+         << "lastMessagerTime not updated for expiration";
+     consumer->setLastMessageTime(1234);
+     consumer->snapshotMarker(/*opaque*/0,
+                              /*vbucket*/0,
+                              /*start_seqno*/0,
+                              /*end_seqno*/0,
+                              /*flags*/0);
+     EXPECT_NE(1234, consumer->getLastMessageTime())
+         << "lastMessagerTime not updated for snapshotMarker";
+     consumer->setLastMessageTime(1234);
+     consumer->noop(/*opaque*/0);
+     EXPECT_NE(1234, consumer->getLastMessageTime())
+         << "lastMessagerTime not updated for noop";
+     consumer->setLastMessageTime(1234);
+     consumer->flush(/*opaque*/0, /*vbucket*/0);
+     EXPECT_NE(1234, consumer->getLastMessageTime())
+         << "lastMessagerTime not updated for flush";
+     consumer->setLastMessageTime(1234);
+     consumer->setVBucketState(/*opaque*/0,
+                               /*vbucket*/0,
+                               /*state*/vbucket_state_active);
+     EXPECT_NE(1234, consumer->getLastMessageTime())
+         << "lastMessagerTime not updated for setVBucketState";
+     destroy_mock_cookie(cookie);
+ }
+ class NotifyTest : public DCPTest {
+ protected:
+     void SetUp() {
+         // The test is going to replace a server API method, we must
+         // be able to undo that
+         sapi = *get_mock_server_api();
+         scookie_api = *get_mock_server_api()->cookie;
+         DCPTest::SetUp();
+     }
+     void TearDown() {
+         // Reset the server_api for other tests
+         *get_mock_server_api() = sapi;
+         *get_mock_server_api()->cookie = scookie_api;
+         DCPTest::TearDown();
+     }
+     SERVER_HANDLE_V1 sapi;
+     SERVER_COOKIE_API scookie_api;
+     dcp_producer_t producer;
+     int callbacks;
+ };
+ class ConnMapNotifyTest {
+ public:
+     ConnMapNotifyTest(EventuallyPersistentEngine& engine)
+         : connMap(new MockDcpConnMap(engine)),
+           callbacks(0) {
+         connMap->initialize(DCP_CONN_NOTIFIER);
+         // Use 'this' instead of a mock cookie
+         producer = connMap->newProducer(static_cast<void*>(this),
+                                         "test_producer",
+                                         /*notifyOnly*/false);
+     }
+     ~ConnMapNotifyTest() {
+         delete connMap;
+     }
+     void notify() {
+         callbacks++;
+         connMap->notifyPausedConnection(producer.get(), /*schedule*/true);
+     }
+     int getCallbacks() {
+         return callbacks;
+     }
+     static void dcp_test_notify_io_complete(const void *cookie,
+                                             ENGINE_ERROR_CODE status) {
+         const ConnMapNotifyTest* notifyTest = reinterpret_cast<const ConnMapNotifyTest*>(cookie);
+         // 3. Call notifyPausedConnection again. We're now interleaved inside
+         //    of notifyAllPausedConnections, a second notification should occur.
+         const_cast<ConnMapNotifyTest*>(notifyTest)->notify();
+     }
+     MockDcpConnMap* connMap;
+     dcp_producer_t producer;
+ private:
+     int callbacks;
+ };
+ TEST_F(NotifyTest, test_mb19503_connmap_notify) {
+     ConnMapNotifyTest notifyTest(*engine);
+     // Hook into notify_io_complete
+     SERVER_COOKIE_API* scapi = get_mock_server_api()->cookie;
+     scapi->notify_io_complete = ConnMapNotifyTest::dcp_test_notify_io_complete;
+     // Should be 0 when we begin
+     ASSERT_EQ(0, notifyTest.getCallbacks());
+     ASSERT_TRUE(notifyTest.producer->isPaused());
+     ASSERT_EQ(0, notifyTest.connMap->getPendingNotifications().size());
+     // 1. Call notifyPausedConnection with schedule = true
+     //    this will queue the producer
+     notifyTest.connMap->notifyPausedConnection(notifyTest.producer.get(),
+                                                /*schedule*/true);
+     EXPECT_EQ(1, notifyTest.connMap->getPendingNotifications().size());
+     // 2. Call notifyAllPausedConnections this will invoke notifyIOComplete
+     //    which we've hooked into. For step 3 go to dcp_test_notify_io_complete
+     notifyTest.connMap->notifyAllPausedConnections();
+     // 2.1 One callback should of occurred, and we should still have one
+     //     notification pending (see dcp_test_notify_io_complete).
+     EXPECT_EQ(1, notifyTest.getCallbacks());
+     EXPECT_EQ(1, notifyTest.connMap->getPendingNotifications().size());
+     // 4. Call notifyAllPausedConnections again, is there a new connection?
+     notifyTest.connMap->notifyAllPausedConnections();
+     // 5. There should of been 2 callbacks
+     EXPECT_EQ(2, notifyTest.getCallbacks());
+ }
+ // Variation on test_mb19503_connmap_notify - check that notification is correct
+ // when notifiable is not paused.
+ TEST_F(NotifyTest, test_mb19503_connmap_notify_paused) {
+     ConnMapNotifyTest notifyTest(*engine);
+     // Hook into notify_io_complete
+     SERVER_COOKIE_API* scapi = get_mock_server_api()->cookie;
+     scapi->notify_io_complete = ConnMapNotifyTest::dcp_test_notify_io_complete;
+     // Should be 0 when we begin
+     ASSERT_EQ(notifyTest.getCallbacks(), 0);
+     ASSERT_TRUE(notifyTest.producer->isPaused());
+     ASSERT_EQ(0, notifyTest.connMap->getPendingNotifications().size());
+     // 1. Call notifyPausedConnection with schedule = true
+     //    this will queue the producer
+     notifyTest.connMap->notifyPausedConnection(notifyTest.producer.get(),
+                                                /*schedule*/true);
+     EXPECT_EQ(1, notifyTest.connMap->getPendingNotifications().size());
+     // 2. Mark connection as not paused.
+     notifyTest.producer->setPaused(false);
+     // 3. Call notifyAllPausedConnections - as the connection is not paused
+     // this should *not* invoke notifyIOComplete.
+     notifyTest.connMap->notifyAllPausedConnections();
+     // 3.1 Should have not had any callbacks.
+     EXPECT_EQ(0, notifyTest.getCallbacks());
+     // 3.2 Should have no pending notifications.
+     EXPECT_EQ(0, notifyTest.connMap->getPendingNotifications().size());
+     // 4. Now mark the connection as paused.
+     ASSERT_FALSE(notifyTest.producer->isPaused());
+     notifyTest.producer->setPaused(true);
+     // 4. Add another notification - should queue the producer again.
+     notifyTest.connMap->notifyPausedConnection(notifyTest.producer.get(),
+                                                /*schedule*/true);
+     EXPECT_EQ(1, notifyTest.connMap->getPendingNotifications().size());
+     // 5. Call notifyAllPausedConnections a second time - as connection is
+     //    paused this time we *should* get a callback.
+     notifyTest.connMap->notifyAllPausedConnections();
+     EXPECT_EQ(1, notifyTest.getCallbacks());
+ }
index 0000000,2f299d6..e79a482
mode 000000,100644..100644
--- /dev/null
@@@ -1,0 -1,46 +1,47 @@@
+ /* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
+ /*
+  *     Copyright 2016 Couchbase, Inc
+  *
+  *   Licensed under the Apache License, Version 2.0 (the "License");
+  *   you may not use this file except in compliance with the License.
+  *   You may obtain a copy of the License at
+  *
+  *       http://www.apache.org/licenses/LICENSE-2.0
+  *
+  *   Unless required by applicable law or agreed to in writing, software
+  *   distributed under the License is distributed on an "AS IS" BASIS,
+  *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  *   See the License for the specific language governing permissions and
+  *   limitations under the License.
+  */
+ /*
+  * Main function & globals for the ep_unit_test target.
+  */
+ #include <memcached/extension_loggers.h>
+ #include "programs/engine_testapp/mock_server.h"
+ #include <getopt.h>
+ #include <gtest/gtest.h>
+ #include "configuration.h"
+ #include "stored-value.h"
+ /* static storage for environment variable set by putenv(). */
+ static char allow_no_stats_env[] = "ALLOW_NO_STATS_UPDATE=yeah";
+ int main(int argc, char **argv) {
+     putenv(allow_no_stats_env);
+     init_mock_server(false);
++    mock_init_alloc_hooks();
+     // Default number of hashtable locks is too large for TSan to
+     // track. Use the value in configuration.json (47 at time of
+     // writing).
+     HashTable::setDefaultNumLocks(Configuration().getHtLocks());
+     ::testing::InitGoogleTest(&argc, argv);
+     return RUN_ALL_TESTS();
+ }
index 0000000,ef1408f..a49ab31
mode 000000,100644..100644
--- /dev/null
@@@ -1,0 -1,165 +1,165 @@@
 -    MockGlobalTask(EventuallyPersistentEngine* e, const Priority &p)
 -        : GlobalTask(e, p) {}
+ /* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
+ /*
+  *     Copyright 2016 Couchbase, Inc
+  *
+  *   Licensed under the Apache License, Version 2.0 (the "License");
+  *   you may not use this file except in compliance with the License.
+  *   You may obtain a copy of the License at
+  *
+  *       http://www.apache.org/licenses/LICENSE-2.0
+  *
+  *   Unless required by applicable law or agreed to in writing, software
+  *   distributed under the License is distributed on an "AS IS" BASIS,
+  *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  *   See the License for the specific language governing permissions and
+  *   limitations under the License.
+  */
+ /*
+  * Unit tests for the EventuallyPersistentStore class.
+  *
+  * Note that these test do *not* have the normal Tasks running (BGFetcher,
+  * flusher etc) as we do not initialise EPEngine. This means that such tasks
+  * need to be manually run. This can be very helpful as it essentially gives us
+  * synchronous control of EPStore.
+  */
+ #include "evp_store_test.h"
+ #include "bgfetcher.h"
+ #include "checkpoint.h"
+ #include "checkpoint_remover.h"
+ #include "connmap.h"
+ #include "ep_engine.h"
+ #include "flusher.h"
+ #include "../mock/mock_dcp_producer.h"
+ #include "programs/engine_testapp/mock_server.h"
+ #include <platform/dirutils.h>
+ SynchronousEPEngine::SynchronousEPEngine(const std::string& extra_config)
+     : EventuallyPersistentEngine(get_mock_server_api) {
+     maxFailoverEntries = 1;
+     // Merge any extra config into the main configuration.
+     if (extra_config.size() > 0) {
+         if (!configuration.parseConfiguration(extra_config.c_str(),
+                                               serverApi)) {
+             throw std::invalid_argument("Unable to parse config string: " +
+                                         extra_config);
+         }
+     }
+     // workload is needed by EPStore's constructor (to construct the
+     // VBucketMap).
+     workload = new WorkLoadPolicy(/*workers*/1, /*shards*/1);
+     // dcpConnMap_ is needed by EPStore's constructor.
+     dcpConnMap_ = new DcpConnMap(*this);
+     // tapConnMap is needed by queueDirty.
+     tapConnMap = new TapConnMap(*this);
+     // checkpointConfig is needed by CheckpointManager (via EPStore).
+     checkpointConfig = new CheckpointConfig(*this);
+ }
+ void SynchronousEPEngine::setEPStore(EventuallyPersistentStore* store) {
+     cb_assert(epstore == NULL);
+     epstore = store;
+ }
+ MockEPStore::MockEPStore(EventuallyPersistentEngine &theEngine)
+     : EventuallyPersistentStore(theEngine) {
+     // Perform a limited set of setup (normally done by EPStore::initialize) -
+     // enough such that objects which are assumed to exist are present.
+     // Create the closed checkpoint removed task. Note we do _not_ schedule
+     // it, unlike EPStore::initialize
+     chkTask = new ClosedUnrefCheckpointRemoverTask
+             (&engine, stats, theEngine.getConfiguration().getChkRemoverStime());
+ }
+ VBucketMap& MockEPStore::getVbMap() {
+     return vbMap;
+ }
+ /* Mock Task class. Doesn't actually run() or snooze() - they both do nothing.
+  */
+ class MockGlobalTask : public GlobalTask {
+ public:
++    MockGlobalTask(EventuallyPersistentEngine* e, TaskId t)
++        : GlobalTask(e, t) {}
+     bool run() { return false; }
+     std::string getDescription() { return "MockGlobalTask"; }
+     void snooze(const double secs) {}
+ };
+ void EventuallyPersistentStoreTest::SetUp() {
+     // Paranoia - kill any existing files in case they are left over
+     // from a previous run.
+     CouchbaseDirectoryUtilities::rmrf(test_dbname);
+     // Add dbname to config string.
+     std::string config = config_string;
+     if (config.size() > 0) {
+         config += ";";
+     }
+     config += "dbname=" + std::string(test_dbname);
+     vbid = 0;
+     engine = new SynchronousEPEngine(config);
+     ObjectRegistry::onSwitchThread(engine);
+     store = new MockEPStore(*engine);
+     engine->setEPStore(store);
+     // Ensure that EPEngine is hold about necessary server callbacks
+     // (client disconnect, bucket delete).
+     engine->public_initializeEngineCallbacks();
+     // Need to initialize ep_real_time and friends.
+     initialize_time_functions(get_mock_server_api()->core);
+     cookie = create_mock_cookie();
+ }
+ void EventuallyPersistentStoreTest::TearDown() {
+     destroy_mock_cookie(cookie);
+     destroy_mock_event_callbacks();
+     engine->getDcpConnMap().manageConnections();
+     // Need to have the current engine valid before deleting (this is what
+     // EvpDestroy does normally; however we have a smart ptr to the engine
+     // so must delete via that).
+     ObjectRegistry::onSwitchThread(engine);
+     delete engine;
+     // Shutdown the ExecutorPool singleton (initialized when we create
+     // an EventuallyPersistentStore object). Must happen after engine
+     // has been destroyed (to allow the tasks the engine has
+     // registered a chance to be unregistered).
+     ExecutorPool::shutdown();
+ }
+ void EventuallyPersistentStoreTest::store_item(uint16_t vbid,
+                                                const std::string& key,
+                                                const std::string& value) {
+     Item item(key.c_str(), key.size(), /*flags*/0, /*exp*/0, value.c_str(),
+               value.size());
+     item.setVBucketId(vbid);
+     EXPECT_EQ(ENGINE_SUCCESS, store->set(item, NULL));
+ }
+ //
+ // EPStoreEvictionTest disabled in 3.0.x backport - there's an unknown
+ // bug where onSwitchThread() ends up NULL, meaning that we eventually hit
+ // an assert and crash.
+ //
+ const char EventuallyPersistentStoreTest::test_dbname[] = "ep_engine_ep_unit_tests_db";