MB-20852 [3/N]: checkpoint_test enhancements 13/69013/6
authorDave Rigby <daver@couchbase.com>
Mon, 10 Oct 2016 13:44:44 +0000 (14:44 +0100)
committerDave Rigby <daver@couchbase.com>
Mon, 31 Oct 2016 15:08:24 +0000 (15:08 +0000)
Tighten up existing checkpoint tests by adding additional EXPECT
checks.

Add a new CursorMovementReplicaMerge test which tests merging of
cursors on a replica VBucket.

Change-Id: Ia46812c2cd61eeea821a4b2ea46f645e1c66cf7e
Reviewed-on: http://review.couchbase.org/69013
Tested-by: buildbot <build@couchbase.com>
Reviewed-by: Jim Walker <jim@couchbase.com>
Reviewed-by: Trond Norbye <trond.norbye@gmail.com>
CMakeLists.txt
tests/module_tests/checkpoint_test.cc
tests/module_tests/dcp_test.cc

index 660c6ed..8a6ca91 100644 (file)
@@ -27,6 +27,7 @@ INCLUDE_DIRECTORIES(BEFORE ${CMAKE_INSTALL_PREFIX}/include
                            ${CMAKE_CURRENT_BINARY_DIR})
 
 INCLUDE_DIRECTORIES(AFTER
+                    ${gmock_SOURCE_DIR}/include
                     ${gtest_SOURCE_DIR}/include)
 
 CHECK_INCLUDE_FILES("alloca.h" HAVE_ALLOCA_H)
@@ -245,7 +246,7 @@ ADD_EXECUTABLE(ep-engine_ep_unit_tests
   ${FOREST_KVSTORE_SOURCE}
   $<TARGET_OBJECTS:memory_tracking>
   ${Memcached_SOURCE_DIR}/programs/engine_testapp/mock_server.cc)
-TARGET_LINK_LIBRARIES(ep-engine_ep_unit_tests couchstore cJSON dirutils forestdb gtest JSON_checker mcd_util platform
+TARGET_LINK_LIBRARIES(ep-engine_ep_unit_tests couchstore cJSON dirutils forestdb gmock gtest JSON_checker mcd_util platform
                       ${MALLOC_LIBRARIES})
 
 ADD_EXECUTABLE(ep-engine_atomic_ptr_test
index 3a554eb..44d2d1c 100644 (file)
@@ -27,6 +27,7 @@
 #include "vbucket.h"
 
 #include <gtest/gtest.h>
+#include <gmock/gmock.h>
 
 #define NUM_TAP_THREADS 3
 #define NUM_SET_THREADS 4
@@ -81,6 +82,17 @@ protected:
                                             callback));
     }
 
+    // Creates a new item with the given key and queues it into the checkpoint
+    // manager.
+    bool queueNewItem(const std::string& key) {
+
+        queued_item qi{new Item(key, vbucket->getId(), queue_op::set,
+                                /*revSeq*/0, /*bySeq*/0)};
+        return manager->queueDirty(vbucket, qi,
+                                   GenerateBySeqno::Yes, GenerateCas::Yes);
+    }
+
+
     EPStats global_stats;
     CheckpointConfig checkpoint_config;
     Configuration config;
@@ -288,54 +300,45 @@ TEST_F(CheckpointTest, basic_chk_test) {
 }
 
 TEST_F(CheckpointTest, reset_checkpoint_id) {
-    std::shared_ptr<Callback<uint16_t> > cb(new DummyCB());
-    RCPtr<VBucket> vbucket(new VBucket(0, vbucket_state_active, global_stats,
-                                       checkpoint_config, NULL, 0, 0, 0, NULL,
-                                       cb, config));
-    CheckpointManager *manager =
-        new CheckpointManager(global_stats, 0, checkpoint_config, 1, 0, 0, cb);
-
     int i;
     for (i = 0; i < 10; ++i) {
-        std::stringstream key;
-        key << "key-" << i;
-        queued_item qi(new Item(key.str(), vbucket->getId(), queue_op::set,
-                                0, 0));
-        manager->queueDirty(vbucket, qi, GenerateBySeqno::Yes, GenerateCas::Yes);
+        EXPECT_TRUE(queueNewItem("key-" + std::to_string(i)));
     }
-    manager->createNewCheckpoint();
+    EXPECT_EQ(11, manager->getNumOpenChkItems());
+    EXPECT_EQ(10, manager->getNumItemsForCursor(CheckpointManager::pCursorName));
+
+    EXPECT_EQ(2, manager->createNewCheckpoint());
 
     size_t itemPos;
-    uint64_t chk = 1;
     size_t lastMutationId = 0;
     std::vector<queued_item> items;
     const std::string cursor(CheckpointManager::pCursorName);
-    manager->getAllItemsForCursor(cursor, items);
-    for(itemPos = 0; itemPos < items.size(); ++itemPos) {
+    auto range = manager->getAllItemsForCursor(cursor, items);
+    EXPECT_EQ(0, range.start);
+    EXPECT_EQ(1010, range.end);
+    EXPECT_EQ(13, items.size());
+    EXPECT_EQ(queue_op::checkpoint_start, items.at(0)->getOperation());
+    // Check that the next 10 items are all SET operations.
+    for(itemPos = 1; itemPos < 11; ++itemPos) {
         queued_item qi = items.at(itemPos);
-        if (qi->getOperation() != queue_op::checkpoint_start &&
-            qi->getOperation() != queue_op::checkpoint_end) {
-            size_t mid = qi->getBySeqno();
-            EXPECT_GT(mid, lastMutationId);
-            lastMutationId = qi->getBySeqno();
-        }
-        if (itemPos == 0 || itemPos == (items.size() - 1)) {
-            EXPECT_EQ(queue_op::checkpoint_start, qi->getOperation()) << "For itemPos:" << itemPos;
-        } else if (itemPos == (items.size() - 2)) {
-            EXPECT_EQ(queue_op::checkpoint_end, qi->getOperation()) << "For itemPos:" << itemPos;
-            chk++;
-        } else {
-            EXPECT_EQ(queue_op::set, qi->getOperation()) << "For itemPos:" << itemPos;
-        }
+        EXPECT_EQ(queue_op::set, qi->getOperation());
+        size_t mid = qi->getBySeqno();
+        EXPECT_GT(mid, lastMutationId);
+        lastMutationId = qi->getBySeqno();
     }
-    EXPECT_EQ(13, items.size());
+
+    // Check that the following items are checkpoint end, followed by a
+    // checkpoint start.
+    EXPECT_EQ(queue_op::checkpoint_end, items.at(11)->getOperation());
+    EXPECT_EQ(queue_op::checkpoint_start, items.at(12)->getOperation());
+
     items.clear();
 
     manager->checkAndAddNewCheckpoint(1, vbucket);
-    manager->getAllItemsForCursor(cursor, items);
+    range = manager->getAllItemsForCursor(cursor, items);
+    EXPECT_EQ(1001, range.start);
+    EXPECT_EQ(1010, range.end);
     EXPECT_EQ(0, items.size());
-
-    delete manager;
 }
 
 // Sanity check test fixture
@@ -349,8 +352,19 @@ TEST_F(CheckpointTest, CheckFixture) {
     }
     // Should initially be zero items to persist.
     EXPECT_EQ(0, manager->getNumItemsForCursor(CheckpointManager::pCursorName));
+
+    // Check that the items fetched matches the number we were told to expect.
+    std::vector<queued_item> items;
+    auto result = manager->getAllItemsForCursor(CheckpointManager::pCursorName,
+                                                items);
+    EXPECT_EQ(0, result.start);
+    EXPECT_EQ(0, result.end);
+    EXPECT_EQ(1, items.size());
+    EXPECT_EQ(queue_op::checkpoint_start, items.at(0)->getOperation());
 }
 
+MATCHER_P(HasOperation, op, "") { return arg->getOperation() == op; }
+
 // Basic test of a single, open checkpoint.
 TEST_F(CheckpointTest, OneOpenCkpt) {
 
@@ -386,6 +400,17 @@ TEST_F(CheckpointTest, OneOpenCkpt) {
     EXPECT_EQ(1003, qi3->getBySeqno());
     EXPECT_EQ(0, qi3->getRevSeqno());
     EXPECT_EQ(2, manager->getNumItemsForCursor(CheckpointManager::pCursorName));
+
+    // Check that the items fetched matches the number we were told to expect.
+    std::vector<queued_item> items;
+    auto result = manager->getAllItemsForCursor(CheckpointManager::pCursorName,
+                                                items);
+    EXPECT_EQ(0, result.start);
+    EXPECT_EQ(1003, result.end);
+    EXPECT_EQ(3, items.size());
+    testing::ElementsAre(HasOperation(queue_op::checkpoint_start),
+                         HasOperation(queue_op::set),
+                         HasOperation(queue_op::set));
 }
 
 // Test with one open and one closed checkpoint.
@@ -393,13 +418,10 @@ TEST_F(CheckpointTest, OneOpenOneClosed) {
 
     // Add some items to the initial (open) checkpoint.
     for (auto i : {1,2}) {
-        queued_item qi(new Item("key" + std::to_string(i), vbucket->getId(),
-                                queue_op::set, /*revSeq*/0, /*bySeq*/0));
-        EXPECT_TRUE(manager->queueDirty(vbucket, qi, GenerateBySeqno::Yes, GenerateCas::Yes));
+        EXPECT_TRUE(queueNewItem("key" + std::to_string(i)));
     }
     EXPECT_EQ(1, manager->getNumCheckpoints());
     EXPECT_EQ(3, manager->getNumOpenChkItems()); // 1x op_checkpoint_start, 2x op_set
-    EXPECT_EQ(3, manager->getNumItems());
     const uint64_t ckpt_id1 = manager->getOpenCheckpointId();
 
     // Create a new checkpoint (closing the current open one).
@@ -411,17 +433,29 @@ TEST_F(CheckpointTest, OneOpenOneClosed) {
     // Add some items to the newly-opened checkpoint (note same keys as 1st
     // ckpt).
     for (auto ii : {1,2}) {
-        queued_item qi(new Item("key" + std::to_string(ii), vbucket->getId(),
-                                queue_op::set, /*revSeq*/1, /*bySeq*/0));
-        EXPECT_TRUE(manager->queueDirty(vbucket, qi, GenerateBySeqno::Yes, GenerateCas::Yes));
+        EXPECT_TRUE(queueNewItem("key" + std::to_string(ii)));
     }
     EXPECT_EQ(2, manager->getNumCheckpoints());
     EXPECT_EQ(3, manager->getNumOpenChkItems()); // 1x op_checkpoint_start, 2x op_set
-    EXPECT_EQ(3 + 4,
-              manager->getNumItems()); // open items + 1x op_ckpt_start, 2x op_set, 1x op_ckpt_end
 
     // Examine the items - should be 2 lots of two keys.
     EXPECT_EQ(4, manager->getNumItemsForCursor(CheckpointManager::pCursorName));
+
+    // Check that the items fetched matches the number we were told to expect.
+    std::vector<queued_item> items;
+    auto result = manager->getAllItemsForCursor(CheckpointManager::pCursorName,
+                                                items);
+    EXPECT_EQ(0, result.start);
+    EXPECT_EQ(1004, result.end);
+    EXPECT_EQ(7, items.size());
+    EXPECT_THAT(items,
+                testing::ElementsAre(HasOperation(queue_op::checkpoint_start),
+                                     HasOperation(queue_op::set),
+                                     HasOperation(queue_op::set),
+                                     HasOperation(queue_op::checkpoint_end),
+                                     HasOperation(queue_op::checkpoint_start),
+                                     HasOperation(queue_op::set),
+                                     HasOperation(queue_op::set)));
 }
 
 // Test the automatic creation of checkpoints based on the number of items.
@@ -447,17 +481,13 @@ TEST_F(CheckpointTest, ItemBasedCheckpointCreation) {
     for (unsigned int ii = 0; ii < MIN_CHECKPOINT_ITEMS; ii++) {
         EXPECT_EQ(ii + 1, manager->getNumOpenChkItems()); /* +1 for op_ckpt_start */
 
-        qi.reset(new Item("key" + std::to_string(ii), vbucket->getId(),
-                          queue_op::set, /*revSeq*/0, /*bySeq*/0));
-        EXPECT_TRUE(manager->queueDirty(vbucket, qi, GenerateBySeqno::Yes, GenerateCas::Yes));
+        EXPECT_TRUE(queueNewItem("key" + std::to_string(ii)));
         EXPECT_EQ(1, manager->getNumCheckpoints());
 
     }
 
     // Add one more - should create a new checkpoint.
-    qi.reset(new Item("key_epoch", vbucket->getId(), queue_op::set, /*revSeq*/0,
-                      /*bySeq*/0));
-    EXPECT_TRUE(manager->queueDirty(vbucket, qi, GenerateBySeqno::Yes, GenerateCas::Yes));
+    EXPECT_TRUE(queueNewItem("key_epoch"));
     EXPECT_EQ(2, manager->getNumCheckpoints());
     EXPECT_EQ(2, manager->getNumOpenChkItems()); // 1x op_ckpt_start, 1x op_set
 
@@ -465,17 +495,14 @@ TEST_F(CheckpointTest, ItemBasedCheckpointCreation) {
     for (unsigned int ii = 0; ii < MIN_CHECKPOINT_ITEMS - 1; ii++) {
         EXPECT_EQ(ii + 2, manager->getNumOpenChkItems()); /* +2 op_ckpt_start, key_epoch */
 
-        qi.reset(new Item("key" + std::to_string(ii), vbucket->getId(),
-                                queue_op::set, /*revSeq*/1, /*bySeq*/0));
-        EXPECT_TRUE(manager->queueDirty(vbucket, qi, GenerateBySeqno::Yes, GenerateCas::Yes));
+        EXPECT_TRUE(queueNewItem("key" + std::to_string(ii)));
+
         EXPECT_EQ(2, manager->getNumCheckpoints());
     }
 
     // Add one more - as we have hit maximum checkpoints should *not* create a
     // new one.
-    qi.reset(new Item("key_epoch2", vbucket->getId(), queue_op::set,
-                      /*revSeq*/1, /*bySeq*/0));
-    EXPECT_TRUE(manager->queueDirty(vbucket, qi, GenerateBySeqno::Yes, GenerateCas::Yes));
+    EXPECT_TRUE(queueNewItem("key_epoch2"));
     EXPECT_EQ(2, manager->getNumCheckpoints());
     EXPECT_EQ(12, // 1x op_ckpt_start, 1x key_epoch, 9x key_X, 1x key_epoch2
               manager->getNumOpenChkItems());
@@ -488,14 +515,17 @@ TEST_F(CheckpointTest, ItemBasedCheckpointCreation) {
     std::vector<queued_item> items;
     range = manager->getAllItemsForCursor(CheckpointManager::pCursorName,
                                          items);
+
+    EXPECT_EQ(0, range.start);
+    EXPECT_EQ(1021, range.end);
+    EXPECT_EQ(24, items.size());
+
     // Should still have the same number of checkpoints and open items.
     EXPECT_EQ(2, manager->getNumCheckpoints());
     EXPECT_EQ(12, manager->getNumOpenChkItems());
 
     // But adding a new item will create a new one.
-    qi.reset(new Item("key_epoch3", vbucket->getId(), queue_op::set,
-                      /*revSeq*/1, /*bySeq*/0));
-    EXPECT_TRUE(manager->queueDirty(vbucket, qi, GenerateBySeqno::Yes, GenerateCas::Yes));
+    EXPECT_TRUE(queueNewItem("key_epoch3"));
     EXPECT_EQ(3, manager->getNumCheckpoints());
     EXPECT_EQ(2, manager->getNumOpenChkItems()); // 1x op_ckpt_start, 1x op_set
 }
@@ -506,13 +536,10 @@ TEST_F(CheckpointTest, CursorOffsetOnCheckpointClose) {
 
     // Add two items to the initial (open) checkpoint.
     for (auto i : {1,2}) {
-        queued_item qi(new Item("key" + std::to_string(i), vbucket->getId(),
-                                queue_op::set, /*revSeq*/0, /*bySeq*/0));
-        EXPECT_TRUE(manager->queueDirty(vbucket, qi, GenerateBySeqno::Yes, GenerateCas::Yes));
+        EXPECT_TRUE(queueNewItem("key" + std::to_string(i)));
     }
     EXPECT_EQ(1, manager->getNumCheckpoints());
     EXPECT_EQ(3, manager->getNumOpenChkItems()); // 1x op_checkpoint_start, 2x op_set
-    EXPECT_EQ(3, manager->getNumItems());
 
     // Use the existing persistence cursor for this test:
     EXPECT_EQ(2, manager->getNumItemsForCursor(CheckpointManager::pCursorName))
@@ -520,9 +547,7 @@ TEST_F(CheckpointTest, CursorOffsetOnCheckpointClose) {
 
     // Check de-dupe counting - after adding another item with the same key,
     // should still see two items.
-    queued_item qi(new Item("key1", vbucket->getId(),
-                            queue_op::set, /*revSeq*/0, /*bySeq*/0));
-    EXPECT_FALSE(manager->queueDirty(vbucket, qi, GenerateBySeqno::Yes, GenerateCas::Yes))
+    EXPECT_FALSE(queueNewItem("key1"))
         << "Adding a duplicate key to open checkpoint should not increase queue size";
 
     EXPECT_EQ(2, manager->getNumItemsForCursor(CheckpointManager::pCursorName))
@@ -533,6 +558,8 @@ TEST_F(CheckpointTest, CursorOffsetOnCheckpointClose) {
     EXPECT_EQ(1, manager->getNumOpenChkItems())
         << "Expected 1 item (1x op_checkpoint_start)";
     EXPECT_EQ(2, manager->getNumCheckpoints());
+    EXPECT_EQ(2, manager->getNumItemsForCursor(CheckpointManager::pCursorName))
+        << "Expected 2 items for cursor after creating new checkpoint";
 
     // Advance cursor - first to get the 'checkpoint_start' meta item,
     // and a second time to get the a 'proper' mutation.
@@ -540,6 +567,8 @@ TEST_F(CheckpointTest, CursorOffsetOnCheckpointClose) {
     auto item = manager->nextItem(CheckpointManager::pCursorName, isLastMutationItem);
     EXPECT_TRUE(item->isCheckPointMetaItem());
     EXPECT_FALSE(isLastMutationItem);
+    EXPECT_EQ(2, manager->getNumItemsForCursor(CheckpointManager::pCursorName))
+        << "Expected 2 items for cursor after advancing one item";
 
     item = manager->nextItem(CheckpointManager::pCursorName, isLastMutationItem);
     EXPECT_FALSE(item->isCheckPointMetaItem());
@@ -550,9 +579,7 @@ TEST_F(CheckpointTest, CursorOffsetOnCheckpointClose) {
     // Add two items to the newly-opened checkpoint. Same keys as 1st ckpt,
     // but cannot de-dupe across checkpoints.
     for (auto ii : {1,2}) {
-        queued_item qi(new Item("key" + std::to_string(ii), vbucket->getId(),
-                                queue_op::set, /*revSeq*/1, /*bySeq*/0));
-        EXPECT_TRUE(manager->queueDirty(vbucket, qi, GenerateBySeqno::Yes, GenerateCas::Yes));
+        EXPECT_TRUE(queueNewItem("key" + std::to_string(ii)));
     }
 
     EXPECT_EQ(3, manager->getNumItemsForCursor(CheckpointManager::pCursorName))
@@ -617,9 +644,7 @@ TEST_F(CheckpointTest, ItemsForCheckpointCursor) {
     /* Add items such that we have 2 checkpoints */
     queued_item qi;
     for (unsigned int ii = 0; ii < 2 * MIN_CHECKPOINT_ITEMS; ii++) {
-        qi.reset(new Item("key" + std::to_string(ii), vbucket->getId(),
-                          queue_op::set, /*revSeq*/0, /*bySeq*/0));
-        EXPECT_TRUE(manager->queueDirty(vbucket, qi, GenerateBySeqno::Yes, GenerateCas::Yes));
+        EXPECT_TRUE(queueNewItem("key" + std::to_string(ii)));
     }
 
     /* Check if we have desired number of checkpoints and desired number of
@@ -668,9 +693,7 @@ TEST_F(CheckpointTest, CursorMovement) {
        Adding another would open new checkpoint */
     queued_item qi;
     for (unsigned int ii = 0; ii < MIN_CHECKPOINT_ITEMS; ii++) {
-        qi.reset(new Item("key" + std::to_string(ii), vbucket->getId(),
-                          queue_op::set, /*revSeq*/0, /*bySeq*/0));
-        EXPECT_TRUE(manager->queueDirty(vbucket, qi, GenerateBySeqno::Yes, GenerateCas::Yes));
+        EXPECT_TRUE(queueNewItem("key" + std::to_string(ii)));
     }
 
     /* Check if we have desired number of checkpoints and desired number of
@@ -720,17 +743,23 @@ TEST_F(CheckpointTest, CursorMovement) {
     EXPECT_EQ(curr_open_chkpt_id + 1, manager->getOpenCheckpointId_UNLOCKED());
 
     /* Get items for persistence cursor */
+    EXPECT_EQ(0, manager->getNumItemsForCursor(CheckpointManager::pCursorName))
+        << "Expected to have no normal (only meta) items";
     items.clear();
     manager->getAllItemsForCursor(CheckpointManager::pCursorName, items);
 
     /* We should have got op_ckpt_start item */
     EXPECT_EQ(1, items.size());
+    EXPECT_EQ(queue_op::checkpoint_start, items.at(0)->getOperation());
 
     /* Get items for DCP replication cursor */
+    EXPECT_EQ(0, manager->getNumItemsForCursor(CheckpointManager::pCursorName))
+        << "Expected to have no normal (only meta) items";
     items.clear();
     manager->getAllItemsForCursor(dcp_cursor.c_str(), items);
     /* Expecting only 1 op_ckpt_start item */
     EXPECT_EQ(1, items.size());
+    EXPECT_EQ(queue_op::checkpoint_start, items.at(0)->getOperation());
 
     /* Get item for TAP cursor. We expect TAP to send op_ckpt_end of last
        checkpoint. TAP unlike DCP cannot skip the op_ckpt_end message */
@@ -741,6 +770,122 @@ TEST_F(CheckpointTest, CursorMovement) {
 
 }
 
+// Test the checkpoint cursor movement for replica vBuckets (where we can
+// perform more checkpoint collapsing)
+TEST_F(CheckpointTest, CursorMovementReplicaMerge) {
+
+    vbucket->setState(vbucket_state_replica);
+
+    /* We want to have items across 2 checkpoints. Size down the default number
+     of items to create a new checkpoint and recreate the manager */
+    checkpoint_config = CheckpointConfig(DEFAULT_CHECKPOINT_PERIOD,
+                                         MIN_CHECKPOINT_ITEMS,
+                                         /*numCheckpoints*/2,
+                                         /*itemBased*/true,
+                                         /*keepClosed*/false,
+                                         /*enableMerge*/true);
+
+    // Add items such that we have a checkpoint at half-capacity.
+    for (unsigned int ii = 0; ii < MIN_CHECKPOINT_ITEMS / 2; ii++) {
+        EXPECT_TRUE(queueNewItem("key" + std::to_string(ii)));
+    }
+
+    /* Check if we have desired number of checkpoints and desired number of
+        items */
+    EXPECT_EQ(1, manager->getNumCheckpoints());
+    // +1 for checkpoint_start.
+    EXPECT_EQ((MIN_CHECKPOINT_ITEMS / 2) + 1, manager->getNumOpenChkItems());
+
+    // Register DCP replication cursor, which will be moved into the middle of
+    // first checkpoint and then left there.
+    std::string dcp_cursor{DCP_CURSOR_PREFIX + std::to_string(1)};
+    manager->registerCursorBySeqno(dcp_cursor.c_str(), 0,
+                                   MustSendCheckpointEnd::NO);
+
+    std::vector<queued_item> items;
+    manager->getAllItemsForCursor(dcp_cursor.c_str(), items);
+    EXPECT_EQ((MIN_CHECKPOINT_ITEMS / 2) + 1, items.size());
+
+    // Add more items so this checkpoint is now full.
+    for (unsigned int ii = MIN_CHECKPOINT_ITEMS / 2; ii < MIN_CHECKPOINT_ITEMS;
+            ii++) {
+        EXPECT_TRUE(queueNewItem("key" + std::to_string(ii)));
+    }
+    EXPECT_EQ(1, manager->getNumCheckpoints())
+        << "Should still only have 1 checkpoint after adding "
+                "MIN_CHECKPOINT_ITEMS total";
+    EXPECT_EQ(MIN_CHECKPOINT_ITEMS + 1, manager->getNumOpenChkItems());
+
+    /* Get items for persistence cursor - this will move the persistence cursor
+     * out of the initial checkpoint.
+     */
+    items.clear();
+    manager->getAllItemsForCursor(CheckpointManager::pCursorName, items);
+
+    /* We should have got (MIN_CHECKPOINT_ITEMS + op_ckpt_start) items. */
+    EXPECT_EQ(MIN_CHECKPOINT_ITEMS + 1, items.size());
+
+    EXPECT_EQ(1, manager->getOpenCheckpointId_UNLOCKED());
+
+    // Create a new checkpoint.
+    EXPECT_EQ(2, manager->createNewCheckpoint());
+
+    // Add another MIN_CHECKPOINT_ITEMS. This should fill up the second
+    // checkpoint.
+    for (unsigned int ii = 0; ii < MIN_CHECKPOINT_ITEMS; ii++) {
+        EXPECT_TRUE(queueNewItem("keyB_" + std::to_string(ii)));
+    }
+
+    // Move the persistence cursor through these new items.
+    EXPECT_EQ(MIN_CHECKPOINT_ITEMS,
+              manager->getNumItemsForCursor(CheckpointManager::pCursorName));
+    items.clear();
+    manager->getAllItemsForCursor(CheckpointManager::pCursorName, items);
+    EXPECT_EQ(MIN_CHECKPOINT_ITEMS + 1, items.size());
+
+    // Create a third checkpoint.
+    EXPECT_EQ(3, manager->createNewCheckpoint());
+
+    // Move persistence cursor into third checkpoint.
+    items.clear();
+    manager->getAllItemsForCursor(CheckpointManager::pCursorName, items);
+    EXPECT_EQ(1, items.size())
+        << "Expected to get a single meta item";
+    EXPECT_EQ(queue_op::checkpoint_start, items.at(0)->getOperation());
+
+    // We now have an unoccupied second checkpoint. We should be able to
+    // collapse this, and move the dcp_cursor into the merged checkpoint.
+    bool newCheckpointCreated;
+    manager->removeClosedUnrefCheckpoints(vbucket, newCheckpointCreated);
+
+    /* Get items for DCP cursor */
+    EXPECT_EQ(MIN_CHECKPOINT_ITEMS/2 + MIN_CHECKPOINT_ITEMS,
+              manager->getNumItemsForCursor(dcp_cursor))
+        << "DCP cursor remaining items should have been recalculated after "
+                "close of unref checkpoints.";
+
+    items.clear();
+    auto range = manager->getAllItemsForCursor(dcp_cursor.c_str(), items);
+    EXPECT_EQ(1001, range.start);
+    EXPECT_EQ(1020, range.end);
+
+    // Check we have received correct items (done in chunks because
+    // EXPECT_THAT maxes out at 10 elements).
+    std::vector<queued_item> items_a(items.begin(), items.begin() + 5);
+    // Remainder of first checkpoint.
+    EXPECT_THAT(items_a, testing::Each(HasOperation(queue_op::set)));
+
+    // Second checkpoint's data- 10x set.
+    std::vector<queued_item> items_b(items.begin() + 5, items.begin() + 15);
+    EXPECT_THAT(items_b, testing::Each(HasOperation(queue_op::set)));
+
+    // end of second checkpoint and start of third.
+    std::vector<queued_item> items_c(items.begin() + 15, items.end());
+    EXPECT_THAT(items_c,
+                testing::ElementsAre(HasOperation(queue_op::checkpoint_end),
+                                     HasOperation(queue_op::checkpoint_start)));
+}
+
 //
 // It's critical that the HLC (CAS) is ordered with seqno generation
 // otherwise XDCR may drop a newer bySeqno mutation because the CAS is not
index c5eb313..519b6aa 100644 (file)
@@ -185,6 +185,11 @@ TEST_F(StreamTest, test_mb17766) {
 // by de-duplication.
 TEST_F(StreamTest, MB17653_ItemsRemaining) {
 
+    auto& manager = engine->getEpStore()->getVBucket(vbid)->checkpointManager;
+
+    ASSERT_EQ(1, manager.getNumOpenChkItems())
+        << "Expected one item before population (checkpoint_start)";
+
     // Create 10 mutations to the same key which, while increasing the high
     // seqno by 10 will result in de-duplication and hence only one actual
     // mutation being added to the checkpoint items.
@@ -193,6 +198,9 @@ TEST_F(StreamTest, MB17653_ItemsRemaining) {
         store_item(vbid, "key", "value");
     }
 
+    ASSERT_EQ(2, manager.getNumOpenChkItems())
+        << "Expected 2 items after population (checkpoint_start & set)";
+
     setup_dcp_stream();
 
     // Should start with one item remaining.