MB-21724: Reduce iterations in CheckpointTest basic_chk_test
[ep-engine.git] / tests / module_tests / checkpoint_test.cc
1 /* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2 /*
3  *     Copyright 2011 Couchbase, Inc
4  *
5  *   Licensed under the Apache License, Version 2.0 (the "License");
6  *   you may not use this file except in compliance with the License.
7  *   You may obtain a copy of the License at
8  *
9  *       http://www.apache.org/licenses/LICENSE-2.0
10  *
11  *   Unless required by applicable law or agreed to in writing, software
12  *   distributed under the License is distributed on an "AS IS" BASIS,
13  *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  *   See the License for the specific language governing permissions and
15  *   limitations under the License.
16  */
17
18 #include "config.h"
19
20 #include <algorithm>
21 #include <set>
22 #include <thread>
23 #include <vector>
24
25 #include "checkpoint.h"
26 #include "stats.h"
27 #include "vbucket.h"
28
29 #include <gtest/gtest.h>
30 #include <gmock/gmock.h>
31 #include <valgrind/valgrind.h>
32
33 #define NUM_TAP_THREADS 3
34 #define NUM_TAP_THREADS_VG 2
35 #define NUM_SET_THREADS 4
36 #define NUM_SET_THREADS_VG 2
37
38 #define NUM_ITEMS 500
39 #define NUM_ITEMS_VG 10
40
41 #define DCP_CURSOR_PREFIX "dcp-client-"
42 #define TAP_CURSOR_PREFIX "tap-client-"
43
44 extern "C" {
45
46 /**
47  * Dummy callback to replace the flusher callback.
48  */
49 class DummyCB: public Callback<uint16_t> {
50 public:
51     DummyCB() {}
52
53     void callback(uint16_t &dummy) {
54         (void) dummy;
55     }
56 };
57
58 // Test fixture for Checkpoint tests. Once constructed provides a checkpoint
59 // manager and single vBucket (VBID 0).
60 class CheckpointTest : public ::testing::Test {
61 protected:
62     CheckpointTest()
63         : callback(new DummyCB()),
64           vbucket(new VBucket(0, vbucket_state_active, global_stats,
65                               checkpoint_config, /*kvshard*/NULL,
66                               /*lastSeqno*/1000, /*lastSnapStart*/0,
67                               /*lastSnapEnd*/0, /*table*/NULL,
68                               callback, config)) {
69         createManager();
70     }
71
72     void createManager() {
73         manager.reset(new CheckpointManager(global_stats, vbucket->getId(),
74                                             checkpoint_config,
75                                             /*lastSeqno*/1000,
76                                             /*lastSnapStart*/0,/*lastSnapEnd*/0,
77                                             callback));
78     }
79
80     // Creates a new item with the given key and queues it into the checkpoint
81     // manager.
82     bool queueNewItem(const std::string& key) {
83
84         queued_item qi{new Item(key, vbucket->getId(), queue_op::set,
85                                 /*revSeq*/0, /*bySeq*/0)};
86         return manager->queueDirty(*vbucket, qi,
87                                    GenerateBySeqno::Yes, GenerateCas::Yes);
88     }
89
90
91     EPStats global_stats;
92     CheckpointConfig checkpoint_config;
93     Configuration config;
94     std::shared_ptr<Callback<uint16_t> > callback;
95     RCPtr<VBucket> vbucket;
96     std::unique_ptr<CheckpointManager> manager;
97 };
98
99 struct thread_args {
100     RCPtr<VBucket> vbucket;
101     CheckpointManager *checkpoint_manager;
102     int n_threads;
103     std::string name;
104 };
105
106 /*
107  * atomically increment a threadCount
108  * if the calling thread is the last one up, notify_all
109  * if the calling thread is not the last one up, wait (in the function)
110  */
111 static void thread_up(struct thread_args* args) {
112     static int threadCount = 0;
113     static std::mutex m;
114     static std::condition_variable cv;
115     std::unique_lock<std::mutex> lh(m);
116     if (++threadCount != args->n_threads) {
117         cv.wait(lh, [args](){return threadCount == args->n_threads;});
118     } else {
119         cv.notify_all(); // all threads accounted for, begin
120     }
121     lh.unlock();
122 }
123
124 static void launch_persistence_thread(void *arg) {
125     struct thread_args *args = static_cast<struct thread_args *>(arg);
126     thread_up(args);
127
128     bool flush = false;
129     while(true) {
130         size_t itemPos;
131         std::vector<queued_item> items;
132         const std::string cursor(CheckpointManager::pCursorName);
133         args->checkpoint_manager->getAllItemsForCursor(cursor, items);
134         for(itemPos = 0; itemPos < items.size(); ++itemPos) {
135             queued_item qi = items.at(itemPos);
136             if (qi->getOperation() == queue_op::flush) {
137                 flush = true;
138                 break;
139             }
140         }
141         if (flush) {
142             // Checkpoint start and end operations may have been introduced in
143             // the items queue after the "flush" operation was added. Ignore
144             // these. Anything else will be considered an error.
145             for(size_t i = itemPos + 1; i < items.size(); ++i) {
146                 queued_item qi = items.at(i);
147                 EXPECT_TRUE(queue_op::checkpoint_start == qi->getOperation() ||
148                             queue_op::checkpoint_end == qi->getOperation())
149                     << "Unexpected operation:" << to_string(qi->getOperation());
150             }
151             break;
152         }
153     }
154     EXPECT_TRUE(flush);
155 }
156
157 static void launch_tap_client_thread(void *arg) {
158     struct thread_args *args = static_cast<struct thread_args *>(arg);
159     thread_up(args);
160
161     bool flush = false;
162     bool isLastItem = false;
163     while(true) {
164         queued_item qi = args->checkpoint_manager->nextItem(args->name,
165                                                             isLastItem);
166         if (qi->getOperation() == queue_op::flush) {
167             flush = true;
168             break;
169         }
170     }
171     EXPECT_TRUE(flush);
172 }
173
174 static void launch_checkpoint_cleanup_thread(void *arg) {
175     struct thread_args *args = static_cast<struct thread_args *>(arg);
176     thread_up(args);
177
178     while (args->checkpoint_manager->getNumOfCursors() > 1) {
179         bool newCheckpointCreated;
180         args->checkpoint_manager->removeClosedUnrefCheckpoints(args->vbucket,
181                                                                newCheckpointCreated);
182     }
183 }
184
185 static void launch_set_thread(void *arg) {
186     struct thread_args *args = static_cast<struct thread_args *>(arg);
187     thread_up(args);
188
189     int i(0);
190     const int item_count = RUNNING_ON_VALGRIND ? NUM_ITEMS_VG : NUM_ITEMS;
191     for (i = 0; i < item_count; ++i) {
192         std::stringstream key;
193         key << "key-" << i;
194         queued_item qi(new Item(key.str(), args->vbucket->getId(),
195                                 queue_op::set, 0, 0));
196         args->checkpoint_manager->queueDirty(*args->vbucket, qi,
197                                              GenerateBySeqno::Yes,
198                                              GenerateCas::Yes);
199     }
200 }
201 }
202
203 TEST_F(CheckpointTest, basic_chk_test) {
204     std::shared_ptr<Callback<uint16_t> > cb(new DummyCB());
205     RCPtr<VBucket> vbucket(new VBucket(0, vbucket_state_active, global_stats,
206                                        checkpoint_config, NULL, 0, 0, 0, NULL,
207                                        cb, config));
208
209     CheckpointManager *checkpoint_manager = new CheckpointManager(global_stats, 0,
210                                                                   checkpoint_config,
211                                                                   1, 0, 0, cb);
212
213     const int n_set_threads = RUNNING_ON_VALGRIND ? NUM_SET_THREADS_VG :
214                                                        NUM_SET_THREADS;
215
216     const int n_tap_threads = RUNNING_ON_VALGRIND ? NUM_TAP_THREADS_VG :
217                                                        NUM_TAP_THREADS;
218
219     std::vector<cb_thread_t> tap_threads(n_tap_threads);
220     std::vector<cb_thread_t> set_threads(n_set_threads);
221     cb_thread_t persistence_thread;
222     cb_thread_t checkpoint_cleanup_thread;
223     int i(0), rc(0);
224
225     struct thread_args t_args;
226     t_args.checkpoint_manager = checkpoint_manager;
227     t_args.vbucket = vbucket;
228     t_args.n_threads = n_set_threads + n_tap_threads + 2;
229
230     std::vector<struct thread_args> tap_t_args(n_tap_threads);
231     for (i = 0; i < n_tap_threads; ++i) {
232         std::string name(TAP_CURSOR_PREFIX + std::to_string(i));
233         tap_t_args[i].checkpoint_manager = checkpoint_manager;
234         tap_t_args[i].vbucket = vbucket;
235         tap_t_args[i].name = name;
236         tap_t_args[i].n_threads = n_set_threads + n_tap_threads + 2;
237         checkpoint_manager->registerCursor(name, 1, false,
238                                            MustSendCheckpointEnd::YES);
239     }
240
241     rc = cb_create_thread(&persistence_thread, launch_persistence_thread, &t_args, 0);
242     EXPECT_EQ(0, rc);
243
244     rc = cb_create_thread(&checkpoint_cleanup_thread,
245                         launch_checkpoint_cleanup_thread, &t_args, 0);
246     EXPECT_EQ(0, rc);
247
248     for (i = 0; i < n_tap_threads; ++i) {
249         rc = cb_create_thread(&tap_threads[i], launch_tap_client_thread, &tap_t_args[i], 0);
250         EXPECT_EQ(0, rc);
251     }
252
253     for (i = 0; i < n_set_threads; ++i) {
254         rc = cb_create_thread(&set_threads[i], launch_set_thread, &t_args, 0);
255         EXPECT_EQ(0, rc);
256     }
257
258     for (i = 0; i < n_set_threads; ++i) {
259         rc = cb_join_thread(set_threads[i]);
260         EXPECT_EQ(0, rc);
261     }
262
263     // Push the flush command into the queue so that all other threads can be terminated.
264     std::string key("flush");
265     queued_item qi(new Item(key, vbucket->getId(), queue_op::flush, 0xffff, 0));
266     checkpoint_manager->queueDirty(*vbucket, qi, GenerateBySeqno::Yes,
267                                    GenerateCas::Yes);
268
269     rc = cb_join_thread(persistence_thread);
270     EXPECT_EQ(0, rc);
271
272     for (i = 0; i < n_tap_threads; ++i) {
273         rc = cb_join_thread(tap_threads[i]);
274         EXPECT_EQ(0, rc);
275         std::stringstream name;
276         name << "tap-client-" << i;
277         checkpoint_manager->removeCursor(name.str());
278     }
279
280     rc = cb_join_thread(checkpoint_cleanup_thread);
281     EXPECT_EQ(0, rc);
282
283     delete checkpoint_manager;
284 }
285
286 TEST_F(CheckpointTest, reset_checkpoint_id) {
287     int i;
288     for (i = 0; i < 10; ++i) {
289         EXPECT_TRUE(queueNewItem("key-" + std::to_string(i)));
290     }
291     EXPECT_EQ(11, manager->getNumOpenChkItems());
292     EXPECT_EQ(10, manager->getNumItemsForCursor(CheckpointManager::pCursorName));
293
294     EXPECT_EQ(2, manager->createNewCheckpoint());
295
296     size_t itemPos;
297     size_t lastMutationId = 0;
298     std::vector<queued_item> items;
299     const std::string cursor(CheckpointManager::pCursorName);
300     auto range = manager->getAllItemsForCursor(cursor, items);
301     EXPECT_EQ(0, range.start);
302     EXPECT_EQ(1010, range.end);
303     EXPECT_EQ(13, items.size());
304     EXPECT_EQ(queue_op::checkpoint_start, items.at(0)->getOperation());
305     // Check that the next 10 items are all SET operations.
306     for(itemPos = 1; itemPos < 11; ++itemPos) {
307         queued_item qi = items.at(itemPos);
308         EXPECT_EQ(queue_op::set, qi->getOperation());
309         size_t mid = qi->getBySeqno();
310         EXPECT_GT(mid, lastMutationId);
311         lastMutationId = qi->getBySeqno();
312     }
313
314     // Check that the following items are checkpoint end, followed by a
315     // checkpoint start.
316     EXPECT_EQ(queue_op::checkpoint_end, items.at(11)->getOperation());
317     EXPECT_EQ(queue_op::checkpoint_start, items.at(12)->getOperation());
318
319     items.clear();
320
321     manager->checkAndAddNewCheckpoint(1, vbucket);
322     range = manager->getAllItemsForCursor(cursor, items);
323     EXPECT_EQ(1001, range.start);
324     EXPECT_EQ(1010, range.end);
325     EXPECT_EQ(0, items.size());
326 }
327
328 // Sanity check test fixture
329 TEST_F(CheckpointTest, CheckFixture) {
330
331     // Should intially have a single cursor (persistence).
332     EXPECT_EQ(1, manager->getNumOfCursors());
333     EXPECT_EQ(1, manager->getNumOpenChkItems());
334     for (auto& cursor : manager->getAllCursors()) {
335         EXPECT_EQ(CheckpointManager::pCursorName, cursor.first);
336     }
337     // Should initially be zero items to persist.
338     EXPECT_EQ(0, manager->getNumItemsForCursor(CheckpointManager::pCursorName));
339
340     // Check that the items fetched matches the number we were told to expect.
341     std::vector<queued_item> items;
342     auto result = manager->getAllItemsForCursor(CheckpointManager::pCursorName,
343                                                 items);
344     EXPECT_EQ(0, result.start);
345     EXPECT_EQ(0, result.end);
346     EXPECT_EQ(1, items.size());
347     EXPECT_EQ(queue_op::checkpoint_start, items.at(0)->getOperation());
348 }
349
350 MATCHER_P(HasOperation, op, "") { return arg->getOperation() == op; }
351
352 // Basic test of a single, open checkpoint.
353 TEST_F(CheckpointTest, OneOpenCkpt) {
354
355     // Queue a set operation.
356     queued_item qi(new Item("key1", vbucket->getId(), queue_op::set,
357                             /*revSeq*/20, /*bySeq*/0));
358
359     // No set_ops in queue, expect queueDirty to return true (increase
360     // persistence queue size).
361     EXPECT_TRUE(manager->queueDirty(*vbucket, qi, GenerateBySeqno::Yes,
362                                     GenerateCas::Yes));
363     EXPECT_EQ(1, manager->getNumCheckpoints());  // Single open checkpoint.
364     EXPECT_EQ(2, manager->getNumOpenChkItems()); // 1x op_checkpoint_start, 1x op_set
365     EXPECT_EQ(1001, qi->getBySeqno());
366     EXPECT_EQ(20, qi->getRevSeqno());
367     EXPECT_EQ(1, manager->getNumItemsForCursor(CheckpointManager::pCursorName));
368
369     // Adding the same key again shouldn't increase the size.
370     queued_item qi2(new Item("key1", vbucket->getId(), queue_op::set,
371                             /*revSeq*/21, /*bySeq*/0));
372     EXPECT_FALSE(manager->queueDirty(*vbucket, qi2, GenerateBySeqno::Yes,
373                                      GenerateCas::Yes));
374     EXPECT_EQ(1, manager->getNumCheckpoints());
375     EXPECT_EQ(2, manager->getNumOpenChkItems());
376     EXPECT_EQ(1002, qi2->getBySeqno());
377     EXPECT_EQ(21, qi2->getRevSeqno());
378     EXPECT_EQ(1, manager->getNumItemsForCursor(CheckpointManager::pCursorName));
379
380     // Adding a different key should increase size.
381     queued_item qi3(new Item("key2", vbucket->getId(), queue_op::set,
382                             /*revSeq*/0, /*bySeq*/0));
383     EXPECT_TRUE(manager->queueDirty(*vbucket, qi3, GenerateBySeqno::Yes,
384                                     GenerateCas::Yes));
385     EXPECT_EQ(1, manager->getNumCheckpoints());
386     EXPECT_EQ(3, manager->getNumOpenChkItems());
387     EXPECT_EQ(1003, qi3->getBySeqno());
388     EXPECT_EQ(0, qi3->getRevSeqno());
389     EXPECT_EQ(2, manager->getNumItemsForCursor(CheckpointManager::pCursorName));
390
391     // Check that the items fetched matches the number we were told to expect.
392     std::vector<queued_item> items;
393     auto result = manager->getAllItemsForCursor(CheckpointManager::pCursorName,
394                                                 items);
395     EXPECT_EQ(0, result.start);
396     EXPECT_EQ(1003, result.end);
397     EXPECT_EQ(3, items.size());
398     testing::ElementsAre(HasOperation(queue_op::checkpoint_start),
399                          HasOperation(queue_op::set),
400                          HasOperation(queue_op::set));
401 }
402
403 // Test with one open and one closed checkpoint.
404 TEST_F(CheckpointTest, OneOpenOneClosed) {
405
406     // Add some items to the initial (open) checkpoint.
407     for (auto i : {1,2}) {
408         EXPECT_TRUE(queueNewItem("key" + std::to_string(i)));
409     }
410     EXPECT_EQ(1, manager->getNumCheckpoints());
411     EXPECT_EQ(3, manager->getNumOpenChkItems()); // 1x op_checkpoint_start, 2x op_set
412     const uint64_t ckpt_id1 = manager->getOpenCheckpointId();
413
414     // Create a new checkpoint (closing the current open one).
415     const uint64_t ckpt_id2 = manager->createNewCheckpoint();
416     EXPECT_NE(ckpt_id1, ckpt_id2) << "New checkpoint ID should differ from old";
417     EXPECT_EQ(ckpt_id1, manager->getLastClosedCheckpointId());
418     EXPECT_EQ(1, manager->getNumOpenChkItems()); // 1x op_checkpoint_start
419
420     // Add some items to the newly-opened checkpoint (note same keys as 1st
421     // ckpt).
422     for (auto ii : {1,2}) {
423         EXPECT_TRUE(queueNewItem("key" + std::to_string(ii)));
424     }
425     EXPECT_EQ(2, manager->getNumCheckpoints());
426     EXPECT_EQ(3, manager->getNumOpenChkItems()); // 1x op_checkpoint_start, 2x op_set
427
428     // Examine the items - should be 2 lots of two keys.
429     EXPECT_EQ(4, manager->getNumItemsForCursor(CheckpointManager::pCursorName));
430
431     // Check that the items fetched matches the number we were told to expect.
432     std::vector<queued_item> items;
433     auto result = manager->getAllItemsForCursor(CheckpointManager::pCursorName,
434                                                 items);
435     EXPECT_EQ(0, result.start);
436     EXPECT_EQ(1004, result.end);
437     EXPECT_EQ(7, items.size());
438     EXPECT_THAT(items,
439                 testing::ElementsAre(HasOperation(queue_op::checkpoint_start),
440                                      HasOperation(queue_op::set),
441                                      HasOperation(queue_op::set),
442                                      HasOperation(queue_op::checkpoint_end),
443                                      HasOperation(queue_op::checkpoint_start),
444                                      HasOperation(queue_op::set),
445                                      HasOperation(queue_op::set)));
446 }
447
448 // Test the automatic creation of checkpoints based on the number of items.
449 TEST_F(CheckpointTest, ItemBasedCheckpointCreation) {
450
451     // Size down the default number of items to create a new checkpoint and
452     // recreate the manager
453     checkpoint_config = CheckpointConfig(DEFAULT_CHECKPOINT_PERIOD,
454                                          MIN_CHECKPOINT_ITEMS,
455                                          /*numCheckpoints*/2,
456                                          /*itemBased*/true,
457                                          /*keepClosed*/false,
458                                          /*enableMerge*/false);
459     createManager();
460
461     // Sanity check initial state.
462     EXPECT_EQ(1, manager->getNumOfCursors());
463     EXPECT_EQ(1, manager->getNumOpenChkItems());
464     EXPECT_EQ(1, manager->getNumCheckpoints());
465
466     // Create one less than the number required to create a new checkpoint.
467     queued_item qi;
468     for (unsigned int ii = 0; ii < MIN_CHECKPOINT_ITEMS; ii++) {
469         EXPECT_EQ(ii + 1, manager->getNumOpenChkItems()); /* +1 for op_ckpt_start */
470
471         EXPECT_TRUE(queueNewItem("key" + std::to_string(ii)));
472         EXPECT_EQ(1, manager->getNumCheckpoints());
473
474     }
475
476     // Add one more - should create a new checkpoint.
477     EXPECT_TRUE(queueNewItem("key_epoch"));
478     EXPECT_EQ(2, manager->getNumCheckpoints());
479     EXPECT_EQ(2, manager->getNumOpenChkItems()); // 1x op_ckpt_start, 1x op_set
480
481     // Fill up this checkpoint also - note loop for MIN_CHECKPOINT_ITEMS - 1
482     for (unsigned int ii = 0; ii < MIN_CHECKPOINT_ITEMS - 1; ii++) {
483         EXPECT_EQ(ii + 2, manager->getNumOpenChkItems()); /* +2 op_ckpt_start, key_epoch */
484
485         EXPECT_TRUE(queueNewItem("key" + std::to_string(ii)));
486
487         EXPECT_EQ(2, manager->getNumCheckpoints());
488     }
489
490     // Add one more - as we have hit maximum checkpoints should *not* create a
491     // new one.
492     EXPECT_TRUE(queueNewItem("key_epoch2"));
493     EXPECT_EQ(2, manager->getNumCheckpoints());
494     EXPECT_EQ(12, // 1x op_ckpt_start, 1x key_epoch, 9x key_X, 1x key_epoch2
495               manager->getNumOpenChkItems());
496
497     // Fetch the items associated with the persistence cursor. This
498     // moves the single cursor registered outside of the initial checkpoint,
499     // allowing a new open checkpoint to be created.
500     EXPECT_EQ(1, manager->getNumOfCursors());
501     snapshot_range_t range;
502     std::vector<queued_item> items;
503     range = manager->getAllItemsForCursor(CheckpointManager::pCursorName,
504                                          items);
505
506     EXPECT_EQ(0, range.start);
507     EXPECT_EQ(1021, range.end);
508     EXPECT_EQ(24, items.size());
509
510     // Should still have the same number of checkpoints and open items.
511     EXPECT_EQ(2, manager->getNumCheckpoints());
512     EXPECT_EQ(12, manager->getNumOpenChkItems());
513
514     // But adding a new item will create a new one.
515     EXPECT_TRUE(queueNewItem("key_epoch3"));
516     EXPECT_EQ(3, manager->getNumCheckpoints());
517     EXPECT_EQ(2, manager->getNumOpenChkItems()); // 1x op_ckpt_start, 1x op_set
518 }
519
520 // Test checkpoint and cursor accounting - when checkpoints are closed the
521 // offset of cursors is updated as appropriate.
522 TEST_F(CheckpointTest, CursorOffsetOnCheckpointClose) {
523
524     // Add two items to the initial (open) checkpoint.
525     for (auto i : {1,2}) {
526         EXPECT_TRUE(queueNewItem("key" + std::to_string(i)));
527     }
528     EXPECT_EQ(1, manager->getNumCheckpoints());
529     EXPECT_EQ(3, manager->getNumOpenChkItems()); // 1x op_checkpoint_start, 2x op_set
530
531     // Use the existing persistence cursor for this test:
532     EXPECT_EQ(2, manager->getNumItemsForCursor(CheckpointManager::pCursorName))
533         << "Cursor should initially have two items pending";
534
535     // Check de-dupe counting - after adding another item with the same key,
536     // should still see two items.
537     EXPECT_FALSE(queueNewItem("key1"))
538         << "Adding a duplicate key to open checkpoint should not increase queue size";
539
540     EXPECT_EQ(2, manager->getNumItemsForCursor(CheckpointManager::pCursorName))
541         << "Expected 2 items for cursor (2x op_set) after adding a duplicate.";
542
543     // Create a new checkpoint (closing the current open one).
544     manager->createNewCheckpoint();
545     EXPECT_EQ(1, manager->getNumOpenChkItems())
546         << "Expected 1 item (1x op_checkpoint_start)";
547     EXPECT_EQ(2, manager->getNumCheckpoints());
548     EXPECT_EQ(2, manager->getNumItemsForCursor(CheckpointManager::pCursorName))
549         << "Expected 2 items for cursor after creating new checkpoint";
550
551     // Advance cursor - first to get the 'checkpoint_start' meta item,
552     // and a second time to get the a 'proper' mutation.
553     bool isLastMutationItem;
554     auto item = manager->nextItem(CheckpointManager::pCursorName, isLastMutationItem);
555     EXPECT_TRUE(item->isCheckPointMetaItem());
556     EXPECT_FALSE(isLastMutationItem);
557     EXPECT_EQ(2, manager->getNumItemsForCursor(CheckpointManager::pCursorName))
558         << "Expected 2 items for cursor after advancing one item";
559
560     item = manager->nextItem(CheckpointManager::pCursorName, isLastMutationItem);
561     EXPECT_FALSE(item->isCheckPointMetaItem());
562     EXPECT_FALSE(isLastMutationItem);
563     EXPECT_EQ(1, manager->getNumItemsForCursor(CheckpointManager::pCursorName))
564         << "Expected 1 item for cursor after advancing by 1";
565
566     // Add two items to the newly-opened checkpoint. Same keys as 1st ckpt,
567     // but cannot de-dupe across checkpoints.
568     for (auto ii : {1,2}) {
569         EXPECT_TRUE(queueNewItem("key" + std::to_string(ii)));
570     }
571
572     EXPECT_EQ(3, manager->getNumItemsForCursor(CheckpointManager::pCursorName))
573         << "Expected 3 items for cursor after adding 2 more to new checkpoint";
574
575     // Advance the cursor 'out' of the first checkpoint.
576     item = manager->nextItem(CheckpointManager::pCursorName, isLastMutationItem);
577     EXPECT_FALSE(item->isCheckPointMetaItem());
578     EXPECT_TRUE(isLastMutationItem);
579
580     // Now at the end of the first checkpoint, move into the next checkpoint.
581     item = manager->nextItem(CheckpointManager::pCursorName, isLastMutationItem);
582     EXPECT_TRUE(item->isCheckPointMetaItem());
583     EXPECT_TRUE(isLastMutationItem);
584     item = manager->nextItem(CheckpointManager::pCursorName, isLastMutationItem);
585     EXPECT_TRUE(item->isCheckPointMetaItem());
586     EXPECT_FALSE(isLastMutationItem);
587
588     // Tell Checkpoint manager the items have been persisted, so it advances
589     // pCursorPreCheckpointId, which will allow us to remove the closed
590     // unreferenced checkpoints.
591     manager->itemsPersisted();
592
593     // Both previous checkpoints are unreferenced. Close them. This will
594     // cause the offset of this cursor to be recalculated.
595     bool new_open_ckpt_created;
596     EXPECT_EQ(2, manager->removeClosedUnrefCheckpoints(vbucket,
597                                                            new_open_ckpt_created));
598
599     EXPECT_EQ(1, manager->getNumCheckpoints());
600
601     EXPECT_EQ(2, manager->getNumItemsForCursor(CheckpointManager::pCursorName));
602
603     // Drain the remaining items.
604     item = manager->nextItem(CheckpointManager::pCursorName, isLastMutationItem);
605     EXPECT_FALSE(item->isCheckPointMetaItem());
606     EXPECT_FALSE(isLastMutationItem);
607     item = manager->nextItem(CheckpointManager::pCursorName, isLastMutationItem);
608     EXPECT_FALSE(item->isCheckPointMetaItem());
609     EXPECT_TRUE(isLastMutationItem);
610
611     EXPECT_EQ(0, manager->getNumItemsForCursor(CheckpointManager::pCursorName));
612 }
613
614 // Test the getAllItemsForCursor()
615 TEST_F(CheckpointTest, ItemsForCheckpointCursor) {
616     /* We want to have items across 2 checkpoints. Size down the default number
617        of items to create a new checkpoint and recreate the manager */
618     checkpoint_config = CheckpointConfig(DEFAULT_CHECKPOINT_PERIOD,
619                                          MIN_CHECKPOINT_ITEMS,
620                                          /*numCheckpoints*/2,
621                                          /*itemBased*/true,
622                                          /*keepClosed*/false,
623                                          /*enableMerge*/false);
624     createManager();
625
626     /* Sanity check initial state */
627     EXPECT_EQ(1, manager->getNumOfCursors());
628     EXPECT_EQ(1, manager->getNumOpenChkItems());
629     EXPECT_EQ(1, manager->getNumCheckpoints());
630
631     /* Add items such that we have 2 checkpoints */
632     queued_item qi;
633     for (unsigned int ii = 0; ii < 2 * MIN_CHECKPOINT_ITEMS; ii++) {
634         EXPECT_TRUE(queueNewItem("key" + std::to_string(ii)));
635     }
636
637     /* Check if we have desired number of checkpoints and desired number of
638        items */
639     EXPECT_EQ(2, manager->getNumCheckpoints());
640     EXPECT_EQ(MIN_CHECKPOINT_ITEMS + 1, manager->getNumOpenChkItems());
641     /* MIN_CHECKPOINT_ITEMS items + op_ckpt_start */
642
643     /* Register DCP replication cursor */
644     std::string dcp_cursor(DCP_CURSOR_PREFIX + std::to_string(1));
645     manager->registerCursorBySeqno(dcp_cursor.c_str(), 0,
646                                    MustSendCheckpointEnd::NO);
647
648     /* Get items for persistence*/
649     std::vector<queued_item> items;
650     manager->getAllItemsForCursor(CheckpointManager::pCursorName, items);
651
652     /* We should have got (2 * MIN_CHECKPOINT_ITEMS + 3) items. 3 additional are
653        op_ckpt_start, op_ckpt_end and op_ckpt_start */
654     EXPECT_EQ(2 * MIN_CHECKPOINT_ITEMS + 3, items.size());
655
656     /* Get items for DCP replication cursor */
657     items.clear();
658     manager->getAllItemsForCursor(dcp_cursor.c_str(), items);
659     EXPECT_EQ(2 * MIN_CHECKPOINT_ITEMS + 3, items.size());
660 }
661
662 // Test the checkpoint cursor movement
663 TEST_F(CheckpointTest, CursorMovement) {
664     /* We want to have items across 2 checkpoints. Size down the default number
665      of items to create a new checkpoint and recreate the manager */
666     checkpoint_config = CheckpointConfig(DEFAULT_CHECKPOINT_PERIOD,
667                                          MIN_CHECKPOINT_ITEMS,
668                                          /*numCheckpoints*/2,
669                                          /*itemBased*/true,
670                                          /*keepClosed*/false,
671                                          /*enableMerge*/false);
672     createManager();
673
674     /* Sanity check initial state */
675     EXPECT_EQ(1, manager->getNumOfCursors());
676     EXPECT_EQ(1, manager->getNumOpenChkItems());
677     EXPECT_EQ(1, manager->getNumCheckpoints());
678
679     /* Add items such that we have 1 full (max items as per config) checkpoint.
680        Adding another would open new checkpoint */
681     queued_item qi;
682     for (unsigned int ii = 0; ii < MIN_CHECKPOINT_ITEMS; ii++) {
683         EXPECT_TRUE(queueNewItem("key" + std::to_string(ii)));
684     }
685
686     /* Check if we have desired number of checkpoints and desired number of
687        items */
688     EXPECT_EQ(1, manager->getNumCheckpoints());
689     EXPECT_EQ(MIN_CHECKPOINT_ITEMS + 1, manager->getNumOpenChkItems());
690     /* MIN_CHECKPOINT_ITEMS items + op_ckpt_start */
691
692     /* Register DCP replication cursor */
693     std::string dcp_cursor(DCP_CURSOR_PREFIX + std::to_string(1));
694     manager->registerCursorBySeqno(dcp_cursor.c_str(), 0,
695                                    MustSendCheckpointEnd::NO);
696
697     /* Registor TAP cursor */
698     std::string tap_cursor(TAP_CURSOR_PREFIX + std::to_string(1));
699     manager->registerCursor(tap_cursor, 1, false, MustSendCheckpointEnd::YES);
700
701     /* Get items for persistence cursor */
702     std::vector<queued_item> items;
703     manager->getAllItemsForCursor(CheckpointManager::pCursorName, items);
704
705     /* We should have got (MIN_CHECKPOINT_ITEMS + op_ckpt_start) items. */
706     EXPECT_EQ(MIN_CHECKPOINT_ITEMS + 1, items.size());
707
708     /* Get items for DCP replication cursor */
709     items.clear();
710     manager->getAllItemsForCursor(dcp_cursor.c_str(), items);
711     EXPECT_EQ(MIN_CHECKPOINT_ITEMS + 1, items.size());
712
713     /* Get items for TAP cursor */
714     int num_items = 0;
715     while(true) {
716         bool isLastItem = false;
717         qi = manager->nextItem(tap_cursor, isLastItem);
718         num_items++;
719         if (isLastItem) {
720             break;
721         }
722     }
723     EXPECT_EQ(MIN_CHECKPOINT_ITEMS + 1, num_items);
724
725     uint64_t curr_open_chkpt_id = manager->getOpenCheckpointId_UNLOCKED();
726
727     /* Run the checkpoint remover so that new open checkpoint is created */
728     bool newCheckpointCreated;
729     manager->removeClosedUnrefCheckpoints(vbucket, newCheckpointCreated);
730     EXPECT_EQ(curr_open_chkpt_id + 1, manager->getOpenCheckpointId_UNLOCKED());
731
732     /* Get items for persistence cursor */
733     EXPECT_EQ(0, manager->getNumItemsForCursor(CheckpointManager::pCursorName))
734         << "Expected to have no normal (only meta) items";
735     items.clear();
736     manager->getAllItemsForCursor(CheckpointManager::pCursorName, items);
737
738     /* We should have got op_ckpt_start item */
739     EXPECT_EQ(1, items.size());
740     EXPECT_EQ(queue_op::checkpoint_start, items.at(0)->getOperation());
741
742     /* Get items for DCP replication cursor */
743     EXPECT_EQ(0, manager->getNumItemsForCursor(CheckpointManager::pCursorName))
744         << "Expected to have no normal (only meta) items";
745     items.clear();
746     manager->getAllItemsForCursor(dcp_cursor.c_str(), items);
747     /* Expecting only 1 op_ckpt_start item */
748     EXPECT_EQ(1, items.size());
749     EXPECT_EQ(queue_op::checkpoint_start, items.at(0)->getOperation());
750
751     /* Get item for TAP cursor. We expect TAP to send op_ckpt_end of last
752        checkpoint. TAP unlike DCP cannot skip the op_ckpt_end message */
753     bool isLastItem = false;
754     qi = manager->nextItem(tap_cursor, isLastItem);
755     EXPECT_EQ(queue_op::checkpoint_end, qi->getOperation());
756     EXPECT_EQ(true, isLastItem);
757
758 }
759
760 // Test the checkpoint cursor movement for replica vBuckets (where we can
761 // perform more checkpoint collapsing)
762 TEST_F(CheckpointTest, CursorMovementReplicaMerge) {
763
764     vbucket->setState(vbucket_state_replica);
765
766     /* We want to have items across 2 checkpoints. Size down the default number
767      of items to create a new checkpoint and recreate the manager */
768     checkpoint_config = CheckpointConfig(DEFAULT_CHECKPOINT_PERIOD,
769                                          MIN_CHECKPOINT_ITEMS,
770                                          /*numCheckpoints*/2,
771                                          /*itemBased*/true,
772                                          /*keepClosed*/false,
773                                          /*enableMerge*/true);
774
775     // Add items such that we have a checkpoint at half-capacity.
776     for (unsigned int ii = 0; ii < MIN_CHECKPOINT_ITEMS / 2; ii++) {
777         EXPECT_TRUE(queueNewItem("key" + std::to_string(ii)));
778     }
779
780     /* Check if we have desired number of checkpoints and desired number of
781         items */
782     EXPECT_EQ(1, manager->getNumCheckpoints());
783     // +1 for checkpoint_start.
784     EXPECT_EQ((MIN_CHECKPOINT_ITEMS / 2) + 1, manager->getNumOpenChkItems());
785
786     // Register DCP replication cursor, which will be moved into the middle of
787     // first checkpoint and then left there.
788     std::string dcp_cursor{DCP_CURSOR_PREFIX + std::to_string(1)};
789     manager->registerCursorBySeqno(dcp_cursor.c_str(), 0,
790                                    MustSendCheckpointEnd::NO);
791
792     std::vector<queued_item> items;
793     manager->getAllItemsForCursor(dcp_cursor.c_str(), items);
794     EXPECT_EQ((MIN_CHECKPOINT_ITEMS / 2) + 1, items.size());
795
796     // Add more items so this checkpoint is now full.
797     for (unsigned int ii = MIN_CHECKPOINT_ITEMS / 2; ii < MIN_CHECKPOINT_ITEMS;
798             ii++) {
799         EXPECT_TRUE(queueNewItem("key" + std::to_string(ii)));
800     }
801     EXPECT_EQ(1, manager->getNumCheckpoints())
802         << "Should still only have 1 checkpoint after adding "
803                 "MIN_CHECKPOINT_ITEMS total";
804     EXPECT_EQ(MIN_CHECKPOINT_ITEMS + 1, manager->getNumOpenChkItems());
805
806     /* Get items for persistence cursor - this will move the persistence cursor
807      * out of the initial checkpoint.
808      */
809     items.clear();
810     manager->getAllItemsForCursor(CheckpointManager::pCursorName, items);
811
812     /* We should have got (MIN_CHECKPOINT_ITEMS + op_ckpt_start) items. */
813     EXPECT_EQ(MIN_CHECKPOINT_ITEMS + 1, items.size());
814
815     EXPECT_EQ(1, manager->getOpenCheckpointId_UNLOCKED());
816
817     // Create a new checkpoint.
818     EXPECT_EQ(2, manager->createNewCheckpoint());
819
820     // Add another MIN_CHECKPOINT_ITEMS. This should fill up the second
821     // checkpoint.
822     for (unsigned int ii = 0; ii < MIN_CHECKPOINT_ITEMS; ii++) {
823         EXPECT_TRUE(queueNewItem("keyB_" + std::to_string(ii)));
824     }
825
826     // Move the persistence cursor through these new items.
827     EXPECT_EQ(MIN_CHECKPOINT_ITEMS,
828               manager->getNumItemsForCursor(CheckpointManager::pCursorName));
829     items.clear();
830     manager->getAllItemsForCursor(CheckpointManager::pCursorName, items);
831     EXPECT_EQ(MIN_CHECKPOINT_ITEMS + 1, items.size());
832
833     // Create a third checkpoint.
834     EXPECT_EQ(3, manager->createNewCheckpoint());
835
836     // Move persistence cursor into third checkpoint.
837     items.clear();
838     manager->getAllItemsForCursor(CheckpointManager::pCursorName, items);
839     EXPECT_EQ(1, items.size())
840         << "Expected to get a single meta item";
841     EXPECT_EQ(queue_op::checkpoint_start, items.at(0)->getOperation());
842
843     // We now have an unoccupied second checkpoint. We should be able to
844     // collapse this, and move the dcp_cursor into the merged checkpoint.
845     bool newCheckpointCreated;
846     manager->removeClosedUnrefCheckpoints(vbucket, newCheckpointCreated);
847
848     /* Get items for DCP cursor */
849     EXPECT_EQ(MIN_CHECKPOINT_ITEMS/2 + MIN_CHECKPOINT_ITEMS,
850               manager->getNumItemsForCursor(dcp_cursor))
851         << "DCP cursor remaining items should have been recalculated after "
852                 "close of unref checkpoints.";
853
854     items.clear();
855     auto range = manager->getAllItemsForCursor(dcp_cursor.c_str(), items);
856     EXPECT_EQ(1001, range.start);
857     EXPECT_EQ(1020, range.end);
858
859     // Check we have received correct items (done in chunks because
860     // EXPECT_THAT maxes out at 10 elements).
861     std::vector<queued_item> items_a(items.begin(), items.begin() + 5);
862     // Remainder of first checkpoint.
863     EXPECT_THAT(items_a, testing::Each(HasOperation(queue_op::set)));
864
865     // Second checkpoint's data- 10x set.
866     std::vector<queued_item> items_b(items.begin() + 5, items.begin() + 15);
867     EXPECT_THAT(items_b, testing::Each(HasOperation(queue_op::set)));
868
869     // end of second checkpoint and start of third.
870     std::vector<queued_item> items_c(items.begin() + 15, items.end());
871     EXPECT_THAT(items_c,
872                 testing::ElementsAre(HasOperation(queue_op::checkpoint_end),
873                                      HasOperation(queue_op::checkpoint_start)));
874 }
875
876 //
877 // It's critical that the HLC (CAS) is ordered with seqno generation
878 // otherwise XDCR may drop a newer bySeqno mutation because the CAS is not
879 // higher.
880 //
881 TEST_F(CheckpointTest, SeqnoAndHLCOrdering) {
882
883     const int n_threads = 8;
884     const int n_items = 1000;
885
886     // configure so we can store a large number of items
887     checkpoint_config = CheckpointConfig(DEFAULT_CHECKPOINT_PERIOD,
888                                          n_threads*n_items,
889                                          /*numCheckpoints*/2,
890                                          /*itemBased*/true,
891                                          /*keepClosed*/false,
892                                          /*enableMerge*/false);
893     createManager();
894
895     /* Sanity check initial state */
896     EXPECT_EQ(1, manager->getNumOfCursors());
897     EXPECT_EQ(1, manager->getNumOpenChkItems());
898     EXPECT_EQ(1, manager->getNumCheckpoints());
899
900     std::vector<std::thread> threads;
901
902     // vector of pairs, first is seqno, second is CAS
903     // just do a scatter gather over n_threads
904     std::vector<std::vector<std::pair<uint64_t, uint64_t> > > threadData(n_threads);
905     for (int ii = 0; ii < n_threads; ii++) {
906         auto& threadsData = threadData[ii];
907         threads.push_back(std::thread([this, ii, n_items, &threadsData](){
908             std::string key = "key" + std::to_string(ii);
909             for (int item  = 0; item < n_items; item++) {
910                 queued_item qi(new Item(key + std::to_string(item),
911                                         vbucket->getId(), queue_op::set,
912                                         /*revSeq*/0, /*bySeq*/0));
913                 EXPECT_TRUE(manager->queueDirty(*vbucket,
914                                                 qi,
915                                                 GenerateBySeqno::Yes,
916                                                 GenerateCas::Yes));
917
918                 // Save seqno/cas
919                 threadsData.push_back(std::make_pair(qi->getBySeqno(), qi->getCas()));
920             }
921         }));
922     }
923
924     // Wait for all threads
925     for (auto& thread : threads) {
926         thread.join();
927     }
928
929     // Now combine the data and check HLC is increasing with seqno
930     std::map<uint64_t, uint64_t> finalData;
931     for (auto t : threadData) {
932         for (auto pair : t) {
933             EXPECT_EQ(finalData.end(), finalData.find(pair.first));
934             finalData[pair.first] = pair.second;
935         }
936     }
937
938     auto itr = finalData.begin();
939     EXPECT_NE(itr, finalData.end());
940     uint64_t previousCas = (itr++)->second;
941     EXPECT_NE(itr, finalData.end());
942     for (; itr != finalData.end(); itr++) {
943         EXPECT_LT(previousCas, itr->second);
944         previousCas = itr->second;
945     }
946
947     // Now a final check, iterate the checkpoint and also check for increasing
948     // HLC.
949     std::vector<queued_item> items;
950     manager->getAllItemsForCursor(CheckpointManager::pCursorName, items);
951
952     /* We should have got (n_threads*n_items + op_ckpt_start) items. */
953     EXPECT_EQ(n_threads*n_items + 1, items.size());
954
955     previousCas = items[1]->getCas();
956     for (size_t ii = 2; ii < items.size(); ii++) {
957         EXPECT_LT(previousCas, items[ii]->getCas());
958         previousCas = items[ii]->getCas();
959     }
960 }
961
962 // Test cursor is correctly updated when enqueuing a key which already exists
963 // in the checkpoint (and needs de-duping), where the cursor points at a
964 // meta-item at the head of the checkpoint:
965 //
966 //  Before:
967 //      Checkpoint [ 0:EMPTY(), 1:CKPT_START(), 1:SET(key), 2:SET_VBSTATE() ]
968 //                                                               ^
969 //                                                            Cursor
970 //
971 //  After:
972 //      Checkpoint [ 0:EMPTY(), 1:CKPT_START(), 2:SET_VBSTATE(), 2:SET(key) ]
973 //                                                     ^
974 //                                                   Cursor
975 //
976 TEST_F(CheckpointTest, CursorUpdateForExistingItemWithMetaItemAtHead) {
977     // Setup the checkpoint and cursor.
978     ASSERT_EQ(1, manager->getNumItems());
979     ASSERT_TRUE(queueNewItem("key"));
980     ASSERT_EQ(2, manager->getNumItems());
981     manager->queueSetVBState(*vbucket.get());
982
983     ASSERT_EQ(3, manager->getNumItems());
984
985     // Advance persistence cursor so all items have been consumed.
986     std::vector<queued_item> items;
987     manager->getAllItemsForCursor(CheckpointManager::pCursorName, items);
988     ASSERT_EQ(3, items.size());
989     ASSERT_EQ(0, manager->getNumItemsForCursor(CheckpointManager::pCursorName));
990
991     // Queue an item with a duplicate key.
992     queueNewItem("key");
993
994     // Test: Should have one item for cursor (the one we just added).
995     EXPECT_EQ(1, manager->getNumItemsForCursor(CheckpointManager::pCursorName));
996
997     // Should have another item to read (new version of 'key')
998     items.clear();
999     manager->getAllItemsForCursor(CheckpointManager::pCursorName, items);
1000     EXPECT_EQ(1, items.size());
1001 }
1002
1003 // Test cursor is correctly updated when enqueuing a key which already exists
1004 // in the checkpoint (and needs de-duping), where the cursor points at a
1005 // meta-item *not* at the head of the checkpoint:
1006 //
1007 //  Before:
1008 //      Checkpoint [ 0:EMPTY(), 1:CKPT_START(), 1:SET_VBSTATE(key), 1:SET() ]
1009 //                                                     ^
1010 //                                                    Cursor
1011 //
1012 //  After:
1013 //      Checkpoint [ 0:EMPTY(), 1:CKPT_START(), 1:SET_VBSTATE(key), 2:SET() ]
1014 //                                                     ^
1015 //                                                   Cursor
1016 //
1017 TEST_F(CheckpointTest, CursorUpdateForExistingItemWithNonMetaItemAtHead) {
1018     // Setup the checkpoint and cursor.
1019     ASSERT_EQ(1, manager->getNumItems());
1020     manager->queueSetVBState(*vbucket.get());
1021     ASSERT_EQ(2, manager->getNumItems());
1022
1023     // Advance persistence cursor so all items have been consumed.
1024     std::vector<queued_item> items;
1025     manager->getAllItemsForCursor(CheckpointManager::pCursorName, items);
1026     ASSERT_EQ(2, items.size());
1027     ASSERT_EQ(0, manager->getNumItemsForCursor(CheckpointManager::pCursorName));
1028
1029     // Queue a set (cursor will now be one behind).
1030     ASSERT_TRUE(queueNewItem("key"));
1031     ASSERT_EQ(1, manager->getNumItemsForCursor(CheckpointManager::pCursorName));
1032
1033     // Test: queue an item with a duplicate key.
1034     queueNewItem("key");
1035
1036     // Test: Should have one item for cursor (the one we just added).
1037     EXPECT_EQ(1, manager->getNumItemsForCursor(CheckpointManager::pCursorName));
1038
1039     // Should an item to read (new version of 'key')
1040     items.clear();
1041     manager->getAllItemsForCursor(CheckpointManager::pCursorName, items);
1042     EXPECT_EQ(1, items.size());
1043     EXPECT_EQ(1002, items.at(0)->getBySeqno());
1044     EXPECT_EQ("key", items.at(0)->getKey());
1045 }