1 /* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
3 * Copyright 2011 Couchbase, Inc
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
9 * http://www.apache.org/licenses/LICENSE-2.0
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
25 #include "checkpoint.h"
29 #include <gtest/gtest.h>
30 #include <gmock/gmock.h>
31 #include <valgrind/valgrind.h>
// Thread counts for basic_chk_test. The *_VG variants are the smaller values
// selected when RUNNING_ON_VALGRIND evaluates true.
33 #define NUM_TAP_THREADS 3
34 #define NUM_TAP_THREADS_VG 2
35 #define NUM_SET_THREADS 4
36 #define NUM_SET_THREADS_VG 2
// Number of items each set thread queues when running under valgrind.
39 #define NUM_ITEMS_VG 10
// Cursor-name prefixes; tests append std::to_string(n) to form a cursor name.
41 #define DCP_CURSOR_PREFIX "dcp-client-"
42 #define TAP_CURSOR_PREFIX "tap-client-"
47 * Dummy callback to replace the flusher callback.
// Callback<uint16_t> stand-in: the fixture constructor and basic_chk_test each
// create one with `new DummyCB()`.
49 class DummyCB: public Callback<uint16_t> {
// Receives its argument by reference - presumably a no-op given the class
// name; TODO confirm against the full definition.
53 void callback(uint16_t &dummy) {
58 // Test fixture for Checkpoint tests. Once constructed provides a checkpoint
59 // manager and single vBucket (VBID 0).
60 class CheckpointTest : public ::testing::Test {
// Constructor initialiser list. The vbucket is created with lastSeqno 1000,
// which is why the tests expect the first queued item to receive seqno 1001.
63 : callback(new DummyCB()),
64 vbucket(new VBucket(0, vbucket_state_active, global_stats,
65 checkpoint_config, /*kvshard*/NULL,
66 /*lastSeqno*/1000, /*lastSnapStart*/0,
67 /*lastSnapEnd*/0, /*table*/NULL,
// Replaces `manager` with a freshly constructed CheckpointManager for the
// fixture's vbucket. Presumably re-invoked by tests after they assign a new
// checkpoint_config (their comments say "recreate the manager") - TODO confirm.
72 void createManager() {
73 manager.reset(new CheckpointManager(global_stats, vbucket->getId(),
76 /*lastSnapStart*/0,/*lastSnapEnd*/0,
80 // Creates a new item with the given key and queues it into the checkpoint
// Returns whatever queueDirty() returns; the tests treat true as "the
// persistence queue grew" and false as "de-duplicated against an existing key".
82 bool queueNewItem(const std::string& key) {
84 queued_item qi{new Item(key, vbucket->getId(), queue_op::set,
85 /*revSeq*/0, /*bySeq*/0)};
86 return manager->queueDirty(*vbucket, qi,
87 GenerateBySeqno::Yes, GenerateCas::Yes);
// Configuration handed to the VBucket; several tests overwrite it.
92 CheckpointConfig checkpoint_config;
// Holds the DummyCB created in the constructor.
94 std::shared_ptr<Callback<uint16_t> > callback;
// The single vbucket (id 0, active state) the manager operates on.
95 RCPtr<VBucket> vbucket;
// Manager under test; (re)assigned by createManager().
96 std::unique_ptr<CheckpointManager> manager;
// Fields of the argument struct (thread_args) passed to the launch_* thread
// entry points below. Those functions also read `n_threads` and `name`, which
// are declared on lines not shown in this excerpt.
100 RCPtr<VBucket> vbucket;
101 CheckpointManager *checkpoint_manager;
107 * atomically increment a threadCount
108 * if the calling thread is the last one up, notify_all
109 * if the calling thread is not the last one up, wait (in the function)
// Rendezvous helper: each caller takes the mutex, increments the shared
// counter and blocks on the condition variable until the counter equals
// args->n_threads.
111 static void thread_up(struct thread_args* args) {
// NOTE(review): function-local static with no reset visible in this excerpt,
// so the rendezvous looks usable only once per process - confirm before
// relying on it from a second test or a repeated run.
112 static int threadCount = 0;
114 static std::condition_variable cv;
115 std::unique_lock<std::mutex> lh(m);
116 if (++threadCount != args->n_threads) {
// The predicate overload re-checks the count after every wake-up.
117 cv.wait(lh, [args](){return threadCount == args->n_threads;});
119 cv.notify_all(); // all threads accounted for, begin
// Thread entry point for the persistence cursor. `arg` is a thread_args*.
// Fetches items for CheckpointManager::pCursorName and scans them for the
// queue_op::flush item, which basic_chk_test queues as its termination signal.
124 static void launch_persistence_thread(void *arg) {
125 struct thread_args *args = static_cast<struct thread_args *>(arg);
131 std::vector<queued_item> items;
132 const std::string cursor(CheckpointManager::pCursorName);
133 args->checkpoint_manager->getAllItemsForCursor(cursor, items);
134 for(itemPos = 0; itemPos < items.size(); ++itemPos) {
135 queued_item qi = items.at(itemPos);
136 if (qi->getOperation() == queue_op::flush) {
142 // Checkpoint start and end operations may have been introduced in
143 // the items queue after the "flush" operation was added. Ignore
144 // these. Anything else will be considered an error.
// Starts one past itemPos, i.e. at the first item after the position reached
// by the scan above.
145 for(size_t i = itemPos + 1; i < items.size(); ++i) {
146 queued_item qi = items.at(i);
147 EXPECT_TRUE(queue_op::checkpoint_start == qi->getOperation() ||
148 queue_op::checkpoint_end == qi->getOperation())
149 << "Unexpected operation:" << to_string(qi->getOperation());
// Thread entry point for one TAP cursor. `arg` is a thread_args* whose `name`
// field is the cursor registered for this thread by basic_chk_test.
157 static void launch_tap_client_thread(void *arg) {
158 struct thread_args *args = static_cast<struct thread_args *>(arg);
162 bool isLastItem = false;
// Pulls items for this thread's cursor by name.
164 queued_item qi = args->checkpoint_manager->nextItem(args->name,
// The flush item is the termination signal queued by basic_chk_test.
166 if (qi->getOperation() == queue_op::flush) {
// Thread entry point for the checkpoint remover. `arg` is a thread_args*.
174 static void launch_checkpoint_cleanup_thread(void *arg) {
175 struct thread_args *args = static_cast<struct thread_args *>(arg);
// Keeps removing closed, unreferenced checkpoints while more than one cursor
// is registered. basic_chk_test removes each TAP cursor after joining its
// thread, which is what eventually ends this loop.
178 while (args->checkpoint_manager->getNumOfCursors() > 1) {
// Out-parameter required by the call; its value is not used on the lines
// visible here.
179 bool newCheckpointCreated;
180 args->checkpoint_manager->removeClosedUnrefCheckpoints(args->vbucket,
181 newCheckpointCreated);
// Thread entry point for a producer. `arg` is a thread_args*. Queues
// item_count queue_op::set items into the shared checkpoint manager.
185 static void launch_set_thread(void *arg) {
186 struct thread_args *args = static_cast<struct thread_args *>(arg);
// Fewer items under valgrind (NUM_ITEMS_VG) than in a normal run (NUM_ITEMS).
190 const int item_count = RUNNING_ON_VALGRIND ? NUM_ITEMS_VG : NUM_ITEMS;
191 for (i = 0; i < item_count; ++i) {
// The key text is built on a line not shown in this excerpt.
192 std::stringstream key;
194 queued_item qi(new Item(key.str(), args->vbucket->getId(),
195 queue_op::set, 0, 0));
196 args->checkpoint_manager->queueDirty(*args->vbucket, qi,
197 GenerateBySeqno::Yes,
// Multi-threaded smoke test: set threads queue items while TAP-cursor threads,
// a persistence thread and a checkpoint-cleanup thread run against the same
// CheckpointManager; a final queue_op::flush item tells the consumers to stop.
203 TEST_F(CheckpointTest, basic_chk_test) {
// Note: `cb`, `vbucket` and `checkpoint_manager` are locals of this test; the
// local `vbucket` shadows the fixture member of the same name.
204 std::shared_ptr<Callback<uint16_t> > cb(new DummyCB());
205 RCPtr<VBucket> vbucket(new VBucket(0, vbucket_state_active, global_stats,
206 checkpoint_config, NULL, 0, 0, 0, NULL,
// NOTE(review): raw owning pointer, released by the `delete` on the last line
// of this test; consider std::unique_ptr as the fixture's `manager` uses.
209 CheckpointManager *checkpoint_manager = new CheckpointManager(global_stats, 0,
213 const int n_set_threads = RUNNING_ON_VALGRIND ? NUM_SET_THREADS_VG :
216 const int n_tap_threads = RUNNING_ON_VALGRIND ? NUM_TAP_THREADS_VG :
219 std::vector<cb_thread_t> tap_threads(n_tap_threads);
220 std::vector<cb_thread_t> set_threads(n_set_threads);
221 cb_thread_t persistence_thread;
222 cb_thread_t checkpoint_cleanup_thread;
// Arguments shared by the persistence, cleanup and set threads.
225 struct thread_args t_args;
226 t_args.checkpoint_manager = checkpoint_manager;
227 t_args.vbucket = vbucket;
// +2 accounts for the persistence and checkpoint-cleanup threads.
228 t_args.n_threads = n_set_threads + n_tap_threads + 2;
// One argument struct per TAP thread, each carrying its own cursor name.
230 std::vector<struct thread_args> tap_t_args(n_tap_threads);
231 for (i = 0; i < n_tap_threads; ++i) {
232 std::string name(TAP_CURSOR_PREFIX + std::to_string(i));
233 tap_t_args[i].checkpoint_manager = checkpoint_manager;
234 tap_t_args[i].vbucket = vbucket;
235 tap_t_args[i].name = name;
236 tap_t_args[i].n_threads = n_set_threads + n_tap_threads + 2;
// The cursor is registered before the thread that consumes it is started.
237 checkpoint_manager->registerCursor(name, 1, false,
238 MustSendCheckpointEnd::YES);
241 rc = cb_create_thread(&persistence_thread, launch_persistence_thread, &t_args, 0);
244 rc = cb_create_thread(&checkpoint_cleanup_thread,
245 launch_checkpoint_cleanup_thread, &t_args, 0);
248 for (i = 0; i < n_tap_threads; ++i) {
249 rc = cb_create_thread(&tap_threads[i], launch_tap_client_thread, &tap_t_args[i], 0);
253 for (i = 0; i < n_set_threads; ++i) {
254 rc = cb_create_thread(&set_threads[i], launch_set_thread, &t_args, 0);
// Producers are joined first so the flush item below is queued after all of
// their sets.
258 for (i = 0; i < n_set_threads; ++i) {
259 rc = cb_join_thread(set_threads[i]);
263 // Push the flush command into the queue so that all other threads can be terminated.
264 std::string key("flush");
265 queued_item qi(new Item(key, vbucket->getId(), queue_op::flush, 0xffff, 0));
266 checkpoint_manager->queueDirty(*vbucket, qi, GenerateBySeqno::Yes,
269 rc = cb_join_thread(persistence_thread);
272 for (i = 0; i < n_tap_threads; ++i) {
273 rc = cb_join_thread(tap_threads[i]);
// NOTE(review): rebuilds the cursor name from the literal "tap-client-" rather
// than TAP_CURSOR_PREFIX used at registration; the two must stay in sync.
275 std::stringstream name;
276 name << "tap-client-" << i;
277 checkpoint_manager->removeCursor(name.str());
// The cleanup thread loops while more than one cursor exists, so it can only
// finish once the TAP cursors have been removed above.
280 rc = cb_join_thread(checkpoint_cleanup_thread);
283 delete checkpoint_manager;
// Queues ten items, closes the checkpoint, verifies what the persistence
// cursor returns, then calls checkAndAddNewCheckpoint(1, vbucket) and checks
// the reported snapshot range.
286 TEST_F(CheckpointTest, reset_checkpoint_id) {
288 for (i = 0; i < 10; ++i) {
289 EXPECT_TRUE(queueNewItem("key-" + std::to_string(i)));
// 11 open items = 1x checkpoint_start + 10x set; the cursor count excludes the
// meta item.
291 EXPECT_EQ(11, manager->getNumOpenChkItems());
292 EXPECT_EQ(10, manager->getNumItemsForCursor(CheckpointManager::pCursorName));
// createNewCheckpoint() returns the id of the newly opened checkpoint.
294 EXPECT_EQ(2, manager->createNewCheckpoint());
297 size_t lastMutationId = 0;
298 std::vector<queued_item> items;
299 const std::string cursor(CheckpointManager::pCursorName);
300 auto range = manager->getAllItemsForCursor(cursor, items);
301 EXPECT_EQ(0, range.start);
// Fixture starts at seqno 1000, so ten sets end at 1010.
302 EXPECT_EQ(1010, range.end);
// 13 = checkpoint_start + 10x set + checkpoint_end + checkpoint_start.
303 EXPECT_EQ(13, items.size());
304 EXPECT_EQ(queue_op::checkpoint_start, items.at(0)->getOperation());
305 // Check that the next 10 items are all SET operations.
306 for(itemPos = 1; itemPos < 11; ++itemPos) {
307 queued_item qi = items.at(itemPos);
308 EXPECT_EQ(queue_op::set, qi->getOperation());
// Seqnos must be strictly increasing across the sets.
309 size_t mid = qi->getBySeqno();
310 EXPECT_GT(mid, lastMutationId);
311 lastMutationId = qi->getBySeqno();
314 // Check that the following items are checkpoint end, followed by a
316 EXPECT_EQ(queue_op::checkpoint_end, items.at(11)->getOperation());
317 EXPECT_EQ(queue_op::checkpoint_start, items.at(12)->getOperation());
321 manager->checkAndAddNewCheckpoint(1, vbucket);
322 range = manager->getAllItemsForCursor(cursor, items);
323 EXPECT_EQ(1001, range.start);
324 EXPECT_EQ(1010, range.end);
325 EXPECT_EQ(0, items.size());
328 // Sanity check test fixture
// Verifies the state the fixture provides before any item is queued: one
// cursor (persistence), one open-checkpoint item and nothing to persist.
329 TEST_F(CheckpointTest, CheckFixture) {
331 // Should initially have a single cursor (persistence).
332 EXPECT_EQ(1, manager->getNumOfCursors());
333 EXPECT_EQ(1, manager->getNumOpenChkItems());
// Every registered cursor (there is only one) must be the persistence cursor.
334 for (auto& cursor : manager->getAllCursors()) {
335 EXPECT_EQ(CheckpointManager::pCursorName, cursor.first);
337 // Should initially be zero items to persist.
338 EXPECT_EQ(0, manager->getNumItemsForCursor(CheckpointManager::pCursorName));
340 // Check that the items fetched matches the number we were told to expect.
341 std::vector<queued_item> items;
342 auto result = manager->getAllItemsForCursor(CheckpointManager::pCursorName,
344 EXPECT_EQ(0, result.start);
345 EXPECT_EQ(0, result.end);
// The single fetched item is the checkpoint_start meta item.
346 EXPECT_EQ(1, items.size());
347 EXPECT_EQ(queue_op::checkpoint_start, items.at(0)->getOperation());
// gmock matcher: matches a pointer-like `arg` whose getOperation() equals
// `op`. Used with testing::ElementsAre / testing::Each on vectors of
// queued_item in the tests below.
350 MATCHER_P(HasOperation, op, "") { return arg->getOperation() == op; }
352 // Basic test of a single, open checkpoint.
// Exercises a single open checkpoint: queueing a new key grows the queue,
// re-queueing the same key is de-duplicated, and the persistence cursor then
// returns checkpoint_start followed by the two distinct sets.
353 TEST_F(CheckpointTest, OneOpenCkpt) {
355 // Queue a set operation.
356 queued_item qi(new Item("key1", vbucket->getId(), queue_op::set,
357 /*revSeq*/20, /*bySeq*/0));
359 // No set_ops in queue, expect queueDirty to return true (increase
360 // persistence queue size).
361 EXPECT_TRUE(manager->queueDirty(*vbucket, qi, GenerateBySeqno::Yes,
363 EXPECT_EQ(1, manager->getNumCheckpoints()); // Single open checkpoint.
364 EXPECT_EQ(2, manager->getNumOpenChkItems()); // 1x op_checkpoint_start, 1x op_set
// Fixture vbucket starts at seqno 1000, so the first generated seqno is 1001;
// the revSeqno supplied by the caller is preserved.
365 EXPECT_EQ(1001, qi->getBySeqno());
366 EXPECT_EQ(20, qi->getRevSeqno());
367 EXPECT_EQ(1, manager->getNumItemsForCursor(CheckpointManager::pCursorName));
369 // Adding the same key again shouldn't increase the size.
370 queued_item qi2(new Item("key1", vbucket->getId(), queue_op::set,
371 /*revSeq*/21, /*bySeq*/0));
372 EXPECT_FALSE(manager->queueDirty(*vbucket, qi2, GenerateBySeqno::Yes,
374 EXPECT_EQ(1, manager->getNumCheckpoints());
375 EXPECT_EQ(2, manager->getNumOpenChkItems());
// The de-duplicated item still consumes a new seqno.
376 EXPECT_EQ(1002, qi2->getBySeqno());
377 EXPECT_EQ(21, qi2->getRevSeqno());
378 EXPECT_EQ(1, manager->getNumItemsForCursor(CheckpointManager::pCursorName));
380 // Adding a different key should increase size.
381 queued_item qi3(new Item("key2", vbucket->getId(), queue_op::set,
382 /*revSeq*/0, /*bySeq*/0));
383 EXPECT_TRUE(manager->queueDirty(*vbucket, qi3, GenerateBySeqno::Yes,
385 EXPECT_EQ(1, manager->getNumCheckpoints());
386 EXPECT_EQ(3, manager->getNumOpenChkItems());
387 EXPECT_EQ(1003, qi3->getBySeqno());
388 EXPECT_EQ(0, qi3->getRevSeqno());
389 EXPECT_EQ(2, manager->getNumItemsForCursor(CheckpointManager::pCursorName));
391 // Check that the items fetched matches the number we were told to expect.
392 std::vector<queued_item> items;
393 auto result = manager->getAllItemsForCursor(CheckpointManager::pCursorName,
395 EXPECT_EQ(0, result.start);
396 EXPECT_EQ(1003, result.end);
397 EXPECT_EQ(3, items.size());
// The matcher must be handed to EXPECT_THAT to be evaluated; a bare
// ElementsAre(...) expression only constructs a matcher and asserts nothing.
398 EXPECT_THAT(items, testing::ElementsAre(HasOperation(queue_op::checkpoint_start),
399 HasOperation(queue_op::set),
400 HasOperation(queue_op::set)));
403 // Test with one open and one closed checkpoint.
// Queues two keys, closes the checkpoint, queues the same two keys again and
// verifies counts, ids and the item sequence seen by the persistence cursor.
404 TEST_F(CheckpointTest, OneOpenOneClosed) {
406 // Add some items to the initial (open) checkpoint.
407 for (auto i : {1,2}) {
408 EXPECT_TRUE(queueNewItem("key" + std::to_string(i)));
410 EXPECT_EQ(1, manager->getNumCheckpoints());
411 EXPECT_EQ(3, manager->getNumOpenChkItems()); // 1x op_checkpoint_start, 2x op_set
412 const uint64_t ckpt_id1 = manager->getOpenCheckpointId();
414 // Create a new checkpoint (closing the current open one).
415 const uint64_t ckpt_id2 = manager->createNewCheckpoint();
416 EXPECT_NE(ckpt_id1, ckpt_id2) << "New checkpoint ID should differ from old";
// The previously open checkpoint is now the last closed one.
417 EXPECT_EQ(ckpt_id1, manager->getLastClosedCheckpointId());
418 EXPECT_EQ(1, manager->getNumOpenChkItems()); // 1x op_checkpoint_start
420 // Add some items to the newly-opened checkpoint (note same keys as 1st
// queueNewItem returns true here even though the keys repeat: they were first
// queued into the checkpoint that is now closed.
422 for (auto ii : {1,2}) {
423 EXPECT_TRUE(queueNewItem("key" + std::to_string(ii)));
425 EXPECT_EQ(2, manager->getNumCheckpoints());
426 EXPECT_EQ(3, manager->getNumOpenChkItems()); // 1x op_checkpoint_start, 2x op_set
428 // Examine the items - should be 2 lots of two keys.
429 EXPECT_EQ(4, manager->getNumItemsForCursor(CheckpointManager::pCursorName));
431 // Check that the items fetched matches the number we were told to expect.
432 std::vector<queued_item> items;
433 auto result = manager->getAllItemsForCursor(CheckpointManager::pCursorName,
435 EXPECT_EQ(0, result.start);
// Four sets from a starting seqno of 1000.
436 EXPECT_EQ(1004, result.end);
// 7 = start + 2x set + end + start + 2x set, as spelled out by the matcher.
437 EXPECT_EQ(7, items.size());
439 testing::ElementsAre(HasOperation(queue_op::checkpoint_start),
440 HasOperation(queue_op::set),
441 HasOperation(queue_op::set),
442 HasOperation(queue_op::checkpoint_end),
443 HasOperation(queue_op::checkpoint_start),
444 HasOperation(queue_op::set),
445 HasOperation(queue_op::set)));
448 // Test the automatic creation of checkpoints based on the number of items.
// Checks when the manager opens a new checkpoint on its own as items are
// queued, using a configuration sized down to MIN_CHECKPOINT_ITEMS.
449 TEST_F(CheckpointTest, ItemBasedCheckpointCreation) {
451 // Size down the default number of items to create a new checkpoint and
452 // recreate the manager
453 checkpoint_config = CheckpointConfig(DEFAULT_CHECKPOINT_PERIOD,
454 MIN_CHECKPOINT_ITEMS,
458 /*enableMerge*/false);
461 // Sanity check initial state.
462 EXPECT_EQ(1, manager->getNumOfCursors());
463 EXPECT_EQ(1, manager->getNumOpenChkItems());
464 EXPECT_EQ(1, manager->getNumCheckpoints());
466 // Create one less than the number required to create a new checkpoint.
468 for (unsigned int ii = 0; ii < MIN_CHECKPOINT_ITEMS; ii++) {
469 EXPECT_EQ(ii + 1, manager->getNumOpenChkItems()); /* +1 for op_ckpt_start */
471 EXPECT_TRUE(queueNewItem("key" + std::to_string(ii)));
472 EXPECT_EQ(1, manager->getNumCheckpoints());
476 // Add one more - should create a new checkpoint.
477 EXPECT_TRUE(queueNewItem("key_epoch"));
478 EXPECT_EQ(2, manager->getNumCheckpoints());
479 EXPECT_EQ(2, manager->getNumOpenChkItems()); // 1x op_ckpt_start, 1x op_set
481 // Fill up this checkpoint also - note loop for MIN_CHECKPOINT_ITEMS - 1
482 for (unsigned int ii = 0; ii < MIN_CHECKPOINT_ITEMS - 1; ii++) {
483 EXPECT_EQ(ii + 2, manager->getNumOpenChkItems()); /* +2 op_ckpt_start, key_epoch */
485 EXPECT_TRUE(queueNewItem("key" + std::to_string(ii)));
487 EXPECT_EQ(2, manager->getNumCheckpoints());
490 // Add one more - as we have hit maximum checkpoints should *not* create a
492 EXPECT_TRUE(queueNewItem("key_epoch2"));
493 EXPECT_EQ(2, manager->getNumCheckpoints());
// The literal expectations from here on (12, 1021, 24) presume
// MIN_CHECKPOINT_ITEMS is 10, as the "9x key_X" note implies - TODO confirm.
494 EXPECT_EQ(12, // 1x op_ckpt_start, 1x key_epoch, 9x key_X, 1x key_epoch2
495 manager->getNumOpenChkItems());
497 // Fetch the items associated with the persistence cursor. This
498 // moves the single cursor registered outside of the initial checkpoint,
499 // allowing a new open checkpoint to be created.
500 EXPECT_EQ(1, manager->getNumOfCursors());
501 snapshot_range_t range;
502 std::vector<queued_item> items;
503 range = manager->getAllItemsForCursor(CheckpointManager::pCursorName,
506 EXPECT_EQ(0, range.start);
507 EXPECT_EQ(1021, range.end);
508 EXPECT_EQ(24, items.size());
510 // Should still have the same number of checkpoints and open items.
511 EXPECT_EQ(2, manager->getNumCheckpoints());
512 EXPECT_EQ(12, manager->getNumOpenChkItems());
514 // But adding a new item will create a new one.
515 EXPECT_TRUE(queueNewItem("key_epoch3"));
516 EXPECT_EQ(3, manager->getNumCheckpoints());
517 EXPECT_EQ(2, manager->getNumOpenChkItems()); // 1x op_ckpt_start, 1x op_set
520 // Test checkpoint and cursor accounting - when checkpoints are closed the
521 // offset of cursors is updated as appropriate.
// Walks the persistence cursor item by item across a checkpoint close and a
// removal of closed checkpoints, checking the remaining-item count reported by
// getNumItemsForCursor() at each step.
522 TEST_F(CheckpointTest, CursorOffsetOnCheckpointClose) {
524 // Add two items to the initial (open) checkpoint.
525 for (auto i : {1,2}) {
526 EXPECT_TRUE(queueNewItem("key" + std::to_string(i)));
528 EXPECT_EQ(1, manager->getNumCheckpoints());
529 EXPECT_EQ(3, manager->getNumOpenChkItems()); // 1x op_checkpoint_start, 2x op_set
531 // Use the existing persistence cursor for this test:
532 EXPECT_EQ(2, manager->getNumItemsForCursor(CheckpointManager::pCursorName))
533 << "Cursor should initially have two items pending";
535 // Check de-dupe counting - after adding another item with the same key,
536 // should still see two items.
537 EXPECT_FALSE(queueNewItem("key1"))
538 << "Adding a duplicate key to open checkpoint should not increase queue size";
540 EXPECT_EQ(2, manager->getNumItemsForCursor(CheckpointManager::pCursorName))
541 << "Expected 2 items for cursor (2x op_set) after adding a duplicate.";
543 // Create a new checkpoint (closing the current open one).
544 manager->createNewCheckpoint();
545 EXPECT_EQ(1, manager->getNumOpenChkItems())
546 << "Expected 1 item (1x op_checkpoint_start)";
547 EXPECT_EQ(2, manager->getNumCheckpoints());
548 EXPECT_EQ(2, manager->getNumItemsForCursor(CheckpointManager::pCursorName))
549 << "Expected 2 items for cursor after creating new checkpoint";
551 // Advance cursor - first to get the 'checkpoint_start' meta item,
552 // and a second time to get a 'proper' mutation.
553 bool isLastMutationItem;
554 auto item = manager->nextItem(CheckpointManager::pCursorName, isLastMutationItem);
555 EXPECT_TRUE(item->isCheckPointMetaItem());
556 EXPECT_FALSE(isLastMutationItem);
// Consuming a meta item leaves the remaining-item count unchanged.
557 EXPECT_EQ(2, manager->getNumItemsForCursor(CheckpointManager::pCursorName))
558 << "Expected 2 items for cursor after advancing one item";
560 item = manager->nextItem(CheckpointManager::pCursorName, isLastMutationItem);
561 EXPECT_FALSE(item->isCheckPointMetaItem());
562 EXPECT_FALSE(isLastMutationItem);
563 EXPECT_EQ(1, manager->getNumItemsForCursor(CheckpointManager::pCursorName))
564 << "Expected 1 item for cursor after advancing by 1";
566 // Add two items to the newly-opened checkpoint. Same keys as 1st ckpt,
567 // but cannot de-dupe across checkpoints.
568 for (auto ii : {1,2}) {
569 EXPECT_TRUE(queueNewItem("key" + std::to_string(ii)));
572 EXPECT_EQ(3, manager->getNumItemsForCursor(CheckpointManager::pCursorName))
573 << "Expected 3 items for cursor after adding 2 more to new checkpoint";
575 // Advance the cursor 'out' of the first checkpoint.
576 item = manager->nextItem(CheckpointManager::pCursorName, isLastMutationItem);
577 EXPECT_FALSE(item->isCheckPointMetaItem());
578 EXPECT_TRUE(isLastMutationItem);
580 // Now at the end of the first checkpoint, move into the next checkpoint.
// Two meta items are expected in a row here; presumably the first
// checkpoint's end marker followed by the second's start marker - TODO confirm.
581 item = manager->nextItem(CheckpointManager::pCursorName, isLastMutationItem);
582 EXPECT_TRUE(item->isCheckPointMetaItem());
583 EXPECT_TRUE(isLastMutationItem);
584 item = manager->nextItem(CheckpointManager::pCursorName, isLastMutationItem);
585 EXPECT_TRUE(item->isCheckPointMetaItem());
586 EXPECT_FALSE(isLastMutationItem);
588 // Tell Checkpoint manager the items have been persisted, so it advances
589 // pCursorPreCheckpointId, which will allow us to remove the closed
590 // unreferenced checkpoints.
591 manager->itemsPersisted();
593 // Both previous checkpoints are unreferenced. Close them. This will
594 // cause the offset of this cursor to be recalculated.
595 bool new_open_ckpt_created;
596 EXPECT_EQ(2, manager->removeClosedUnrefCheckpoints(vbucket,
597 new_open_ckpt_created));
599 EXPECT_EQ(1, manager->getNumCheckpoints());
// The two sets queued into the second checkpoint are still pending.
601 EXPECT_EQ(2, manager->getNumItemsForCursor(CheckpointManager::pCursorName));
603 // Drain the remaining items.
604 item = manager->nextItem(CheckpointManager::pCursorName, isLastMutationItem);
605 EXPECT_FALSE(item->isCheckPointMetaItem());
606 EXPECT_FALSE(isLastMutationItem);
607 item = manager->nextItem(CheckpointManager::pCursorName, isLastMutationItem);
608 EXPECT_FALSE(item->isCheckPointMetaItem());
609 EXPECT_TRUE(isLastMutationItem);
611 EXPECT_EQ(0, manager->getNumItemsForCursor(CheckpointManager::pCursorName));
614 // Test the getAllItemsForCursor()
// Fills two checkpoints and checks that getAllItemsForCursor() returns the
// same number of items for the persistence cursor and for a DCP cursor
// registered at seqno 0.
615 TEST_F(CheckpointTest, ItemsForCheckpointCursor) {
616 /* We want to have items across 2 checkpoints. Size down the default number
617 of items to create a new checkpoint and recreate the manager */
618 checkpoint_config = CheckpointConfig(DEFAULT_CHECKPOINT_PERIOD,
619 MIN_CHECKPOINT_ITEMS,
623 /*enableMerge*/false);
626 /* Sanity check initial state */
627 EXPECT_EQ(1, manager->getNumOfCursors());
628 EXPECT_EQ(1, manager->getNumOpenChkItems());
629 EXPECT_EQ(1, manager->getNumCheckpoints());
631 /* Add items such that we have 2 checkpoints */
633 for (unsigned int ii = 0; ii < 2 * MIN_CHECKPOINT_ITEMS; ii++) {
634 EXPECT_TRUE(queueNewItem("key" + std::to_string(ii)));
637 /* Check if we have desired number of checkpoints and desired number of
639 EXPECT_EQ(2, manager->getNumCheckpoints());
640 EXPECT_EQ(MIN_CHECKPOINT_ITEMS + 1, manager->getNumOpenChkItems());
641 /* MIN_CHECKPOINT_ITEMS items + op_ckpt_start */
643 /* Register DCP replication cursor */
644 std::string dcp_cursor(DCP_CURSOR_PREFIX + std::to_string(1));
645 manager->registerCursorBySeqno(dcp_cursor.c_str(), 0,
646 MustSendCheckpointEnd::NO);
648 /* Get items for persistence*/
649 std::vector<queued_item> items;
650 manager->getAllItemsForCursor(CheckpointManager::pCursorName, items);
652 /* We should have got (2 * MIN_CHECKPOINT_ITEMS + 3) items. 3 additional are
653 op_ckpt_start, op_ckpt_end and op_ckpt_start */
654 EXPECT_EQ(2 * MIN_CHECKPOINT_ITEMS + 3, items.size());
656 /* Get items for DCP replication cursor */
// Same expected count as for the persistence cursor above.
658 manager->getAllItemsForCursor(dcp_cursor.c_str(), items);
659 EXPECT_EQ(2 * MIN_CHECKPOINT_ITEMS + 3, items.size());
662 // Test the checkpoint cursor movement
// Registers persistence, DCP and TAP cursors on one full checkpoint, drains
// all three, runs the checkpoint remover so a new open checkpoint appears, and
// checks what each cursor sees next.
663 TEST_F(CheckpointTest, CursorMovement) {
664 /* We want to have items across 2 checkpoints. Size down the default number
665 of items to create a new checkpoint and recreate the manager */
666 checkpoint_config = CheckpointConfig(DEFAULT_CHECKPOINT_PERIOD,
667 MIN_CHECKPOINT_ITEMS,
671 /*enableMerge*/false);
674 /* Sanity check initial state */
675 EXPECT_EQ(1, manager->getNumOfCursors());
676 EXPECT_EQ(1, manager->getNumOpenChkItems());
677 EXPECT_EQ(1, manager->getNumCheckpoints());
679 /* Add items such that we have 1 full (max items as per config) checkpoint.
680 Adding another would open new checkpoint */
682 for (unsigned int ii = 0; ii < MIN_CHECKPOINT_ITEMS; ii++) {
683 EXPECT_TRUE(queueNewItem("key" + std::to_string(ii)));
686 /* Check if we have desired number of checkpoints and desired number of
688 EXPECT_EQ(1, manager->getNumCheckpoints());
689 EXPECT_EQ(MIN_CHECKPOINT_ITEMS + 1, manager->getNumOpenChkItems());
690 /* MIN_CHECKPOINT_ITEMS items + op_ckpt_start */
692 /* Register DCP replication cursor */
693 std::string dcp_cursor(DCP_CURSOR_PREFIX + std::to_string(1));
694 manager->registerCursorBySeqno(dcp_cursor.c_str(), 0,
695 MustSendCheckpointEnd::NO);
697 /* Register TAP cursor */
698 std::string tap_cursor(TAP_CURSOR_PREFIX + std::to_string(1));
699 manager->registerCursor(tap_cursor, 1, false, MustSendCheckpointEnd::YES);
701 /* Get items for persistence cursor */
702 std::vector<queued_item> items;
703 manager->getAllItemsForCursor(CheckpointManager::pCursorName, items);
705 /* We should have got (MIN_CHECKPOINT_ITEMS + op_ckpt_start) items. */
706 EXPECT_EQ(MIN_CHECKPOINT_ITEMS + 1, items.size());
708 /* Get items for DCP replication cursor */
710 manager->getAllItemsForCursor(dcp_cursor.c_str(), items);
711 EXPECT_EQ(MIN_CHECKPOINT_ITEMS + 1, items.size());
713 /* Get items for TAP cursor */
716 bool isLastItem = false;
717 qi = manager->nextItem(tap_cursor, isLastItem);
723 EXPECT_EQ(MIN_CHECKPOINT_ITEMS + 1, num_items);
// Remember the open checkpoint id so the increment caused by the remover can
// be verified below.
725 uint64_t curr_open_chkpt_id = manager->getOpenCheckpointId_UNLOCKED();
727 /* Run the checkpoint remover so that new open checkpoint is created */
728 bool newCheckpointCreated;
729 manager->removeClosedUnrefCheckpoints(vbucket, newCheckpointCreated);
730 EXPECT_EQ(curr_open_chkpt_id + 1, manager->getOpenCheckpointId_UNLOCKED());
732 /* Get items for persistence cursor */
733 EXPECT_EQ(0, manager->getNumItemsForCursor(CheckpointManager::pCursorName))
734 << "Expected to have no normal (only meta) items";
736 manager->getAllItemsForCursor(CheckpointManager::pCursorName, items);
738 /* We should have got op_ckpt_start item */
739 EXPECT_EQ(1, items.size());
740 EXPECT_EQ(queue_op::checkpoint_start, items.at(0)->getOperation());
742 /* Get items for DCP replication cursor */
// Query the DCP cursor itself; checking the persistence cursor again here
// would leave the DCP cursor's remaining-item count unverified.
743 EXPECT_EQ(0, manager->getNumItemsForCursor(dcp_cursor))
744 << "Expected to have no normal (only meta) items";
746 manager->getAllItemsForCursor(dcp_cursor.c_str(), items);
747 /* Expecting only 1 op_ckpt_start item */
748 EXPECT_EQ(1, items.size());
749 EXPECT_EQ(queue_op::checkpoint_start, items.at(0)->getOperation());
751 /* Get item for TAP cursor. We expect TAP to send op_ckpt_end of last
752 checkpoint. TAP unlike DCP cannot skip the op_ckpt_end message */
753 bool isLastItem = false;
754 qi = manager->nextItem(tap_cursor, isLastItem);
755 EXPECT_EQ(queue_op::checkpoint_end, qi->getOperation());
756 EXPECT_EQ(true, isLastItem);
760 // Test the checkpoint cursor movement for replica vBuckets (where we can
761 // perform more checkpoint collapsing)
// Replica-state variant with enableMerge=true: leaves a DCP cursor in the
// middle of the first checkpoint, moves the persistence cursor on to a third
// checkpoint, runs the checkpoint remover and then checks what the DCP cursor
// receives.
762 TEST_F(CheckpointTest, CursorMovementReplicaMerge) {
764 vbucket->setState(vbucket_state_replica);
766 /* We want to have items across 2 checkpoints. Size down the default number
767 of items to create a new checkpoint and recreate the manager */
768 checkpoint_config = CheckpointConfig(DEFAULT_CHECKPOINT_PERIOD,
769 MIN_CHECKPOINT_ITEMS,
773 /*enableMerge*/true);
775 // Add items such that we have a checkpoint at half-capacity.
776 for (unsigned int ii = 0; ii < MIN_CHECKPOINT_ITEMS / 2; ii++) {
777 EXPECT_TRUE(queueNewItem("key" + std::to_string(ii)));
780 /* Check if we have desired number of checkpoints and desired number of
782 EXPECT_EQ(1, manager->getNumCheckpoints());
783 // +1 for checkpoint_start.
784 EXPECT_EQ((MIN_CHECKPOINT_ITEMS / 2) + 1, manager->getNumOpenChkItems());
786 // Register DCP replication cursor, which will be moved into the middle of
787 // first checkpoint and then left there.
788 std::string dcp_cursor{DCP_CURSOR_PREFIX + std::to_string(1)};
789 manager->registerCursorBySeqno(dcp_cursor.c_str(), 0,
790 MustSendCheckpointEnd::NO);
792 std::vector<queued_item> items;
793 manager->getAllItemsForCursor(dcp_cursor.c_str(), items);
794 EXPECT_EQ((MIN_CHECKPOINT_ITEMS / 2) + 1, items.size());
796 // Add more items so this checkpoint is now full.
797 for (unsigned int ii = MIN_CHECKPOINT_ITEMS / 2; ii < MIN_CHECKPOINT_ITEMS;
799 EXPECT_TRUE(queueNewItem("key" + std::to_string(ii)));
801 EXPECT_EQ(1, manager->getNumCheckpoints())
802 << "Should still only have 1 checkpoint after adding "
803 "MIN_CHECKPOINT_ITEMS total";
804 EXPECT_EQ(MIN_CHECKPOINT_ITEMS + 1, manager->getNumOpenChkItems());
806 /* Get items for persistence cursor - this will move the persistence cursor
807 * out of the initial checkpoint.
810 manager->getAllItemsForCursor(CheckpointManager::pCursorName, items);
812 /* We should have got (MIN_CHECKPOINT_ITEMS + op_ckpt_start) items. */
813 EXPECT_EQ(MIN_CHECKPOINT_ITEMS + 1, items.size());
815 EXPECT_EQ(1, manager->getOpenCheckpointId_UNLOCKED());
817 // Create a new checkpoint.
818 EXPECT_EQ(2, manager->createNewCheckpoint());
820 // Add another MIN_CHECKPOINT_ITEMS. This should fill up the second
822 for (unsigned int ii = 0; ii < MIN_CHECKPOINT_ITEMS; ii++) {
823 EXPECT_TRUE(queueNewItem("keyB_" + std::to_string(ii)));
826 // Move the persistence cursor through these new items.
827 EXPECT_EQ(MIN_CHECKPOINT_ITEMS,
828 manager->getNumItemsForCursor(CheckpointManager::pCursorName));
830 manager->getAllItemsForCursor(CheckpointManager::pCursorName, items);
831 EXPECT_EQ(MIN_CHECKPOINT_ITEMS + 1, items.size());
833 // Create a third checkpoint.
834 EXPECT_EQ(3, manager->createNewCheckpoint());
836 // Move persistence cursor into third checkpoint.
838 manager->getAllItemsForCursor(CheckpointManager::pCursorName, items);
839 EXPECT_EQ(1, items.size())
840 << "Expected to get a single meta item";
841 EXPECT_EQ(queue_op::checkpoint_start, items.at(0)->getOperation());
843 // We now have an unoccupied second checkpoint. We should be able to
844 // collapse this, and move the dcp_cursor into the merged checkpoint.
845 bool newCheckpointCreated;
846 manager->removeClosedUnrefCheckpoints(vbucket, newCheckpointCreated);
848 /* Get items for DCP cursor */
// Expected remainder: the second half of the first checkpoint plus all of the
// second checkpoint's sets.
849 EXPECT_EQ(MIN_CHECKPOINT_ITEMS/2 + MIN_CHECKPOINT_ITEMS,
850 manager->getNumItemsForCursor(dcp_cursor))
851 << "DCP cursor remaining items should have been recalculated after "
852 "close of unref checkpoints.";
855 auto range = manager->getAllItemsForCursor(dcp_cursor.c_str(), items);
856 EXPECT_EQ(1001, range.start);
857 EXPECT_EQ(1020, range.end);
859 // Check we have received correct items (done in chunks because
860 // EXPECT_THAT maxes out at 10 elements).
// The fixed offsets 5 and 15 below presume MIN_CHECKPOINT_ITEMS is 10 (see
// the "10x set" note) - TODO confirm.
861 std::vector<queued_item> items_a(items.begin(), items.begin() + 5);
862 // Remainder of first checkpoint.
863 EXPECT_THAT(items_a, testing::Each(HasOperation(queue_op::set)));
865 // Second checkpoint's data- 10x set.
866 std::vector<queued_item> items_b(items.begin() + 5, items.begin() + 15);
867 EXPECT_THAT(items_b, testing::Each(HasOperation(queue_op::set)));
869 // end of second checkpoint and start of third.
870 std::vector<queued_item> items_c(items.begin() + 15, items.end());
872 testing::ElementsAre(HasOperation(queue_op::checkpoint_end),
873 HasOperation(queue_op::checkpoint_start)));
877 // It's critical that the HLC (CAS) is ordered with seqno generation
878 // otherwise XDCR may drop a newer bySeqno mutation because the CAS is not
// Queues items from n_threads threads concurrently, records each item's
// (seqno, CAS) pair, and verifies that CAS strictly increases with seqno, both
// in the collected data and in the order the checkpoint returns the items.
881 TEST_F(CheckpointTest, SeqnoAndHLCOrdering) {
883 const int n_threads = 8;
884 const int n_items = 1000;
886 // configure so we can store a large number of items
887 checkpoint_config = CheckpointConfig(DEFAULT_CHECKPOINT_PERIOD,
892 /*enableMerge*/false);
895 /* Sanity check initial state */
896 EXPECT_EQ(1, manager->getNumOfCursors());
897 EXPECT_EQ(1, manager->getNumOpenChkItems());
898 EXPECT_EQ(1, manager->getNumCheckpoints());
900 std::vector<std::thread> threads;
902 // vector of pairs, first is seqno, second is CAS
903 // just do a scatter gather over n_threads
904 std::vector<std::vector<std::pair<uint64_t, uint64_t> > > threadData(n_threads);
905 for (int ii = 0; ii < n_threads; ii++) {
// Each thread appends only to its own vector, captured by reference; the
// vectors are read only after the "Wait for all threads" step below.
906 auto& threadsData = threadData[ii];
907 threads.push_back(std::thread([this, ii, n_items, &threadsData](){
908 std::string key = "key" + std::to_string(ii);
909 for (int item = 0; item < n_items; item++) {
910 queued_item qi(new Item(key + std::to_string(item),
911 vbucket->getId(), queue_op::set,
912 /*revSeq*/0, /*bySeq*/0));
913 EXPECT_TRUE(manager->queueDirty(*vbucket,
915 GenerateBySeqno::Yes,
919 threadsData.push_back(std::make_pair(qi->getBySeqno(), qi->getCas()));
924 // Wait for all threads
925 for (auto& thread : threads) {
929 // Now combine the data and check HLC is increasing with seqno
// std::map keeps the pairs ordered by seqno for the monotonicity walk below.
930 std::map<uint64_t, uint64_t> finalData;
// Iterate by const reference: copying each per-thread vector is unnecessary.
931 for (const auto& t : threadData) {
932 for (const auto& pair : t) {
// Every seqno must be unique across all threads.
933 EXPECT_EQ(finalData.end(), finalData.find(pair.first));
934 finalData[pair.first] = pair.second;
938 auto itr = finalData.begin();
// Fatal check: the iterator is dereferenced on the next line.
939 ASSERT_NE(itr, finalData.end());
940 uint64_t previousCas = (itr++)->second;
941 EXPECT_NE(itr, finalData.end());
942 for (; itr != finalData.end(); itr++) {
943 EXPECT_LT(previousCas, itr->second);
944 previousCas = itr->second;
947 // Now a final check, iterate the checkpoint and also check for increasing
949 std::vector<queued_item> items;
950 manager->getAllItemsForCursor(CheckpointManager::pCursorName, items);
952 /* We should have got (n_threads*n_items + op_ckpt_start) items. */
// Fatal check: items[1] is indexed unconditionally on the next line.
953 ASSERT_EQ(n_threads*n_items + 1, items.size());
// Index 0 is the checkpoint_start meta item, so the comparison starts at 1.
955 previousCas = items[1]->getCas();
956 for (size_t ii = 2; ii < items.size(); ii++) {
957 EXPECT_LT(previousCas, items[ii]->getCas());
958 previousCas = items[ii]->getCas();
962 // Test cursor is correctly updated when enqueuing a key which already exists
963 // in the checkpoint (and needs de-duping), where the cursor points at a
964 // meta-item at the head of the checkpoint:
967 // Checkpoint [ 0:EMPTY(), 1:CKPT_START(), 1:SET(key), 2:SET_VBSTATE() ]
972 // Checkpoint [ 0:EMPTY(), 1:CKPT_START(), 2:SET_VBSTATE(), 2:SET(key) ]
// Consumes everything with the persistence cursor (ending on a set-vbstate
// meta item), then expects exactly one item to become available for the cursor
// once a duplicate of "key" has been queued.
976 TEST_F(CheckpointTest, CursorUpdateForExistingItemWithMetaItemAtHead) {
977 // Setup the checkpoint and cursor.
// Item counts here include the checkpoint_start meta item, hence 1 -> 2 -> 3.
978 ASSERT_EQ(1, manager->getNumItems());
979 ASSERT_TRUE(queueNewItem("key"));
980 ASSERT_EQ(2, manager->getNumItems());
981 manager->queueSetVBState(*vbucket.get());
983 ASSERT_EQ(3, manager->getNumItems());
985 // Advance persistence cursor so all items have been consumed.
986 std::vector<queued_item> items;
987 manager->getAllItemsForCursor(CheckpointManager::pCursorName, items);
988 ASSERT_EQ(3, items.size());
989 ASSERT_EQ(0, manager->getNumItemsForCursor(CheckpointManager::pCursorName));
991 // Queue an item with a duplicate key.
994 // Test: Should have one item for cursor (the one we just added).
995 EXPECT_EQ(1, manager->getNumItemsForCursor(CheckpointManager::pCursorName));
997 // Should have another item to read (new version of 'key')
999 manager->getAllItemsForCursor(CheckpointManager::pCursorName, items);
1000 EXPECT_EQ(1, items.size());
1003 // Test cursor is correctly updated when enqueuing a key which already exists
1004 // in the checkpoint (and needs de-duping), where the cursor points at a
1005 // meta-item *not* at the head of the checkpoint:
1008 // Checkpoint [ 0:EMPTY(), 1:CKPT_START(), 1:SET_VBSTATE(), 1:SET(key) ]
1013 // Checkpoint [ 0:EMPTY(), 1:CKPT_START(), 1:SET_VBSTATE(), 2:SET(key) ]
// Consumes a set-vbstate meta item with the persistence cursor, queues "key"
// twice, and expects the cursor to see a single item carrying the second
// seqno (1002).
1017 TEST_F(CheckpointTest, CursorUpdateForExistingItemWithNonMetaItemAtHead) {
1018 // Setup the checkpoint and cursor.
1019 ASSERT_EQ(1, manager->getNumItems());
1020 manager->queueSetVBState(*vbucket.get());
1021 ASSERT_EQ(2, manager->getNumItems());
1023 // Advance persistence cursor so all items have been consumed.
1024 std::vector<queued_item> items;
1025 manager->getAllItemsForCursor(CheckpointManager::pCursorName, items);
1026 ASSERT_EQ(2, items.size());
1027 ASSERT_EQ(0, manager->getNumItemsForCursor(CheckpointManager::pCursorName));
1029 // Queue a set (cursor will now be one behind).
1030 ASSERT_TRUE(queueNewItem("key"));
1031 ASSERT_EQ(1, manager->getNumItemsForCursor(CheckpointManager::pCursorName));
1033 // Test: queue an item with a duplicate key.
// NOTE(review): the return value is not checked here, unlike the other
// queueNewItem calls in this file - confirm whether that is intentional.
1034 queueNewItem("key");
1036 // Test: Should have one item for cursor (the one we just added).
1037 EXPECT_EQ(1, manager->getNumItemsForCursor(CheckpointManager::pCursorName));
1039 // Should have an item to read (new version of 'key')
1041 manager->getAllItemsForCursor(CheckpointManager::pCursorName, items);
1042 EXPECT_EQ(1, items.size());
// 1002 is the second seqno generated from the fixture's starting value of
// 1000, i.e. the re-queued version of "key".
1043 EXPECT_EQ(1002, items.at(0)->getBySeqno());
1044 EXPECT_EQ("key", items.at(0)->getKey());