#include "vbucket.h"
#include <gtest/gtest.h>
+#include <gmock/gmock.h>
#define NUM_TAP_THREADS 3
#define NUM_SET_THREADS 4
callback));
}
+ // Creates a new item with the given key and queues it into the checkpoint
+ // manager.
+ bool queueNewItem(const std::string& key) {
+
+ queued_item qi{new Item(key, vbucket->getId(), queue_op::set,
+ /*revSeq*/0, /*bySeq*/0)};
+ return manager->queueDirty(vbucket, qi,
+ GenerateBySeqno::Yes, GenerateCas::Yes);
+ }
+
+
EPStats global_stats;
CheckpointConfig checkpoint_config;
Configuration config;
}
TEST_F(CheckpointTest, reset_checkpoint_id) {
- std::shared_ptr<Callback<uint16_t> > cb(new DummyCB());
- RCPtr<VBucket> vbucket(new VBucket(0, vbucket_state_active, global_stats,
- checkpoint_config, NULL, 0, 0, 0, NULL,
- cb, config));
- CheckpointManager *manager =
- new CheckpointManager(global_stats, 0, checkpoint_config, 1, 0, 0, cb);
-
int i;
for (i = 0; i < 10; ++i) {
- std::stringstream key;
- key << "key-" << i;
- queued_item qi(new Item(key.str(), vbucket->getId(), queue_op::set,
- 0, 0));
- manager->queueDirty(vbucket, qi, GenerateBySeqno::Yes, GenerateCas::Yes);
+ EXPECT_TRUE(queueNewItem("key-" + std::to_string(i)));
}
- manager->createNewCheckpoint();
+ EXPECT_EQ(11, manager->getNumOpenChkItems());
+ EXPECT_EQ(10, manager->getNumItemsForCursor(CheckpointManager::pCursorName));
+
+ EXPECT_EQ(2, manager->createNewCheckpoint());
size_t itemPos;
- uint64_t chk = 1;
size_t lastMutationId = 0;
std::vector<queued_item> items;
const std::string cursor(CheckpointManager::pCursorName);
- manager->getAllItemsForCursor(cursor, items);
- for(itemPos = 0; itemPos < items.size(); ++itemPos) {
+ auto range = manager->getAllItemsForCursor(cursor, items);
+ EXPECT_EQ(0, range.start);
+ EXPECT_EQ(1010, range.end);
+ EXPECT_EQ(13, items.size());
+ EXPECT_EQ(queue_op::checkpoint_start, items.at(0)->getOperation());
+ // Check that the next 10 items are all SET operations.
+ for(itemPos = 1; itemPos < 11; ++itemPos) {
queued_item qi = items.at(itemPos);
- if (qi->getOperation() != queue_op::checkpoint_start &&
- qi->getOperation() != queue_op::checkpoint_end) {
- size_t mid = qi->getBySeqno();
- EXPECT_GT(mid, lastMutationId);
- lastMutationId = qi->getBySeqno();
- }
- if (itemPos == 0 || itemPos == (items.size() - 1)) {
- EXPECT_EQ(queue_op::checkpoint_start, qi->getOperation()) << "For itemPos:" << itemPos;
- } else if (itemPos == (items.size() - 2)) {
- EXPECT_EQ(queue_op::checkpoint_end, qi->getOperation()) << "For itemPos:" << itemPos;
- chk++;
- } else {
- EXPECT_EQ(queue_op::set, qi->getOperation()) << "For itemPos:" << itemPos;
- }
+ EXPECT_EQ(queue_op::set, qi->getOperation());
+ size_t mid = qi->getBySeqno();
+ EXPECT_GT(mid, lastMutationId);
+ lastMutationId = qi->getBySeqno();
}
- EXPECT_EQ(13, items.size());
+
+ // Check that the following items are checkpoint end, followed by a
+ // checkpoint start.
+ EXPECT_EQ(queue_op::checkpoint_end, items.at(11)->getOperation());
+ EXPECT_EQ(queue_op::checkpoint_start, items.at(12)->getOperation());
+
items.clear();
manager->checkAndAddNewCheckpoint(1, vbucket);
- manager->getAllItemsForCursor(cursor, items);
+ range = manager->getAllItemsForCursor(cursor, items);
+ EXPECT_EQ(1001, range.start);
+ EXPECT_EQ(1010, range.end);
EXPECT_EQ(0, items.size());
-
- delete manager;
}
// Sanity check test fixture
}
// Should initially be zero items to persist.
EXPECT_EQ(0, manager->getNumItemsForCursor(CheckpointManager::pCursorName));
+
+ // Check that the items fetched matches the number we were told to expect.
+ std::vector<queued_item> items;
+ auto result = manager->getAllItemsForCursor(CheckpointManager::pCursorName,
+ items);
+ EXPECT_EQ(0, result.start);
+ EXPECT_EQ(0, result.end);
+ EXPECT_EQ(1, items.size());
+ EXPECT_EQ(queue_op::checkpoint_start, items.at(0)->getOperation());
}
+MATCHER_P(HasOperation, op, "") { return arg->getOperation() == op; }
+
// Basic test of a single, open checkpoint.
TEST_F(CheckpointTest, OneOpenCkpt) {
EXPECT_EQ(1003, qi3->getBySeqno());
EXPECT_EQ(0, qi3->getRevSeqno());
EXPECT_EQ(2, manager->getNumItemsForCursor(CheckpointManager::pCursorName));
+
+ // Check that the items fetched matches the number we were told to expect.
+ std::vector<queued_item> items;
+ auto result = manager->getAllItemsForCursor(CheckpointManager::pCursorName,
+ items);
+ EXPECT_EQ(0, result.start);
+ EXPECT_EQ(1003, result.end);
+ EXPECT_EQ(3, items.size());
+ testing::ElementsAre(HasOperation(queue_op::checkpoint_start),
+ HasOperation(queue_op::set),
+ HasOperation(queue_op::set));
}
// Test with one open and one closed checkpoint.
// Add some items to the initial (open) checkpoint.
for (auto i : {1,2}) {
- queued_item qi(new Item("key" + std::to_string(i), vbucket->getId(),
- queue_op::set, /*revSeq*/0, /*bySeq*/0));
- EXPECT_TRUE(manager->queueDirty(vbucket, qi, GenerateBySeqno::Yes, GenerateCas::Yes));
+ EXPECT_TRUE(queueNewItem("key" + std::to_string(i)));
}
EXPECT_EQ(1, manager->getNumCheckpoints());
EXPECT_EQ(3, manager->getNumOpenChkItems()); // 1x op_checkpoint_start, 2x op_set
- EXPECT_EQ(3, manager->getNumItems());
const uint64_t ckpt_id1 = manager->getOpenCheckpointId();
// Create a new checkpoint (closing the current open one).
// Add some items to the newly-opened checkpoint (note same keys as 1st
// ckpt).
for (auto ii : {1,2}) {
- queued_item qi(new Item("key" + std::to_string(ii), vbucket->getId(),
- queue_op::set, /*revSeq*/1, /*bySeq*/0));
- EXPECT_TRUE(manager->queueDirty(vbucket, qi, GenerateBySeqno::Yes, GenerateCas::Yes));
+ EXPECT_TRUE(queueNewItem("key" + std::to_string(ii)));
}
EXPECT_EQ(2, manager->getNumCheckpoints());
EXPECT_EQ(3, manager->getNumOpenChkItems()); // 1x op_checkpoint_start, 2x op_set
- EXPECT_EQ(3 + 4,
- manager->getNumItems()); // open items + 1x op_ckpt_start, 2x op_set, 1x op_ckpt_end
// Examine the items - should be 2 lots of two keys.
EXPECT_EQ(4, manager->getNumItemsForCursor(CheckpointManager::pCursorName));
+
+ // Check that the items fetched matches the number we were told to expect.
+ std::vector<queued_item> items;
+ auto result = manager->getAllItemsForCursor(CheckpointManager::pCursorName,
+ items);
+ EXPECT_EQ(0, result.start);
+ EXPECT_EQ(1004, result.end);
+ EXPECT_EQ(7, items.size());
+ EXPECT_THAT(items,
+ testing::ElementsAre(HasOperation(queue_op::checkpoint_start),
+ HasOperation(queue_op::set),
+ HasOperation(queue_op::set),
+ HasOperation(queue_op::checkpoint_end),
+ HasOperation(queue_op::checkpoint_start),
+ HasOperation(queue_op::set),
+ HasOperation(queue_op::set)));
}
// Test the automatic creation of checkpoints based on the number of items.
for (unsigned int ii = 0; ii < MIN_CHECKPOINT_ITEMS; ii++) {
EXPECT_EQ(ii + 1, manager->getNumOpenChkItems()); /* +1 for op_ckpt_start */
- qi.reset(new Item("key" + std::to_string(ii), vbucket->getId(),
- queue_op::set, /*revSeq*/0, /*bySeq*/0));
- EXPECT_TRUE(manager->queueDirty(vbucket, qi, GenerateBySeqno::Yes, GenerateCas::Yes));
+ EXPECT_TRUE(queueNewItem("key" + std::to_string(ii)));
EXPECT_EQ(1, manager->getNumCheckpoints());
}
// Add one more - should create a new checkpoint.
- qi.reset(new Item("key_epoch", vbucket->getId(), queue_op::set, /*revSeq*/0,
- /*bySeq*/0));
- EXPECT_TRUE(manager->queueDirty(vbucket, qi, GenerateBySeqno::Yes, GenerateCas::Yes));
+ EXPECT_TRUE(queueNewItem("key_epoch"));
EXPECT_EQ(2, manager->getNumCheckpoints());
EXPECT_EQ(2, manager->getNumOpenChkItems()); // 1x op_ckpt_start, 1x op_set
for (unsigned int ii = 0; ii < MIN_CHECKPOINT_ITEMS - 1; ii++) {
EXPECT_EQ(ii + 2, manager->getNumOpenChkItems()); /* +2 op_ckpt_start, key_epoch */
- qi.reset(new Item("key" + std::to_string(ii), vbucket->getId(),
- queue_op::set, /*revSeq*/1, /*bySeq*/0));
- EXPECT_TRUE(manager->queueDirty(vbucket, qi, GenerateBySeqno::Yes, GenerateCas::Yes));
+ EXPECT_TRUE(queueNewItem("key" + std::to_string(ii)));
+
EXPECT_EQ(2, manager->getNumCheckpoints());
}
// Add one more - as we have hit maximum checkpoints should *not* create a
// new one.
- qi.reset(new Item("key_epoch2", vbucket->getId(), queue_op::set,
- /*revSeq*/1, /*bySeq*/0));
- EXPECT_TRUE(manager->queueDirty(vbucket, qi, GenerateBySeqno::Yes, GenerateCas::Yes));
+ EXPECT_TRUE(queueNewItem("key_epoch2"));
EXPECT_EQ(2, manager->getNumCheckpoints());
EXPECT_EQ(12, // 1x op_ckpt_start, 1x key_epoch, 9x key_X, 1x key_epoch2
manager->getNumOpenChkItems());
std::vector<queued_item> items;
range = manager->getAllItemsForCursor(CheckpointManager::pCursorName,
items);
+
+ EXPECT_EQ(0, range.start);
+ EXPECT_EQ(1021, range.end);
+ EXPECT_EQ(24, items.size());
+
// Should still have the same number of checkpoints and open items.
EXPECT_EQ(2, manager->getNumCheckpoints());
EXPECT_EQ(12, manager->getNumOpenChkItems());
// But adding a new item will create a new one.
- qi.reset(new Item("key_epoch3", vbucket->getId(), queue_op::set,
- /*revSeq*/1, /*bySeq*/0));
- EXPECT_TRUE(manager->queueDirty(vbucket, qi, GenerateBySeqno::Yes, GenerateCas::Yes));
+ EXPECT_TRUE(queueNewItem("key_epoch3"));
EXPECT_EQ(3, manager->getNumCheckpoints());
EXPECT_EQ(2, manager->getNumOpenChkItems()); // 1x op_ckpt_start, 1x op_set
}
// Add two items to the initial (open) checkpoint.
for (auto i : {1,2}) {
- queued_item qi(new Item("key" + std::to_string(i), vbucket->getId(),
- queue_op::set, /*revSeq*/0, /*bySeq*/0));
- EXPECT_TRUE(manager->queueDirty(vbucket, qi, GenerateBySeqno::Yes, GenerateCas::Yes));
+ EXPECT_TRUE(queueNewItem("key" + std::to_string(i)));
}
EXPECT_EQ(1, manager->getNumCheckpoints());
EXPECT_EQ(3, manager->getNumOpenChkItems()); // 1x op_checkpoint_start, 2x op_set
- EXPECT_EQ(3, manager->getNumItems());
// Use the existing persistence cursor for this test:
EXPECT_EQ(2, manager->getNumItemsForCursor(CheckpointManager::pCursorName))
// Check de-dupe counting - after adding another item with the same key,
// should still see two items.
- queued_item qi(new Item("key1", vbucket->getId(),
- queue_op::set, /*revSeq*/0, /*bySeq*/0));
- EXPECT_FALSE(manager->queueDirty(vbucket, qi, GenerateBySeqno::Yes, GenerateCas::Yes))
+ EXPECT_FALSE(queueNewItem("key1"))
<< "Adding a duplicate key to open checkpoint should not increase queue size";
EXPECT_EQ(2, manager->getNumItemsForCursor(CheckpointManager::pCursorName))
EXPECT_EQ(1, manager->getNumOpenChkItems())
<< "Expected 1 item (1x op_checkpoint_start)";
EXPECT_EQ(2, manager->getNumCheckpoints());
+ EXPECT_EQ(2, manager->getNumItemsForCursor(CheckpointManager::pCursorName))
+ << "Expected 2 items for cursor after creating new checkpoint";
// Advance cursor - first to get the 'checkpoint_start' meta item,
// and a second time to get the a 'proper' mutation.
auto item = manager->nextItem(CheckpointManager::pCursorName, isLastMutationItem);
EXPECT_TRUE(item->isCheckPointMetaItem());
EXPECT_FALSE(isLastMutationItem);
+ EXPECT_EQ(2, manager->getNumItemsForCursor(CheckpointManager::pCursorName))
+ << "Expected 2 items for cursor after advancing one item";
item = manager->nextItem(CheckpointManager::pCursorName, isLastMutationItem);
EXPECT_FALSE(item->isCheckPointMetaItem());
// Add two items to the newly-opened checkpoint. Same keys as 1st ckpt,
// but cannot de-dupe across checkpoints.
for (auto ii : {1,2}) {
- queued_item qi(new Item("key" + std::to_string(ii), vbucket->getId(),
- queue_op::set, /*revSeq*/1, /*bySeq*/0));
- EXPECT_TRUE(manager->queueDirty(vbucket, qi, GenerateBySeqno::Yes, GenerateCas::Yes));
+ EXPECT_TRUE(queueNewItem("key" + std::to_string(ii)));
}
EXPECT_EQ(3, manager->getNumItemsForCursor(CheckpointManager::pCursorName))
/* Add items such that we have 2 checkpoints */
queued_item qi;
for (unsigned int ii = 0; ii < 2 * MIN_CHECKPOINT_ITEMS; ii++) {
- qi.reset(new Item("key" + std::to_string(ii), vbucket->getId(),
- queue_op::set, /*revSeq*/0, /*bySeq*/0));
- EXPECT_TRUE(manager->queueDirty(vbucket, qi, GenerateBySeqno::Yes, GenerateCas::Yes));
+ EXPECT_TRUE(queueNewItem("key" + std::to_string(ii)));
}
/* Check if we have desired number of checkpoints and desired number of
Adding another would open new checkpoint */
queued_item qi;
for (unsigned int ii = 0; ii < MIN_CHECKPOINT_ITEMS; ii++) {
- qi.reset(new Item("key" + std::to_string(ii), vbucket->getId(),
- queue_op::set, /*revSeq*/0, /*bySeq*/0));
- EXPECT_TRUE(manager->queueDirty(vbucket, qi, GenerateBySeqno::Yes, GenerateCas::Yes));
+ EXPECT_TRUE(queueNewItem("key" + std::to_string(ii)));
}
/* Check if we have desired number of checkpoints and desired number of
EXPECT_EQ(curr_open_chkpt_id + 1, manager->getOpenCheckpointId_UNLOCKED());
/* Get items for persistence cursor */
+ EXPECT_EQ(0, manager->getNumItemsForCursor(CheckpointManager::pCursorName))
+ << "Expected to have no normal (only meta) items";
items.clear();
manager->getAllItemsForCursor(CheckpointManager::pCursorName, items);
/* We should have got op_ckpt_start item */
EXPECT_EQ(1, items.size());
+ EXPECT_EQ(queue_op::checkpoint_start, items.at(0)->getOperation());
/* Get items for DCP replication cursor */
+ EXPECT_EQ(0, manager->getNumItemsForCursor(CheckpointManager::pCursorName))
+ << "Expected to have no normal (only meta) items";
items.clear();
manager->getAllItemsForCursor(dcp_cursor.c_str(), items);
/* Expecting only 1 op_ckpt_start item */
EXPECT_EQ(1, items.size());
+ EXPECT_EQ(queue_op::checkpoint_start, items.at(0)->getOperation());
/* Get item for TAP cursor. We expect TAP to send op_ckpt_end of last
checkpoint. TAP unlike DCP cannot skip the op_ckpt_end message */
}
+// Test the checkpoint cursor movement for replica vBuckets (where we can
+// perform more checkpoint collapsing)
+TEST_F(CheckpointTest, CursorMovementReplicaMerge) {
+
+ vbucket->setState(vbucket_state_replica);
+
+ /* We want to have items across 2 checkpoints. Size down the default number
+ of items to create a new checkpoint and recreate the manager */
+ checkpoint_config = CheckpointConfig(DEFAULT_CHECKPOINT_PERIOD,
+ MIN_CHECKPOINT_ITEMS,
+ /*numCheckpoints*/2,
+ /*itemBased*/true,
+ /*keepClosed*/false,
+ /*enableMerge*/true);
+
+ // Add items such that we have a checkpoint at half-capacity.
+ for (unsigned int ii = 0; ii < MIN_CHECKPOINT_ITEMS / 2; ii++) {
+ EXPECT_TRUE(queueNewItem("key" + std::to_string(ii)));
+ }
+
+ /* Check if we have desired number of checkpoints and desired number of
+ items */
+ EXPECT_EQ(1, manager->getNumCheckpoints());
+ // +1 for checkpoint_start.
+ EXPECT_EQ((MIN_CHECKPOINT_ITEMS / 2) + 1, manager->getNumOpenChkItems());
+
+ // Register DCP replication cursor, which will be moved into the middle of
+ // first checkpoint and then left there.
+ std::string dcp_cursor{DCP_CURSOR_PREFIX + std::to_string(1)};
+ manager->registerCursorBySeqno(dcp_cursor.c_str(), 0,
+ MustSendCheckpointEnd::NO);
+
+ std::vector<queued_item> items;
+ manager->getAllItemsForCursor(dcp_cursor.c_str(), items);
+ EXPECT_EQ((MIN_CHECKPOINT_ITEMS / 2) + 1, items.size());
+
+ // Add more items so this checkpoint is now full.
+ for (unsigned int ii = MIN_CHECKPOINT_ITEMS / 2; ii < MIN_CHECKPOINT_ITEMS;
+ ii++) {
+ EXPECT_TRUE(queueNewItem("key" + std::to_string(ii)));
+ }
+ EXPECT_EQ(1, manager->getNumCheckpoints())
+ << "Should still only have 1 checkpoint after adding "
+ "MIN_CHECKPOINT_ITEMS total";
+ EXPECT_EQ(MIN_CHECKPOINT_ITEMS + 1, manager->getNumOpenChkItems());
+
+ /* Get items for persistence cursor - this will move the persistence cursor
+ * out of the initial checkpoint.
+ */
+ items.clear();
+ manager->getAllItemsForCursor(CheckpointManager::pCursorName, items);
+
+ /* We should have got (MIN_CHECKPOINT_ITEMS + op_ckpt_start) items. */
+ EXPECT_EQ(MIN_CHECKPOINT_ITEMS + 1, items.size());
+
+ EXPECT_EQ(1, manager->getOpenCheckpointId_UNLOCKED());
+
+ // Create a new checkpoint.
+ EXPECT_EQ(2, manager->createNewCheckpoint());
+
+ // Add another MIN_CHECKPOINT_ITEMS. This should fill up the second
+ // checkpoint.
+ for (unsigned int ii = 0; ii < MIN_CHECKPOINT_ITEMS; ii++) {
+ EXPECT_TRUE(queueNewItem("keyB_" + std::to_string(ii)));
+ }
+
+ // Move the persistence cursor through these new items.
+ EXPECT_EQ(MIN_CHECKPOINT_ITEMS,
+ manager->getNumItemsForCursor(CheckpointManager::pCursorName));
+ items.clear();
+ manager->getAllItemsForCursor(CheckpointManager::pCursorName, items);
+ EXPECT_EQ(MIN_CHECKPOINT_ITEMS + 1, items.size());
+
+ // Create a third checkpoint.
+ EXPECT_EQ(3, manager->createNewCheckpoint());
+
+ // Move persistence cursor into third checkpoint.
+ items.clear();
+ manager->getAllItemsForCursor(CheckpointManager::pCursorName, items);
+ EXPECT_EQ(1, items.size())
+ << "Expected to get a single meta item";
+ EXPECT_EQ(queue_op::checkpoint_start, items.at(0)->getOperation());
+
+ // We now have an unoccupied second checkpoint. We should be able to
+ // collapse this, and move the dcp_cursor into the merged checkpoint.
+ bool newCheckpointCreated;
+ manager->removeClosedUnrefCheckpoints(vbucket, newCheckpointCreated);
+
+ /* Get items for DCP cursor */
+ EXPECT_EQ(MIN_CHECKPOINT_ITEMS/2 + MIN_CHECKPOINT_ITEMS,
+ manager->getNumItemsForCursor(dcp_cursor))
+ << "DCP cursor remaining items should have been recalculated after "
+ "close of unref checkpoints.";
+
+ items.clear();
+ auto range = manager->getAllItemsForCursor(dcp_cursor.c_str(), items);
+ EXPECT_EQ(1001, range.start);
+ EXPECT_EQ(1020, range.end);
+
+ // Check we have received correct items (done in chunks because
+ // EXPECT_THAT maxes out at 10 elements).
+ std::vector<queued_item> items_a(items.begin(), items.begin() + 5);
+ // Remainder of first checkpoint.
+ EXPECT_THAT(items_a, testing::Each(HasOperation(queue_op::set)));
+
+ // Second checkpoint's data- 10x set.
+ std::vector<queued_item> items_b(items.begin() + 5, items.begin() + 15);
+ EXPECT_THAT(items_b, testing::Each(HasOperation(queue_op::set)));
+
+ // end of second checkpoint and start of third.
+ std::vector<queued_item> items_c(items.begin() + 15, items.end());
+ EXPECT_THAT(items_c,
+ testing::ElementsAre(HasOperation(queue_op::checkpoint_end),
+ HasOperation(queue_op::checkpoint_start)));
+}
+
//
// It's critical that the HLC (CAS) is ordered with seqno generation
// otherwise XDCR may drop a newer bySeqno mutation because the CAS is not