7b76c4ccf23a832720062e4b367c63eeec5d4216
[ep-engine.git] / tests / module_tests / checkpoint_test.cc
1 /* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2 /*
3  *     Copyright 2011 Couchbase, Inc
4  *
5  *   Licensed under the Apache License, Version 2.0 (the "License");
6  *   you may not use this file except in compliance with the License.
7  *   You may obtain a copy of the License at
8  *
9  *       http://www.apache.org/licenses/LICENSE-2.0
10  *
11  *   Unless required by applicable law or agreed to in writing, software
12  *   distributed under the License is distributed on an "AS IS" BASIS,
13  *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  *   See the License for the specific language governing permissions and
15  *   limitations under the License.
16  */
17
18 #include "config.h"
19
20 #include <algorithm>
21 #include <set>
22 #include <thread>
23 #include <vector>
24
25 #include "checkpoint.h"
26 #include "stats.h"
27 #include "vbucket.h"
28
29 #include <gtest/gtest.h>
30 #include <gmock/gmock.h>
31
// Number of TAP cursor reader threads spawned by basic_chk_test.
#define NUM_TAP_THREADS 3
// Number of writer threads spawned by basic_chk_test.
#define NUM_SET_THREADS 4

// Number of items each writer thread queues into the checkpoint manager.
#define NUM_ITEMS 10000

// Name prefixes for the cursors registered by the tests; a numeric suffix is
// appended to form the full cursor name.
#define DCP_CURSOR_PREFIX "dcp-client-"
#define TAP_CURSOR_PREFIX "tap-client-"
39
40 struct thread_args {
41     SyncObject *mutex;
42     SyncObject *gate;
43     RCPtr<VBucket> vbucket;
44     CheckpointManager *checkpoint_manager;
45     int *counter;
46     std::string name;
47 };
48
49 extern "C" {
50
51 /**
52  * Dummy callback to replace the flusher callback.
53  */
54 class DummyCB: public Callback<uint16_t> {
55 public:
56     DummyCB() {}
57
58     void callback(uint16_t &dummy) {
59         (void) dummy;
60     }
61 };
62
63 // Test fixture for Checkpoint tests. Once constructed provides a checkpoint
64 // manager and single vBucket (VBID 0).
65 class CheckpointTest : public ::testing::Test {
66 protected:
67     CheckpointTest()
68         : callback(new DummyCB()),
69           vbucket(new VBucket(0, vbucket_state_active, global_stats,
70                               checkpoint_config, /*kvshard*/NULL,
71                               /*lastSeqno*/1000, /*lastSnapStart*/0,
72                               /*lastSnapEnd*/0, /*table*/NULL,
73                               callback, config)) {
74         createManager();
75     }
76
77     void createManager() {
78         manager.reset(new CheckpointManager(global_stats, vbucket->getId(),
79                                             checkpoint_config,
80                                             /*lastSeqno*/1000,
81                                             /*lastSnapStart*/0,/*lastSnapEnd*/0,
82                                             callback));
83     }
84
85     // Creates a new item with the given key and queues it into the checkpoint
86     // manager.
87     bool queueNewItem(const std::string& key) {
88
89         queued_item qi{new Item(key, vbucket->getId(), queue_op::set,
90                                 /*revSeq*/0, /*bySeq*/0)};
91         return manager->queueDirty(*vbucket, qi,
92                                    GenerateBySeqno::Yes, GenerateCas::Yes);
93     }
94
95
96     EPStats global_stats;
97     CheckpointConfig checkpoint_config;
98     Configuration config;
99     std::shared_ptr<Callback<uint16_t> > callback;
100     RCPtr<VBucket> vbucket;
101     std::unique_ptr<CheckpointManager> manager;
102 };
103
104 static void launch_persistence_thread(void *arg) {
105     struct thread_args *args = static_cast<struct thread_args *>(arg);
106     std::unique_lock<std::mutex> lh(*(args->mutex));
107     std::unique_lock<std::mutex> lhg(*(args->gate));
108     ++(*(args->counter));
109     lhg.unlock();
110     args->gate->notify_all();
111     args->mutex->wait(lh);
112     lh.unlock();
113
114     bool flush = false;
115     while(true) {
116         size_t itemPos;
117         std::vector<queued_item> items;
118         const std::string cursor(CheckpointManager::pCursorName);
119         args->checkpoint_manager->getAllItemsForCursor(cursor, items);
120         for(itemPos = 0; itemPos < items.size(); ++itemPos) {
121             queued_item qi = items.at(itemPos);
122             if (qi->getOperation() == queue_op::flush) {
123                 flush = true;
124                 break;
125             }
126         }
127         if (flush) {
128             // Checkpoint start and end operations may have been introduced in
129             // the items queue after the "flush" operation was added. Ignore
130             // these. Anything else will be considered an error.
131             for(size_t i = itemPos + 1; i < items.size(); ++i) {
132                 queued_item qi = items.at(i);
133                 EXPECT_TRUE(queue_op::checkpoint_start == qi->getOperation() ||
134                             queue_op::checkpoint_end == qi->getOperation())
135                     << "Unexpected operation:" << to_string(qi->getOperation());
136             }
137             break;
138         }
139     }
140     EXPECT_TRUE(flush);
141 }
142
143 static void launch_tap_client_thread(void *arg) {
144     struct thread_args *args = static_cast<struct thread_args *>(arg);
145     std::unique_lock<std::mutex> lh(*(args->mutex));
146     std::unique_lock<std::mutex> lhg(*(args->gate));
147     ++(*(args->counter));
148     lhg.unlock();
149     args->gate->notify_all();
150     args->mutex->wait(lh);
151     lh.unlock();
152
153     bool flush = false;
154     bool isLastItem = false;
155     while(true) {
156         queued_item qi = args->checkpoint_manager->nextItem(args->name,
157                                                             isLastItem);
158         if (qi->getOperation() == queue_op::flush) {
159             flush = true;
160             break;
161         }
162     }
163     EXPECT_TRUE(flush);
164 }
165
166 static void launch_checkpoint_cleanup_thread(void *arg) {
167     struct thread_args *args = static_cast<struct thread_args *>(arg);
168     std::unique_lock<std::mutex> lh(*(args->mutex));
169     std::unique_lock<std::mutex> lhg(*(args->gate));
170     ++(*(args->counter));
171     lhg.unlock();
172     args->gate->notify_all();
173     args->mutex->wait(lh);
174     lh.unlock();
175
176     while (args->checkpoint_manager->getNumOfCursors() > 1) {
177         bool newCheckpointCreated;
178         args->checkpoint_manager->removeClosedUnrefCheckpoints(args->vbucket,
179                                                                newCheckpointCreated);
180     }
181 }
182
183 static void launch_set_thread(void *arg) {
184     struct thread_args *args = static_cast<struct thread_args *>(arg);
185     std::unique_lock<std::mutex> lh(*(args->mutex));
186     std::unique_lock<std::mutex> lhg(*(args->gate));
187     ++(*(args->counter));
188     lhg.unlock();
189     args->gate->notify_all();
190     args->mutex->wait(lh);
191     lh.unlock();
192
193     int i(0);
194     for (i = 0; i < NUM_ITEMS; ++i) {
195         std::stringstream key;
196         key << "key-" << i;
197         queued_item qi(new Item(key.str(), args->vbucket->getId(),
198                                 queue_op::set, 0, 0));
199         args->checkpoint_manager->queueDirty(*args->vbucket, qi,
200                                              GenerateBySeqno::Yes,
201                                              GenerateCas::Yes);
202     }
203 }
204 }
205
206 TEST_F(CheckpointTest, basic_chk_test) {
207     std::shared_ptr<Callback<uint16_t> > cb(new DummyCB());
208     RCPtr<VBucket> vbucket(new VBucket(0, vbucket_state_active, global_stats,
209                                        checkpoint_config, NULL, 0, 0, 0, NULL,
210                                        cb, config));
211
212     CheckpointManager *checkpoint_manager = new CheckpointManager(global_stats, 0,
213                                                                   checkpoint_config,
214                                                                   1, 0, 0, cb);
215     SyncObject *mutex = new SyncObject();
216     SyncObject *gate = new SyncObject();
217     int *counter = new int;
218     *counter = 0;
219
220     cb_thread_t tap_threads[NUM_TAP_THREADS];
221     cb_thread_t set_threads[NUM_SET_THREADS];
222     cb_thread_t persistence_thread;
223     cb_thread_t checkpoint_cleanup_thread;
224     int i(0), rc(0);
225
226     struct thread_args t_args;
227     t_args.checkpoint_manager = checkpoint_manager;
228     t_args.vbucket = vbucket;
229     t_args.mutex = mutex;
230     t_args.gate = gate;
231     t_args.counter = counter;
232
233     struct thread_args tap_t_args[NUM_TAP_THREADS];
234     for (i = 0; i < NUM_TAP_THREADS; ++i) {
235         std::string name(TAP_CURSOR_PREFIX + std::to_string(i));
236         tap_t_args[i].checkpoint_manager = checkpoint_manager;
237         tap_t_args[i].vbucket = vbucket;
238         tap_t_args[i].mutex = mutex;
239         tap_t_args[i].gate = gate;
240         tap_t_args[i].counter = counter;
241         tap_t_args[i].name = name;
242         checkpoint_manager->registerCursor(name, 1, false,
243                                            MustSendCheckpointEnd::YES);
244     }
245
246     rc = cb_create_thread(&persistence_thread, launch_persistence_thread, &t_args, 0);
247     EXPECT_EQ(0, rc);
248
249     rc = cb_create_thread(&checkpoint_cleanup_thread,
250                         launch_checkpoint_cleanup_thread, &t_args, 0);
251     EXPECT_EQ(0, rc);
252
253     for (i = 0; i < NUM_TAP_THREADS; ++i) {
254         rc = cb_create_thread(&tap_threads[i], launch_tap_client_thread, &tap_t_args[i], 0);
255         EXPECT_EQ(0, rc);
256     }
257
258     for (i = 0; i < NUM_SET_THREADS; ++i) {
259         rc = cb_create_thread(&set_threads[i], launch_set_thread, &t_args, 0);
260         EXPECT_EQ(0, rc);
261     }
262
263     // Wait for all threads to reach the starting gate
264     while (true) {
265         std::unique_lock<std::mutex> lh(*gate);
266         if (*counter == (NUM_TAP_THREADS + NUM_SET_THREADS + 2)) {
267             break;
268         }
269         gate->wait(lh);
270     }
271     sleep(1);
272     mutex->notify_all();
273
274     for (i = 0; i < NUM_SET_THREADS; ++i) {
275         rc = cb_join_thread(set_threads[i]);
276         EXPECT_EQ(0, rc);
277     }
278
279     // Push the flush command into the queue so that all other threads can be terminated.
280     std::string key("flush");
281     queued_item qi(new Item(key, vbucket->getId(), queue_op::flush, 0xffff, 0));
282     checkpoint_manager->queueDirty(*vbucket, qi, GenerateBySeqno::Yes,
283                                    GenerateCas::Yes);
284
285     rc = cb_join_thread(persistence_thread);
286     EXPECT_EQ(0, rc);
287
288     for (i = 0; i < NUM_TAP_THREADS; ++i) {
289         rc = cb_join_thread(tap_threads[i]);
290         EXPECT_EQ(0, rc);
291         std::stringstream name;
292         name << "tap-client-" << i;
293         checkpoint_manager->removeCursor(name.str());
294     }
295
296     rc = cb_join_thread(checkpoint_cleanup_thread);
297     EXPECT_EQ(0, rc);
298
299     delete checkpoint_manager;
300     delete gate;
301     delete mutex;
302     delete counter;
303 }
304
305 TEST_F(CheckpointTest, reset_checkpoint_id) {
306     int i;
307     for (i = 0; i < 10; ++i) {
308         EXPECT_TRUE(queueNewItem("key-" + std::to_string(i)));
309     }
310     EXPECT_EQ(11, manager->getNumOpenChkItems());
311     EXPECT_EQ(10, manager->getNumItemsForCursor(CheckpointManager::pCursorName));
312
313     EXPECT_EQ(2, manager->createNewCheckpoint());
314
315     size_t itemPos;
316     size_t lastMutationId = 0;
317     std::vector<queued_item> items;
318     const std::string cursor(CheckpointManager::pCursorName);
319     auto range = manager->getAllItemsForCursor(cursor, items);
320     EXPECT_EQ(0, range.start);
321     EXPECT_EQ(1010, range.end);
322     EXPECT_EQ(13, items.size());
323     EXPECT_EQ(queue_op::checkpoint_start, items.at(0)->getOperation());
324     // Check that the next 10 items are all SET operations.
325     for(itemPos = 1; itemPos < 11; ++itemPos) {
326         queued_item qi = items.at(itemPos);
327         EXPECT_EQ(queue_op::set, qi->getOperation());
328         size_t mid = qi->getBySeqno();
329         EXPECT_GT(mid, lastMutationId);
330         lastMutationId = qi->getBySeqno();
331     }
332
333     // Check that the following items are checkpoint end, followed by a
334     // checkpoint start.
335     EXPECT_EQ(queue_op::checkpoint_end, items.at(11)->getOperation());
336     EXPECT_EQ(queue_op::checkpoint_start, items.at(12)->getOperation());
337
338     items.clear();
339
340     manager->checkAndAddNewCheckpoint(1, vbucket);
341     range = manager->getAllItemsForCursor(cursor, items);
342     EXPECT_EQ(1001, range.start);
343     EXPECT_EQ(1010, range.end);
344     EXPECT_EQ(0, items.size());
345 }
346
347 // Sanity check test fixture
348 TEST_F(CheckpointTest, CheckFixture) {
349
350     // Should intially have a single cursor (persistence).
351     EXPECT_EQ(1, manager->getNumOfCursors());
352     EXPECT_EQ(1, manager->getNumOpenChkItems());
353     for (auto& cursor : manager->getAllCursors()) {
354         EXPECT_EQ(CheckpointManager::pCursorName, cursor.first);
355     }
356     // Should initially be zero items to persist.
357     EXPECT_EQ(0, manager->getNumItemsForCursor(CheckpointManager::pCursorName));
358
359     // Check that the items fetched matches the number we were told to expect.
360     std::vector<queued_item> items;
361     auto result = manager->getAllItemsForCursor(CheckpointManager::pCursorName,
362                                                 items);
363     EXPECT_EQ(0, result.start);
364     EXPECT_EQ(0, result.end);
365     EXPECT_EQ(1, items.size());
366     EXPECT_EQ(queue_op::checkpoint_start, items.at(0)->getOperation());
367 }
368
369 MATCHER_P(HasOperation, op, "") { return arg->getOperation() == op; }
370
371 // Basic test of a single, open checkpoint.
372 TEST_F(CheckpointTest, OneOpenCkpt) {
373
374     // Queue a set operation.
375     queued_item qi(new Item("key1", vbucket->getId(), queue_op::set,
376                             /*revSeq*/20, /*bySeq*/0));
377
378     // No set_ops in queue, expect queueDirty to return true (increase
379     // persistence queue size).
380     EXPECT_TRUE(manager->queueDirty(*vbucket, qi, GenerateBySeqno::Yes,
381                                     GenerateCas::Yes));
382     EXPECT_EQ(1, manager->getNumCheckpoints());  // Single open checkpoint.
383     EXPECT_EQ(2, manager->getNumOpenChkItems()); // 1x op_checkpoint_start, 1x op_set
384     EXPECT_EQ(1001, qi->getBySeqno());
385     EXPECT_EQ(20, qi->getRevSeqno());
386     EXPECT_EQ(1, manager->getNumItemsForCursor(CheckpointManager::pCursorName));
387
388     // Adding the same key again shouldn't increase the size.
389     queued_item qi2(new Item("key1", vbucket->getId(), queue_op::set,
390                             /*revSeq*/21, /*bySeq*/0));
391     EXPECT_FALSE(manager->queueDirty(*vbucket, qi2, GenerateBySeqno::Yes,
392                                      GenerateCas::Yes));
393     EXPECT_EQ(1, manager->getNumCheckpoints());
394     EXPECT_EQ(2, manager->getNumOpenChkItems());
395     EXPECT_EQ(1002, qi2->getBySeqno());
396     EXPECT_EQ(21, qi2->getRevSeqno());
397     EXPECT_EQ(1, manager->getNumItemsForCursor(CheckpointManager::pCursorName));
398
399     // Adding a different key should increase size.
400     queued_item qi3(new Item("key2", vbucket->getId(), queue_op::set,
401                             /*revSeq*/0, /*bySeq*/0));
402     EXPECT_TRUE(manager->queueDirty(*vbucket, qi3, GenerateBySeqno::Yes,
403                                     GenerateCas::Yes));
404     EXPECT_EQ(1, manager->getNumCheckpoints());
405     EXPECT_EQ(3, manager->getNumOpenChkItems());
406     EXPECT_EQ(1003, qi3->getBySeqno());
407     EXPECT_EQ(0, qi3->getRevSeqno());
408     EXPECT_EQ(2, manager->getNumItemsForCursor(CheckpointManager::pCursorName));
409
410     // Check that the items fetched matches the number we were told to expect.
411     std::vector<queued_item> items;
412     auto result = manager->getAllItemsForCursor(CheckpointManager::pCursorName,
413                                                 items);
414     EXPECT_EQ(0, result.start);
415     EXPECT_EQ(1003, result.end);
416     EXPECT_EQ(3, items.size());
417     testing::ElementsAre(HasOperation(queue_op::checkpoint_start),
418                          HasOperation(queue_op::set),
419                          HasOperation(queue_op::set));
420 }
421
422 // Test with one open and one closed checkpoint.
423 TEST_F(CheckpointTest, OneOpenOneClosed) {
424
425     // Add some items to the initial (open) checkpoint.
426     for (auto i : {1,2}) {
427         EXPECT_TRUE(queueNewItem("key" + std::to_string(i)));
428     }
429     EXPECT_EQ(1, manager->getNumCheckpoints());
430     EXPECT_EQ(3, manager->getNumOpenChkItems()); // 1x op_checkpoint_start, 2x op_set
431     const uint64_t ckpt_id1 = manager->getOpenCheckpointId();
432
433     // Create a new checkpoint (closing the current open one).
434     const uint64_t ckpt_id2 = manager->createNewCheckpoint();
435     EXPECT_NE(ckpt_id1, ckpt_id2) << "New checkpoint ID should differ from old";
436     EXPECT_EQ(ckpt_id1, manager->getLastClosedCheckpointId());
437     EXPECT_EQ(1, manager->getNumOpenChkItems()); // 1x op_checkpoint_start
438
439     // Add some items to the newly-opened checkpoint (note same keys as 1st
440     // ckpt).
441     for (auto ii : {1,2}) {
442         EXPECT_TRUE(queueNewItem("key" + std::to_string(ii)));
443     }
444     EXPECT_EQ(2, manager->getNumCheckpoints());
445     EXPECT_EQ(3, manager->getNumOpenChkItems()); // 1x op_checkpoint_start, 2x op_set
446
447     // Examine the items - should be 2 lots of two keys.
448     EXPECT_EQ(4, manager->getNumItemsForCursor(CheckpointManager::pCursorName));
449
450     // Check that the items fetched matches the number we were told to expect.
451     std::vector<queued_item> items;
452     auto result = manager->getAllItemsForCursor(CheckpointManager::pCursorName,
453                                                 items);
454     EXPECT_EQ(0, result.start);
455     EXPECT_EQ(1004, result.end);
456     EXPECT_EQ(7, items.size());
457     EXPECT_THAT(items,
458                 testing::ElementsAre(HasOperation(queue_op::checkpoint_start),
459                                      HasOperation(queue_op::set),
460                                      HasOperation(queue_op::set),
461                                      HasOperation(queue_op::checkpoint_end),
462                                      HasOperation(queue_op::checkpoint_start),
463                                      HasOperation(queue_op::set),
464                                      HasOperation(queue_op::set)));
465 }
466
467 // Test the automatic creation of checkpoints based on the number of items.
468 TEST_F(CheckpointTest, ItemBasedCheckpointCreation) {
469
470     // Size down the default number of items to create a new checkpoint and
471     // recreate the manager
472     checkpoint_config = CheckpointConfig(DEFAULT_CHECKPOINT_PERIOD,
473                                          MIN_CHECKPOINT_ITEMS,
474                                          /*numCheckpoints*/2,
475                                          /*itemBased*/true,
476                                          /*keepClosed*/false,
477                                          /*enableMerge*/false);
478     createManager();
479
480     // Sanity check initial state.
481     EXPECT_EQ(1, manager->getNumOfCursors());
482     EXPECT_EQ(1, manager->getNumOpenChkItems());
483     EXPECT_EQ(1, manager->getNumCheckpoints());
484
485     // Create one less than the number required to create a new checkpoint.
486     queued_item qi;
487     for (unsigned int ii = 0; ii < MIN_CHECKPOINT_ITEMS; ii++) {
488         EXPECT_EQ(ii + 1, manager->getNumOpenChkItems()); /* +1 for op_ckpt_start */
489
490         EXPECT_TRUE(queueNewItem("key" + std::to_string(ii)));
491         EXPECT_EQ(1, manager->getNumCheckpoints());
492
493     }
494
495     // Add one more - should create a new checkpoint.
496     EXPECT_TRUE(queueNewItem("key_epoch"));
497     EXPECT_EQ(2, manager->getNumCheckpoints());
498     EXPECT_EQ(2, manager->getNumOpenChkItems()); // 1x op_ckpt_start, 1x op_set
499
500     // Fill up this checkpoint also - note loop for MIN_CHECKPOINT_ITEMS - 1
501     for (unsigned int ii = 0; ii < MIN_CHECKPOINT_ITEMS - 1; ii++) {
502         EXPECT_EQ(ii + 2, manager->getNumOpenChkItems()); /* +2 op_ckpt_start, key_epoch */
503
504         EXPECT_TRUE(queueNewItem("key" + std::to_string(ii)));
505
506         EXPECT_EQ(2, manager->getNumCheckpoints());
507     }
508
509     // Add one more - as we have hit maximum checkpoints should *not* create a
510     // new one.
511     EXPECT_TRUE(queueNewItem("key_epoch2"));
512     EXPECT_EQ(2, manager->getNumCheckpoints());
513     EXPECT_EQ(12, // 1x op_ckpt_start, 1x key_epoch, 9x key_X, 1x key_epoch2
514               manager->getNumOpenChkItems());
515
516     // Fetch the items associated with the persistence cursor. This
517     // moves the single cursor registered outside of the initial checkpoint,
518     // allowing a new open checkpoint to be created.
519     EXPECT_EQ(1, manager->getNumOfCursors());
520     snapshot_range_t range;
521     std::vector<queued_item> items;
522     range = manager->getAllItemsForCursor(CheckpointManager::pCursorName,
523                                          items);
524
525     EXPECT_EQ(0, range.start);
526     EXPECT_EQ(1021, range.end);
527     EXPECT_EQ(24, items.size());
528
529     // Should still have the same number of checkpoints and open items.
530     EXPECT_EQ(2, manager->getNumCheckpoints());
531     EXPECT_EQ(12, manager->getNumOpenChkItems());
532
533     // But adding a new item will create a new one.
534     EXPECT_TRUE(queueNewItem("key_epoch3"));
535     EXPECT_EQ(3, manager->getNumCheckpoints());
536     EXPECT_EQ(2, manager->getNumOpenChkItems()); // 1x op_ckpt_start, 1x op_set
537 }
538
539 // Test checkpoint and cursor accounting - when checkpoints are closed the
540 // offset of cursors is updated as appropriate.
541 TEST_F(CheckpointTest, CursorOffsetOnCheckpointClose) {
542
543     // Add two items to the initial (open) checkpoint.
544     for (auto i : {1,2}) {
545         EXPECT_TRUE(queueNewItem("key" + std::to_string(i)));
546     }
547     EXPECT_EQ(1, manager->getNumCheckpoints());
548     EXPECT_EQ(3, manager->getNumOpenChkItems()); // 1x op_checkpoint_start, 2x op_set
549
550     // Use the existing persistence cursor for this test:
551     EXPECT_EQ(2, manager->getNumItemsForCursor(CheckpointManager::pCursorName))
552         << "Cursor should initially have two items pending";
553
554     // Check de-dupe counting - after adding another item with the same key,
555     // should still see two items.
556     EXPECT_FALSE(queueNewItem("key1"))
557         << "Adding a duplicate key to open checkpoint should not increase queue size";
558
559     EXPECT_EQ(2, manager->getNumItemsForCursor(CheckpointManager::pCursorName))
560         << "Expected 2 items for cursor (2x op_set) after adding a duplicate.";
561
562     // Create a new checkpoint (closing the current open one).
563     manager->createNewCheckpoint();
564     EXPECT_EQ(1, manager->getNumOpenChkItems())
565         << "Expected 1 item (1x op_checkpoint_start)";
566     EXPECT_EQ(2, manager->getNumCheckpoints());
567     EXPECT_EQ(2, manager->getNumItemsForCursor(CheckpointManager::pCursorName))
568         << "Expected 2 items for cursor after creating new checkpoint";
569
570     // Advance cursor - first to get the 'checkpoint_start' meta item,
571     // and a second time to get the a 'proper' mutation.
572     bool isLastMutationItem;
573     auto item = manager->nextItem(CheckpointManager::pCursorName, isLastMutationItem);
574     EXPECT_TRUE(item->isCheckPointMetaItem());
575     EXPECT_FALSE(isLastMutationItem);
576     EXPECT_EQ(2, manager->getNumItemsForCursor(CheckpointManager::pCursorName))
577         << "Expected 2 items for cursor after advancing one item";
578
579     item = manager->nextItem(CheckpointManager::pCursorName, isLastMutationItem);
580     EXPECT_FALSE(item->isCheckPointMetaItem());
581     EXPECT_FALSE(isLastMutationItem);
582     EXPECT_EQ(1, manager->getNumItemsForCursor(CheckpointManager::pCursorName))
583         << "Expected 1 item for cursor after advancing by 1";
584
585     // Add two items to the newly-opened checkpoint. Same keys as 1st ckpt,
586     // but cannot de-dupe across checkpoints.
587     for (auto ii : {1,2}) {
588         EXPECT_TRUE(queueNewItem("key" + std::to_string(ii)));
589     }
590
591     EXPECT_EQ(3, manager->getNumItemsForCursor(CheckpointManager::pCursorName))
592         << "Expected 3 items for cursor after adding 2 more to new checkpoint";
593
594     // Advance the cursor 'out' of the first checkpoint.
595     item = manager->nextItem(CheckpointManager::pCursorName, isLastMutationItem);
596     EXPECT_FALSE(item->isCheckPointMetaItem());
597     EXPECT_TRUE(isLastMutationItem);
598
599     // Now at the end of the first checkpoint, move into the next checkpoint.
600     item = manager->nextItem(CheckpointManager::pCursorName, isLastMutationItem);
601     EXPECT_TRUE(item->isCheckPointMetaItem());
602     EXPECT_TRUE(isLastMutationItem);
603     item = manager->nextItem(CheckpointManager::pCursorName, isLastMutationItem);
604     EXPECT_TRUE(item->isCheckPointMetaItem());
605     EXPECT_FALSE(isLastMutationItem);
606
607     // Tell Checkpoint manager the items have been persisted, so it advances
608     // pCursorPreCheckpointId, which will allow us to remove the closed
609     // unreferenced checkpoints.
610     manager->itemsPersisted();
611
612     // Both previous checkpoints are unreferenced. Close them. This will
613     // cause the offset of this cursor to be recalculated.
614     bool new_open_ckpt_created;
615     EXPECT_EQ(2, manager->removeClosedUnrefCheckpoints(vbucket,
616                                                            new_open_ckpt_created));
617
618     EXPECT_EQ(1, manager->getNumCheckpoints());
619
620     EXPECT_EQ(2, manager->getNumItemsForCursor(CheckpointManager::pCursorName));
621
622     // Drain the remaining items.
623     item = manager->nextItem(CheckpointManager::pCursorName, isLastMutationItem);
624     EXPECT_FALSE(item->isCheckPointMetaItem());
625     EXPECT_FALSE(isLastMutationItem);
626     item = manager->nextItem(CheckpointManager::pCursorName, isLastMutationItem);
627     EXPECT_FALSE(item->isCheckPointMetaItem());
628     EXPECT_TRUE(isLastMutationItem);
629
630     EXPECT_EQ(0, manager->getNumItemsForCursor(CheckpointManager::pCursorName));
631 }
632
633 // Test the getAllItemsForCursor()
634 TEST_F(CheckpointTest, ItemsForCheckpointCursor) {
635     /* We want to have items across 2 checkpoints. Size down the default number
636        of items to create a new checkpoint and recreate the manager */
637     checkpoint_config = CheckpointConfig(DEFAULT_CHECKPOINT_PERIOD,
638                                          MIN_CHECKPOINT_ITEMS,
639                                          /*numCheckpoints*/2,
640                                          /*itemBased*/true,
641                                          /*keepClosed*/false,
642                                          /*enableMerge*/false);
643     createManager();
644
645     /* Sanity check initial state */
646     EXPECT_EQ(1, manager->getNumOfCursors());
647     EXPECT_EQ(1, manager->getNumOpenChkItems());
648     EXPECT_EQ(1, manager->getNumCheckpoints());
649
650     /* Add items such that we have 2 checkpoints */
651     queued_item qi;
652     for (unsigned int ii = 0; ii < 2 * MIN_CHECKPOINT_ITEMS; ii++) {
653         EXPECT_TRUE(queueNewItem("key" + std::to_string(ii)));
654     }
655
656     /* Check if we have desired number of checkpoints and desired number of
657        items */
658     EXPECT_EQ(2, manager->getNumCheckpoints());
659     EXPECT_EQ(MIN_CHECKPOINT_ITEMS + 1, manager->getNumOpenChkItems());
660     /* MIN_CHECKPOINT_ITEMS items + op_ckpt_start */
661
662     /* Register DCP replication cursor */
663     std::string dcp_cursor(DCP_CURSOR_PREFIX + std::to_string(1));
664     manager->registerCursorBySeqno(dcp_cursor.c_str(), 0,
665                                    MustSendCheckpointEnd::NO);
666
667     /* Get items for persistence*/
668     std::vector<queued_item> items;
669     manager->getAllItemsForCursor(CheckpointManager::pCursorName, items);
670
671     /* We should have got (2 * MIN_CHECKPOINT_ITEMS + 3) items. 3 additional are
672        op_ckpt_start, op_ckpt_end and op_ckpt_start */
673     EXPECT_EQ(2 * MIN_CHECKPOINT_ITEMS + 3, items.size());
674
675     /* Get items for DCP replication cursor */
676     items.clear();
677     manager->getAllItemsForCursor(dcp_cursor.c_str(), items);
678     EXPECT_EQ(2 * MIN_CHECKPOINT_ITEMS + 3, items.size());
679 }
680
681 // Test the checkpoint cursor movement
682 TEST_F(CheckpointTest, CursorMovement) {
683     /* We want to have items across 2 checkpoints. Size down the default number
684      of items to create a new checkpoint and recreate the manager */
685     checkpoint_config = CheckpointConfig(DEFAULT_CHECKPOINT_PERIOD,
686                                          MIN_CHECKPOINT_ITEMS,
687                                          /*numCheckpoints*/2,
688                                          /*itemBased*/true,
689                                          /*keepClosed*/false,
690                                          /*enableMerge*/false);
691     createManager();
692
693     /* Sanity check initial state */
694     EXPECT_EQ(1, manager->getNumOfCursors());
695     EXPECT_EQ(1, manager->getNumOpenChkItems());
696     EXPECT_EQ(1, manager->getNumCheckpoints());
697
698     /* Add items such that we have 1 full (max items as per config) checkpoint.
699        Adding another would open new checkpoint */
700     queued_item qi;
701     for (unsigned int ii = 0; ii < MIN_CHECKPOINT_ITEMS; ii++) {
702         EXPECT_TRUE(queueNewItem("key" + std::to_string(ii)));
703     }
704
705     /* Check if we have desired number of checkpoints and desired number of
706        items */
707     EXPECT_EQ(1, manager->getNumCheckpoints());
708     EXPECT_EQ(MIN_CHECKPOINT_ITEMS + 1, manager->getNumOpenChkItems());
709     /* MIN_CHECKPOINT_ITEMS items + op_ckpt_start */
710
711     /* Register DCP replication cursor */
712     std::string dcp_cursor(DCP_CURSOR_PREFIX + std::to_string(1));
713     manager->registerCursorBySeqno(dcp_cursor.c_str(), 0,
714                                    MustSendCheckpointEnd::NO);
715
716     /* Registor TAP cursor */
717     std::string tap_cursor(TAP_CURSOR_PREFIX + std::to_string(1));
718     manager->registerCursor(tap_cursor, 1, false, MustSendCheckpointEnd::YES);
719
720     /* Get items for persistence cursor */
721     std::vector<queued_item> items;
722     manager->getAllItemsForCursor(CheckpointManager::pCursorName, items);
723
724     /* We should have got (MIN_CHECKPOINT_ITEMS + op_ckpt_start) items. */
725     EXPECT_EQ(MIN_CHECKPOINT_ITEMS + 1, items.size());
726
727     /* Get items for DCP replication cursor */
728     items.clear();
729     manager->getAllItemsForCursor(dcp_cursor.c_str(), items);
730     EXPECT_EQ(MIN_CHECKPOINT_ITEMS + 1, items.size());
731
732     /* Get items for TAP cursor */
733     int num_items = 0;
734     while(true) {
735         bool isLastItem = false;
736         qi = manager->nextItem(tap_cursor, isLastItem);
737         num_items++;
738         if (isLastItem) {
739             break;
740         }
741     }
742     EXPECT_EQ(MIN_CHECKPOINT_ITEMS + 1, num_items);
743
744     uint64_t curr_open_chkpt_id = manager->getOpenCheckpointId_UNLOCKED();
745
746     /* Run the checkpoint remover so that new open checkpoint is created */
747     bool newCheckpointCreated;
748     manager->removeClosedUnrefCheckpoints(vbucket, newCheckpointCreated);
749     EXPECT_EQ(curr_open_chkpt_id + 1, manager->getOpenCheckpointId_UNLOCKED());
750
751     /* Get items for persistence cursor */
752     EXPECT_EQ(0, manager->getNumItemsForCursor(CheckpointManager::pCursorName))
753         << "Expected to have no normal (only meta) items";
754     items.clear();
755     manager->getAllItemsForCursor(CheckpointManager::pCursorName, items);
756
757     /* We should have got op_ckpt_start item */
758     EXPECT_EQ(1, items.size());
759     EXPECT_EQ(queue_op::checkpoint_start, items.at(0)->getOperation());
760
761     /* Get items for DCP replication cursor */
762     EXPECT_EQ(0, manager->getNumItemsForCursor(CheckpointManager::pCursorName))
763         << "Expected to have no normal (only meta) items";
764     items.clear();
765     manager->getAllItemsForCursor(dcp_cursor.c_str(), items);
766     /* Expecting only 1 op_ckpt_start item */
767     EXPECT_EQ(1, items.size());
768     EXPECT_EQ(queue_op::checkpoint_start, items.at(0)->getOperation());
769
770     /* Get item for TAP cursor. We expect TAP to send op_ckpt_end of last
771        checkpoint. TAP unlike DCP cannot skip the op_ckpt_end message */
772     bool isLastItem = false;
773     qi = manager->nextItem(tap_cursor, isLastItem);
774     EXPECT_EQ(queue_op::checkpoint_end, qi->getOperation());
775     EXPECT_EQ(true, isLastItem);
776
777 }
778
779 // Test the checkpoint cursor movement for replica vBuckets (where we can
780 // perform more checkpoint collapsing)
781 TEST_F(CheckpointTest, CursorMovementReplicaMerge) {
782
783     vbucket->setState(vbucket_state_replica);
784
785     /* We want to have items across 2 checkpoints. Size down the default number
786      of items to create a new checkpoint and recreate the manager */
787     checkpoint_config = CheckpointConfig(DEFAULT_CHECKPOINT_PERIOD,
788                                          MIN_CHECKPOINT_ITEMS,
789                                          /*numCheckpoints*/2,
790                                          /*itemBased*/true,
791                                          /*keepClosed*/false,
792                                          /*enableMerge*/true);
793
794     // Add items such that we have a checkpoint at half-capacity.
795     for (unsigned int ii = 0; ii < MIN_CHECKPOINT_ITEMS / 2; ii++) {
796         EXPECT_TRUE(queueNewItem("key" + std::to_string(ii)));
797     }
798
799     /* Check if we have desired number of checkpoints and desired number of
800         items */
801     EXPECT_EQ(1, manager->getNumCheckpoints());
802     // +1 for checkpoint_start.
803     EXPECT_EQ((MIN_CHECKPOINT_ITEMS / 2) + 1, manager->getNumOpenChkItems());
804
805     // Register DCP replication cursor, which will be moved into the middle of
806     // first checkpoint and then left there.
807     std::string dcp_cursor{DCP_CURSOR_PREFIX + std::to_string(1)};
808     manager->registerCursorBySeqno(dcp_cursor.c_str(), 0,
809                                    MustSendCheckpointEnd::NO);
810
811     std::vector<queued_item> items;
812     manager->getAllItemsForCursor(dcp_cursor.c_str(), items);
813     EXPECT_EQ((MIN_CHECKPOINT_ITEMS / 2) + 1, items.size());
814
815     // Add more items so this checkpoint is now full.
816     for (unsigned int ii = MIN_CHECKPOINT_ITEMS / 2; ii < MIN_CHECKPOINT_ITEMS;
817             ii++) {
818         EXPECT_TRUE(queueNewItem("key" + std::to_string(ii)));
819     }
820     EXPECT_EQ(1, manager->getNumCheckpoints())
821         << "Should still only have 1 checkpoint after adding "
822                 "MIN_CHECKPOINT_ITEMS total";
823     EXPECT_EQ(MIN_CHECKPOINT_ITEMS + 1, manager->getNumOpenChkItems());
824
825     /* Get items for persistence cursor - this will move the persistence cursor
826      * out of the initial checkpoint.
827      */
828     items.clear();
829     manager->getAllItemsForCursor(CheckpointManager::pCursorName, items);
830
831     /* We should have got (MIN_CHECKPOINT_ITEMS + op_ckpt_start) items. */
832     EXPECT_EQ(MIN_CHECKPOINT_ITEMS + 1, items.size());
833
834     EXPECT_EQ(1, manager->getOpenCheckpointId_UNLOCKED());
835
836     // Create a new checkpoint.
837     EXPECT_EQ(2, manager->createNewCheckpoint());
838
839     // Add another MIN_CHECKPOINT_ITEMS. This should fill up the second
840     // checkpoint.
841     for (unsigned int ii = 0; ii < MIN_CHECKPOINT_ITEMS; ii++) {
842         EXPECT_TRUE(queueNewItem("keyB_" + std::to_string(ii)));
843     }
844
845     // Move the persistence cursor through these new items.
846     EXPECT_EQ(MIN_CHECKPOINT_ITEMS,
847               manager->getNumItemsForCursor(CheckpointManager::pCursorName));
848     items.clear();
849     manager->getAllItemsForCursor(CheckpointManager::pCursorName, items);
850     EXPECT_EQ(MIN_CHECKPOINT_ITEMS + 1, items.size());
851
852     // Create a third checkpoint.
853     EXPECT_EQ(3, manager->createNewCheckpoint());
854
855     // Move persistence cursor into third checkpoint.
856     items.clear();
857     manager->getAllItemsForCursor(CheckpointManager::pCursorName, items);
858     EXPECT_EQ(1, items.size())
859         << "Expected to get a single meta item";
860     EXPECT_EQ(queue_op::checkpoint_start, items.at(0)->getOperation());
861
862     // We now have an unoccupied second checkpoint. We should be able to
863     // collapse this, and move the dcp_cursor into the merged checkpoint.
864     bool newCheckpointCreated;
865     manager->removeClosedUnrefCheckpoints(vbucket, newCheckpointCreated);
866
867     /* Get items for DCP cursor */
868     EXPECT_EQ(MIN_CHECKPOINT_ITEMS/2 + MIN_CHECKPOINT_ITEMS,
869               manager->getNumItemsForCursor(dcp_cursor))
870         << "DCP cursor remaining items should have been recalculated after "
871                 "close of unref checkpoints.";
872
873     items.clear();
874     auto range = manager->getAllItemsForCursor(dcp_cursor.c_str(), items);
875     EXPECT_EQ(1001, range.start);
876     EXPECT_EQ(1020, range.end);
877
878     // Check we have received correct items (done in chunks because
879     // EXPECT_THAT maxes out at 10 elements).
880     std::vector<queued_item> items_a(items.begin(), items.begin() + 5);
881     // Remainder of first checkpoint.
882     EXPECT_THAT(items_a, testing::Each(HasOperation(queue_op::set)));
883
884     // Second checkpoint's data- 10x set.
885     std::vector<queued_item> items_b(items.begin() + 5, items.begin() + 15);
886     EXPECT_THAT(items_b, testing::Each(HasOperation(queue_op::set)));
887
888     // end of second checkpoint and start of third.
889     std::vector<queued_item> items_c(items.begin() + 15, items.end());
890     EXPECT_THAT(items_c,
891                 testing::ElementsAre(HasOperation(queue_op::checkpoint_end),
892                                      HasOperation(queue_op::checkpoint_start)));
893 }
894
895 //
896 // It's critical that the HLC (CAS) is ordered with seqno generation
897 // otherwise XDCR may drop a newer bySeqno mutation because the CAS is not
898 // higher.
899 //
900 TEST_F(CheckpointTest, SeqnoAndHLCOrdering) {
901
902     const int n_threads = 8;
903     const int n_items = 1000;
904
905     // configure so we can store a large number of items
906     checkpoint_config = CheckpointConfig(DEFAULT_CHECKPOINT_PERIOD,
907                                          n_threads*n_items,
908                                          /*numCheckpoints*/2,
909                                          /*itemBased*/true,
910                                          /*keepClosed*/false,
911                                          /*enableMerge*/false);
912     createManager();
913
914     /* Sanity check initial state */
915     EXPECT_EQ(1, manager->getNumOfCursors());
916     EXPECT_EQ(1, manager->getNumOpenChkItems());
917     EXPECT_EQ(1, manager->getNumCheckpoints());
918
919     std::vector<std::thread> threads;
920
921     // vector of pairs, first is seqno, second is CAS
922     // just do a scatter gather over n_threads
923     std::vector<std::vector<std::pair<uint64_t, uint64_t> > > threadData(n_threads);
924     for (int ii = 0; ii < n_threads; ii++) {
925         auto& threadsData = threadData[ii];
926         threads.push_back(std::thread([this, ii, n_items, &threadsData](){
927             std::string key = "key" + std::to_string(ii);
928             for (int item  = 0; item < n_items; item++) {
929                 queued_item qi(new Item(key + std::to_string(item),
930                                         vbucket->getId(), queue_op::set,
931                                         /*revSeq*/0, /*bySeq*/0));
932                 EXPECT_TRUE(manager->queueDirty(*vbucket,
933                                                 qi,
934                                                 GenerateBySeqno::Yes,
935                                                 GenerateCas::Yes));
936
937                 // Save seqno/cas
938                 threadsData.push_back(std::make_pair(qi->getBySeqno(), qi->getCas()));
939             }
940         }));
941     }
942
943     // Wait for all threads
944     for (auto& thread : threads) {
945         thread.join();
946     }
947
948     // Now combine the data and check HLC is increasing with seqno
949     std::map<uint64_t, uint64_t> finalData;
950     for (auto t : threadData) {
951         for (auto pair : t) {
952             EXPECT_EQ(finalData.end(), finalData.find(pair.first));
953             finalData[pair.first] = pair.second;
954         }
955     }
956
957     auto itr = finalData.begin();
958     EXPECT_NE(itr, finalData.end());
959     uint64_t previousCas = (itr++)->second;
960     EXPECT_NE(itr, finalData.end());
961     for (; itr != finalData.end(); itr++) {
962         EXPECT_LT(previousCas, itr->second);
963         previousCas = itr->second;
964     }
965
966     // Now a final check, iterate the checkpoint and also check for increasing
967     // HLC.
968     std::vector<queued_item> items;
969     manager->getAllItemsForCursor(CheckpointManager::pCursorName, items);
970
971     /* We should have got (n_threads*n_items + op_ckpt_start) items. */
972     EXPECT_EQ(n_threads*n_items + 1, items.size());
973
974     previousCas = items[1]->getCas();
975     for (size_t ii = 2; ii < items.size(); ii++) {
976         EXPECT_LT(previousCas, items[ii]->getCas());
977         previousCas = items[ii]->getCas();
978     }
979 }
980
981 // Test cursor is correctly updated when enqueuing a key which already exists
982 // in the checkpoint (and needs de-duping), where the cursor points at a
983 // meta-item at the head of the checkpoint:
984 //
985 //  Before:
986 //      Checkpoint [ 0:EMPTY(), 1:CKPT_START(), 1:SET(key), 2:SET_VBSTATE() ]
987 //                                                               ^
988 //                                                            Cursor
989 //
990 //  After:
991 //      Checkpoint [ 0:EMPTY(), 1:CKPT_START(), 2:SET_VBSTATE(), 2:SET(key) ]
992 //                                                     ^
993 //                                                   Cursor
994 //
995 TEST_F(CheckpointTest, CursorUpdateForExistingItemWithMetaItemAtHead) {
996     // Setup the checkpoint and cursor.
997     ASSERT_EQ(1, manager->getNumItems());
998     ASSERT_TRUE(queueNewItem("key"));
999     ASSERT_EQ(2, manager->getNumItems());
1000     manager->queueSetVBState(*vbucket.get());
1001
1002     ASSERT_EQ(3, manager->getNumItems());
1003
1004     // Advance persistence cursor so all items have been consumed.
1005     std::vector<queued_item> items;
1006     manager->getAllItemsForCursor(CheckpointManager::pCursorName, items);
1007     ASSERT_EQ(3, items.size());
1008     ASSERT_EQ(0, manager->getNumItemsForCursor(CheckpointManager::pCursorName));
1009
1010     // Queue an item with a duplicate key.
1011     queueNewItem("key");
1012
1013     // Test: Should have one item for cursor (the one we just added).
1014     EXPECT_EQ(1, manager->getNumItemsForCursor(CheckpointManager::pCursorName));
1015
1016     // Should have another item to read (new version of 'key')
1017     items.clear();
1018     manager->getAllItemsForCursor(CheckpointManager::pCursorName, items);
1019     EXPECT_EQ(1, items.size());
1020 }
1021
1022 // Test cursor is correctly updated when enqueuing a key which already exists
1023 // in the checkpoint (and needs de-duping), where the cursor points at a
1024 // meta-item *not* at the head of the checkpoint:
1025 //
1026 //  Before:
//      Checkpoint [ 0:EMPTY(), 1:CKPT_START(), 1:SET_VBSTATE(), 1:SET(key) ]
//                                                     ^
//                                                    Cursor
//
//  After:
//      Checkpoint [ 0:EMPTY(), 1:CKPT_START(), 1:SET_VBSTATE(), 2:SET(key) ]
1033 //                                                     ^
1034 //                                                   Cursor
1035 //
1036 TEST_F(CheckpointTest, CursorUpdateForExistingItemWithNonMetaItemAtHead) {
1037     // Setup the checkpoint and cursor.
1038     ASSERT_EQ(1, manager->getNumItems());
1039     manager->queueSetVBState(*vbucket.get());
1040     ASSERT_EQ(2, manager->getNumItems());
1041
1042     // Advance persistence cursor so all items have been consumed.
1043     std::vector<queued_item> items;
1044     manager->getAllItemsForCursor(CheckpointManager::pCursorName, items);
1045     ASSERT_EQ(2, items.size());
1046     ASSERT_EQ(0, manager->getNumItemsForCursor(CheckpointManager::pCursorName));
1047
1048     // Queue a set (cursor will now be one behind).
1049     ASSERT_TRUE(queueNewItem("key"));
1050     ASSERT_EQ(1, manager->getNumItemsForCursor(CheckpointManager::pCursorName));
1051
1052     // Test: queue an item with a duplicate key.
1053     queueNewItem("key");
1054
1055     // Test: Should have one item for cursor (the one we just added).
1056     EXPECT_EQ(1, manager->getNumItemsForCursor(CheckpointManager::pCursorName));
1057
1058     // Should an item to read (new version of 'key')
1059     items.clear();
1060     manager->getAllItemsForCursor(CheckpointManager::pCursorName, items);
1061     EXPECT_EQ(1, items.size());
1062     EXPECT_EQ(1002, items.at(0)->getBySeqno());
1063     EXPECT_EQ("key", items.at(0)->getKey());
1064 }