MB-20798: Allow CAS and seqno to be generated consistently 70/67670/16
authorJim Walker <jim@couchbase.com>
Tue, 27 Sep 2016 11:26:44 +0000 (12:26 +0100)
committerDave Rigby <daver@couchbase.com>
Wed, 12 Oct 2016 07:42:38 +0000 (07:42 +0000)
Add a new option to queueDirty so that it can generate
and assign the CAS to the StoredValue.

This allows us to create a seqno and CAS under the same
lock; thus both the seqno and the CAS will be monotonically
increasing when a checkpoint is observed serially.

Change-Id: Ic24619326a4e8722613824f2253b606d228e98c7
Reviewed-on: http://review.couchbase.org/67670
Reviewed-by: Dave Rigby <daver@couchbase.com>
Tested-by: buildbot <build@couchbase.com>
src/checkpoint.cc
src/checkpoint.h
src/ep.cc
src/ep.h
src/ep_types.h
tests/module_tests/checkpoint_test.cc

index 0a0f522..9ef8ff6 100644 (file)
@@ -966,7 +966,8 @@ std::vector<std::string> CheckpointManager::getListOfCursorsToDrop() {
 
 bool CheckpointManager::queueDirty(const RCPtr<VBucket> &vb,
                                    queued_item& qi,
-                                   const GenerateBySeqno generateBySeqno) {
+                                   const GenerateBySeqno generateBySeqno,
+                                   const GenerateCas generateCas) {
     LockHolder lh(queueLock);
     if (!vb) {
         throw std::invalid_argument("CheckpointManager::queueDirty: vb must "
@@ -1011,6 +1012,13 @@ bool CheckpointManager::queueDirty(const RCPtr<VBucket> &vb,
     } else {
         lastBySeqno = qi->getBySeqno();
     }
+
+    // MB-20798: Allow the HLC to be created 'atomically' with the seqno as
+    // we're holding the ::queueLock.
+    if (GenerateCas::Yes == generateCas) {
+        qi->setCas(vb->nextHLCCas());
+    }
+
     uint64_t st = checkpointList.back()->getSnapshotStartSeqno();
     uint64_t en = checkpointList.back()->getSnapshotEndSeqno();
     if (!(st <= static_cast<uint64_t>(lastBySeqno) &&
index f45a175..b164c75 100644 (file)
@@ -549,7 +549,8 @@ public:
      */
     bool queueDirty(const RCPtr<VBucket> &vb,
                     queued_item& qi,
-                    const GenerateBySeqno generateBySeqno);
+                    const GenerateBySeqno generateBySeqno,
+                    const GenerateCas generateCas);
 
     /**
      * Return the next item to be sent to a given connection
index 821f642..a9697e9 100644 (file)
--- a/src/ep.cc
+++ b/src/ep.cc
@@ -619,7 +619,6 @@ EventuallyPersistentStore::deleteExpiredItem(uint16_t vbid, std::string &key,
                     }
                 } else if (v->isExpired(startTime) && !v->isDeleted()) {
                     vb->ht.unlocked_softDelete(v, 0, getItemEvictionPolicy());
-                    v->setCas(vb->nextHLCCas());
                     queueDirty(vb, v, &lh, NULL);
                 }
             } else {
@@ -637,7 +636,6 @@ EventuallyPersistentStore::deleteExpiredItem(uint16_t vbid, std::string &key,
                         v->setDeleted();
                         v->setRevSeqno(revSeqno);
                         vb->ht.unlocked_softDelete(v, 0, eviction_policy);
-                        v->setCas(vb->nextHLCCas());
                         queueDirty(vb, v, &lh, NULL);
                     }
                 }
@@ -677,7 +675,6 @@ StoredValue *EventuallyPersistentStore::fetchValidValue(RCPtr<VBucket> &vb,
             if (queueExpired && vb->getState() == vbucket_state_active) {
                 incExpirationStat(vb, EXP_BY_ACCESS);
                 vb->ht.unlocked_softDelete(v, 0, eviction_policy);
-                v->setCas(vb->nextHLCCas());
                 queueDirty(vb, v, NULL, NULL);
             }
             return wantDeleted ? v : NULL;
@@ -860,10 +857,10 @@ ENGINE_ERROR_CODE EventuallyPersistentStore::set(const Item &itm,
         // Even if the item was dirty, push it into the vbucket's open
         // checkpoint.
     case WAS_CLEAN:
-        it.setCas(vb->nextHLCCas());
-        v->setCas(it.getCas());
-        queueDirty(vb, v, &lh, &seqno);
+        // We keep lh held as we need to do v->getCas()
+        queueDirty(vb, v, nullptr, &seqno);
         it.setBySeqno(seqno);
+        it.setCas(v->getCas());
         break;
     case NEED_BG_FETCH:
     {   // CAS operation with non-resident item + full eviction.
@@ -951,10 +948,10 @@ ENGINE_ERROR_CODE EventuallyPersistentStore::add(const Item &itm,
         return ENGINE_EWOULDBLOCK;
     case ADD_SUCCESS:
     case ADD_UNDEL:
-        it.setCas(vb->nextHLCCas());
-        v->setCas(it.getCas());
-        queueDirty(vb, v, &lh, &seqno);
+        // We need to keep lh as we will do v->getCas()
+        queueDirty(vb, v, nullptr, &seqno);
         it.setBySeqno(seqno);
+        it.setCas(v->getCas());
         break;
     }
 
@@ -1019,10 +1016,10 @@ ENGINE_ERROR_CODE EventuallyPersistentStore::replace(const Item &itm,
                 // Even if the item was dirty, push it into the vbucket's open
                 // checkpoint.
             case WAS_CLEAN:
-                it.setCas(vb->nextHLCCas());
-                v->setCas(it.getCas());
-                queueDirty(vb, v, &lh, &seqno);
+                // Keep lh as we need to do v->getCas()
+                queueDirty(vb, v, nullptr, &seqno);
                 it.setBySeqno(seqno);
+                it.setCas(v->getCas());
                 break;
             case NEED_BG_FETCH:
             {
@@ -1861,7 +1858,8 @@ void EventuallyPersistentStore::completeBGFetch(const std::string &key,
                         // returns, the item may have been updated and queued
                         // Hence test the CAS value to be the same first.
                         // exptime mutated, schedule it into new checkpoint
-                        queueDirty(vb, v, &hlh, NULL);
+                        queueDirty(vb, v, &hlh, NULL, GenerateBySeqno::Yes,
+                                                    GenerateCas::No);
                     }
                 } else if (gcb.val.getStatus() == ENGINE_KEY_ENOENT) {
                     v->setNonExistent();
@@ -1981,7 +1979,8 @@ void EventuallyPersistentStore::completeBGFetchMulti(uint16_t vbId,
                             // updated and queued
                             // Hence test the CAS value to be the same first.
                             // exptime mutated, schedule it into new checkpoint
-                            queueDirty(vb, v, &blh, NULL);
+                            queueDirty(vb, v, &blh, NULL, GenerateBySeqno::Yes,
+                                                        GenerateCas::No);
                         }
                     } else if (status == ENGINE_KEY_ENOENT) {
                         v->setNonExistent();
@@ -2360,7 +2359,8 @@ ENGINE_ERROR_CODE EventuallyPersistentStore::setWithMeta(
     case WAS_CLEAN:
         vb->setMaxCas(v->getCas());
         queueDirty(vb, v, &lh, seqno,
-                   genBySeqno ? GenerateBySeqno::Yes : GenerateBySeqno::No);
+                   genBySeqno ? GenerateBySeqno::Yes : GenerateBySeqno::No,
+                   GenerateCas::No);
         break;
     case NOT_FOUND:
         ret = ENGINE_KEY_ENOENT;
@@ -2429,7 +2429,6 @@ GetValue EventuallyPersistentStore::getAndUpdateTtl(const std::string &key,
         if (exptime_mutated) {
             v->markDirty();
             v->setExptime(exptime);
-            v->setCas(vb->nextHLCCas());
             v->setRevSeqno(v->getRevSeqno()+1);
         }
 
@@ -2853,8 +2852,6 @@ ENGINE_ERROR_CODE EventuallyPersistentStore::deleteItem(const std::string &key,
     mutation_type_t delrv;
     delrv = vb->ht.unlocked_softDelete(v, *cas, eviction_policy);
     if (v && (delrv == NOT_FOUND || delrv == WAS_DIRTY || delrv == WAS_CLEAN)) {
-        v->setCas(vb->nextHLCCas());
-        *cas = v->getCas();
         if (itemMeta != nullptr) {
             itemMeta->revSeqno = v->getRevSeqno();
             itemMeta->cas = v->getCas();
@@ -2880,15 +2877,21 @@ ENGINE_ERROR_CODE EventuallyPersistentStore::deleteItem(const std::string &key,
         break;
     case NOT_FOUND:
         ret = ENGINE_KEY_ENOENT;
+    case WAS_CLEAN:
+    case WAS_DIRTY:
         if (v) {
-            queueDirty(vb, v, &lh, NULL);
+            // Keep lh as we need to do v->getCas
+            queueDirty(vb, v, nullptr, &seqno);
+            *cas = v->getCas();
+        }
+
+        if (delrv != NOT_FOUND) {
+            mutInfo->seqno = seqno;
+            mutInfo->vbucket_uuid = vb->failovers->getLatestUUID();
+            if (itemMeta != nullptr) {
+                itemMeta->cas = v->getCas();
+            }
         }
-        break;
-    case WAS_DIRTY:
-    case WAS_CLEAN:
-        queueDirty(vb, v, &lh, &seqno);
-        mutInfo->seqno = seqno;
-        mutInfo->vbucket_uuid = vb->failovers->getLatestUUID();
         break;
     case NEED_BG_FETCH:
         // We already figured out if a bg fetch is requred for a full-evicted
@@ -3032,7 +3035,8 @@ ENGINE_ERROR_CODE EventuallyPersistentStore::deleteWithMeta(
                           genBySeqno ? GenerateBySeqno::Yes : GenerateBySeqno::No);
         } else {
             queueDirty(vb, v, &lh, seqno,
-                       genBySeqno ? GenerateBySeqno::Yes : GenerateBySeqno::No);
+                       genBySeqno ? GenerateBySeqno::Yes : GenerateBySeqno::No,
+                       GenerateCas::No);
         }
         break;
     case NEED_BG_FETCH:
@@ -3515,17 +3519,23 @@ void EventuallyPersistentStore::queueDirty(RCPtr<VBucket> &vb,
                                            StoredValue* v,
                                            LockHolder *plh,
                                            uint64_t *seqno,
-                                           const GenerateBySeqno generateBySeqno) {
+                                           const GenerateBySeqno generateBySeqno,
+                                           const GenerateCas generateCas) {
     if (vb) {
         queued_item qi(v->toItem(false, vb->getId()));
 
-        bool rv = vb->checkpointManager.queueDirty(vb, qi, generateBySeqno);
+        bool rv = vb->checkpointManager.queueDirty(vb, qi,
+                                                   generateBySeqno, generateCas);
         v->setBySeqno(qi->getBySeqno());
 
         if (seqno) {
             *seqno = v->getBySeqno();
         }
 
+        if (GenerateCas::Yes == generateCas) {
+            v->setCas(qi->getCas());
+        }
+
         if (plh) {
             plh->unlock();
         }
index 43bde3d..6465736 100644 (file)
--- a/src/ep.h
+++ b/src/ep.h
@@ -921,12 +921,14 @@ protected:
      *        Note that the lock is released inside this function
      * @param seqno sequence number of the mutation
      * @param generateBySeqno request that the seqno is generated by this call
+     * @param generateCas request that the CAS is generated by this call
      */
     void queueDirty(RCPtr<VBucket> &vb,
                     StoredValue* v,
                     LockHolder *plh,
                     uint64_t *seqno,
-                    const GenerateBySeqno generateBySeqno = GenerateBySeqno::Yes);
+                    const GenerateBySeqno generateBySeqno = GenerateBySeqno::Yes,
+                    const GenerateCas generateCas = GenerateCas::Yes);
 
     /* Queue an item for persistence following a TAP command
      *
index dd87a87..7c08a2c 100644 (file)
@@ -38,3 +38,22 @@ static inline std::string to_string(const GenerateBySeqno generateBySeqno) {
             return "";
     }
 }
+
+enum class GenerateCas {
+    No, Yes
+};
+
+typedef std::underlying_type<GenerateCas>::type GenerateByCasUType;
+
+static inline std::string to_string(GenerateCas generateCas) {
+    switch (generateCas) {
+        case GenerateCas::Yes:
+            return "Yes";
+        case GenerateCas::No:
+            return "No";
+        default:
+            throw std::invalid_argument("to_string(GenerateCas) unknown " +
+                    std::to_string(static_cast<GenerateByCasUType>(generateCas)));
+            return "";
+    }
+}
index fa7f69b..9a34783 100644 (file)
@@ -19,6 +19,7 @@
 
 #include <algorithm>
 #include <set>
+#include <thread>
 #include <vector>
 
 #include "checkpoint.h"
@@ -183,7 +184,7 @@ static void launch_set_thread(void *arg) {
         key << "key-" << i;
         queued_item qi(new Item(key.str(), args->vbucket->getId(),
                                 queue_op_set, 0, 0));
-        args->checkpoint_manager->queueDirty(args->vbucket, qi, GenerateBySeqno::Yes);
+        args->checkpoint_manager->queueDirty(args->vbucket, qi, GenerateBySeqno::Yes, GenerateCas::Yes);
     }
 }
 }
@@ -264,7 +265,7 @@ TEST_F(CheckpointTest, basic_chk_test) {
     // Push the flush command into the queue so that all other threads can be terminated.
     std::string key("flush");
     queued_item qi(new Item(key, vbucket->getId(), queue_op_flush, 0xffff, 0));
-    checkpoint_manager->queueDirty(vbucket, qi, GenerateBySeqno::Yes);
+    checkpoint_manager->queueDirty(vbucket, qi, GenerateBySeqno::Yes, GenerateCas::Yes);
 
     rc = cb_join_thread(persistence_thread);
     EXPECT_EQ(0, rc);
@@ -300,7 +301,7 @@ TEST_F(CheckpointTest, reset_checkpoint_id) {
         key << "key-" << i;
         queued_item qi(new Item(key.str(), vbucket->getId(), queue_op_set,
                                 0, 0));
-        manager->queueDirty(vbucket, qi, GenerateBySeqno::Yes);
+        manager->queueDirty(vbucket, qi, GenerateBySeqno::Yes, GenerateCas::Yes);
     }
     manager->createNewCheckpoint();
 
@@ -359,7 +360,7 @@ TEST_F(CheckpointTest, OneOpenCkpt) {
 
     // No set_ops in queue, expect queueDirty to return true (increase
     // persistence queue size).
-    EXPECT_TRUE(manager->queueDirty(vbucket, qi, GenerateBySeqno::Yes));
+    EXPECT_TRUE(manager->queueDirty(vbucket, qi, GenerateBySeqno::Yes, GenerateCas::Yes));
     EXPECT_EQ(1, manager->getNumCheckpoints());  // Single open checkpoint.
     EXPECT_EQ(2, manager->getNumOpenChkItems()); // 1x op_checkpoint_start, 1x op_set
     EXPECT_EQ(1001, qi->getBySeqno());
@@ -369,7 +370,7 @@ TEST_F(CheckpointTest, OneOpenCkpt) {
     // Adding the same key again shouldn't increase the size.
     queued_item qi2(new Item("key1", vbucket->getId(), queue_op_set,
                             /*revSeq*/21, /*bySeq*/0));
-    EXPECT_FALSE(manager->queueDirty(vbucket, qi2, GenerateBySeqno::Yes));
+    EXPECT_FALSE(manager->queueDirty(vbucket, qi2, GenerateBySeqno::Yes, GenerateCas::Yes));
     EXPECT_EQ(1, manager->getNumCheckpoints());
     EXPECT_EQ(2, manager->getNumOpenChkItems());
     EXPECT_EQ(1002, qi2->getBySeqno());
@@ -379,7 +380,7 @@ TEST_F(CheckpointTest, OneOpenCkpt) {
     // Adding a different key should increase size.
     queued_item qi3(new Item("key2", vbucket->getId(), queue_op_set,
                             /*revSeq*/0, /*bySeq*/0));
-    EXPECT_TRUE(manager->queueDirty(vbucket, qi3, GenerateBySeqno::Yes));
+    EXPECT_TRUE(manager->queueDirty(vbucket, qi3, GenerateBySeqno::Yes, GenerateCas::Yes));
     EXPECT_EQ(1, manager->getNumCheckpoints());
     EXPECT_EQ(3, manager->getNumOpenChkItems());
     EXPECT_EQ(1003, qi3->getBySeqno());
@@ -394,7 +395,7 @@ TEST_F(CheckpointTest, OneOpenOneClosed) {
     for (auto i : {1,2}) {
         queued_item qi(new Item("key" + std::to_string(i), vbucket->getId(),
                                 queue_op_set, /*revSeq*/0, /*bySeq*/0));
-        EXPECT_TRUE(manager->queueDirty(vbucket, qi, GenerateBySeqno::Yes));
+        EXPECT_TRUE(manager->queueDirty(vbucket, qi, GenerateBySeqno::Yes, GenerateCas::Yes));
     }
     EXPECT_EQ(1, manager->getNumCheckpoints());
     EXPECT_EQ(3, manager->getNumOpenChkItems()); // 1x op_checkpoint_start, 2x op_set
@@ -412,7 +413,7 @@ TEST_F(CheckpointTest, OneOpenOneClosed) {
     for (auto ii : {1,2}) {
         queued_item qi(new Item("key" + std::to_string(ii), vbucket->getId(),
                                 queue_op_set, /*revSeq*/1, /*bySeq*/0));
-        EXPECT_TRUE(manager->queueDirty(vbucket, qi, GenerateBySeqno::Yes));
+        EXPECT_TRUE(manager->queueDirty(vbucket, qi, GenerateBySeqno::Yes, GenerateCas::Yes));
     }
     EXPECT_EQ(2, manager->getNumCheckpoints());
     EXPECT_EQ(3, manager->getNumOpenChkItems()); // 1x op_checkpoint_start, 2x op_set
@@ -448,7 +449,7 @@ TEST_F(CheckpointTest, ItemBasedCheckpointCreation) {
 
         qi.reset(new Item("key" + std::to_string(ii), vbucket->getId(),
                           queue_op_set, /*revSeq*/0, /*bySeq*/0));
-        EXPECT_TRUE(manager->queueDirty(vbucket, qi, GenerateBySeqno::Yes));
+        EXPECT_TRUE(manager->queueDirty(vbucket, qi, GenerateBySeqno::Yes, GenerateCas::Yes));
         EXPECT_EQ(1, manager->getNumCheckpoints());
 
     }
@@ -456,7 +457,7 @@ TEST_F(CheckpointTest, ItemBasedCheckpointCreation) {
     // Add one more - should create a new checkpoint.
     qi.reset(new Item("key_epoch", vbucket->getId(), queue_op_set, /*revSeq*/0,
                       /*bySeq*/0));
-    EXPECT_TRUE(manager->queueDirty(vbucket, qi, GenerateBySeqno::Yes));
+    EXPECT_TRUE(manager->queueDirty(vbucket, qi, GenerateBySeqno::Yes, GenerateCas::Yes));
     EXPECT_EQ(2, manager->getNumCheckpoints());
     EXPECT_EQ(2, manager->getNumOpenChkItems()); // 1x op_ckpt_start, 1x op_set
 
@@ -466,7 +467,7 @@ TEST_F(CheckpointTest, ItemBasedCheckpointCreation) {
 
         qi.reset(new Item("key" + std::to_string(ii), vbucket->getId(),
                                 queue_op_set, /*revSeq*/1, /*bySeq*/0));
-        EXPECT_TRUE(manager->queueDirty(vbucket, qi, GenerateBySeqno::Yes));
+        EXPECT_TRUE(manager->queueDirty(vbucket, qi, GenerateBySeqno::Yes, GenerateCas::Yes));
         EXPECT_EQ(2, manager->getNumCheckpoints());
     }
 
@@ -474,7 +475,7 @@ TEST_F(CheckpointTest, ItemBasedCheckpointCreation) {
     // new one.
     qi.reset(new Item("key_epoch2", vbucket->getId(), queue_op_set,
                       /*revSeq*/1, /*bySeq*/0));
-    EXPECT_TRUE(manager->queueDirty(vbucket, qi, GenerateBySeqno::Yes));
+    EXPECT_TRUE(manager->queueDirty(vbucket, qi, GenerateBySeqno::Yes, GenerateCas::Yes));
     EXPECT_EQ(2, manager->getNumCheckpoints());
     EXPECT_EQ(12, // 1x op_ckpt_start, 1x key_epoch, 9x key_X, 1x key_epoch2
               manager->getNumOpenChkItems());
@@ -494,7 +495,7 @@ TEST_F(CheckpointTest, ItemBasedCheckpointCreation) {
     // But adding a new item will create a new one.
     qi.reset(new Item("key_epoch3", vbucket->getId(), queue_op_set,
                       /*revSeq*/1, /*bySeq*/0));
-    EXPECT_TRUE(manager->queueDirty(vbucket, qi, GenerateBySeqno::Yes));
+    EXPECT_TRUE(manager->queueDirty(vbucket, qi, GenerateBySeqno::Yes, GenerateCas::Yes));
     EXPECT_EQ(3, manager->getNumCheckpoints());
     EXPECT_EQ(2, manager->getNumOpenChkItems()); // 1x op_ckpt_start, 1x op_set
 }
@@ -507,7 +508,7 @@ TEST_F(CheckpointTest, CursorOffsetOnCheckpointClose) {
     for (auto i : {1,2}) {
         queued_item qi(new Item("key" + std::to_string(i), vbucket->getId(),
                                 queue_op_set, /*revSeq*/0, /*bySeq*/0));
-        EXPECT_TRUE(manager->queueDirty(vbucket, qi, GenerateBySeqno::Yes));
+        EXPECT_TRUE(manager->queueDirty(vbucket, qi, GenerateBySeqno::Yes, GenerateCas::Yes));
     }
     EXPECT_EQ(1, manager->getNumCheckpoints());
     EXPECT_EQ(3, manager->getNumOpenChkItems()); // 1x op_checkpoint_start, 2x op_set
@@ -521,7 +522,7 @@ TEST_F(CheckpointTest, CursorOffsetOnCheckpointClose) {
     // should still see two items.
     queued_item qi(new Item("key1", vbucket->getId(),
                             queue_op_set, /*revSeq*/0, /*bySeq*/0));
-    EXPECT_FALSE(manager->queueDirty(vbucket, qi, GenerateBySeqno::Yes))
+    EXPECT_FALSE(manager->queueDirty(vbucket, qi, GenerateBySeqno::Yes, GenerateCas::Yes))
         << "Adding a duplicate key to open checkpoint should not increase queue size";
 
     EXPECT_EQ(2, manager->getNumItemsForCursor(CheckpointManager::pCursorName))
@@ -551,7 +552,7 @@ TEST_F(CheckpointTest, CursorOffsetOnCheckpointClose) {
     for (auto ii : {1,2}) {
         queued_item qi(new Item("key" + std::to_string(ii), vbucket->getId(),
                                 queue_op_set, /*revSeq*/1, /*bySeq*/0));
-        EXPECT_TRUE(manager->queueDirty(vbucket, qi, GenerateBySeqno::Yes));
+        EXPECT_TRUE(manager->queueDirty(vbucket, qi, GenerateBySeqno::Yes, GenerateCas::Yes));
     }
 
     EXPECT_EQ(3, manager->getNumItemsForCursor(CheckpointManager::pCursorName))
@@ -618,7 +619,7 @@ TEST_F(CheckpointTest, ItemsForCheckpointCursor) {
     for (unsigned int ii = 0; ii < 2 * MIN_CHECKPOINT_ITEMS; ii++) {
         qi.reset(new Item("key" + std::to_string(ii), vbucket->getId(),
                           queue_op_set, /*revSeq*/0, /*bySeq*/0));
-        EXPECT_TRUE(manager->queueDirty(vbucket, qi, GenerateBySeqno::Yes));
+        EXPECT_TRUE(manager->queueDirty(vbucket, qi, GenerateBySeqno::Yes, GenerateCas::Yes));
     }
 
     /* Check if we have desired number of checkpoints and desired number of
@@ -669,7 +670,7 @@ TEST_F(CheckpointTest, CursorMovement) {
     for (unsigned int ii = 0; ii < MIN_CHECKPOINT_ITEMS; ii++) {
         qi.reset(new Item("key" + std::to_string(ii), vbucket->getId(),
                           queue_op_set, /*revSeq*/0, /*bySeq*/0));
-        EXPECT_TRUE(manager->queueDirty(vbucket, qi, GenerateBySeqno::Yes));
+        EXPECT_TRUE(manager->queueDirty(vbucket, qi, GenerateBySeqno::Yes, GenerateCas::Yes));
     }
 
     /* Check if we have desired number of checkpoints and desired number of
@@ -740,3 +741,88 @@ TEST_F(CheckpointTest, CursorMovement) {
 
 }
 
+//
+// It's critical that the HLC (CAS) is ordered with seqno generation
+// otherwise XDCR may drop a newer bySeqno mutation because the CAS is not
+// higher.
+//
+TEST_F(CheckpointTest, SeqnoAndHLCOrdering) {
+
+    const int n_threads = 8;
+    const int n_items = 1000;
+
+    // configure so we can store a large number of items
+    checkpoint_config = CheckpointConfig(DEFAULT_CHECKPOINT_PERIOD,
+                                         n_threads*n_items,
+                                         /*numCheckpoints*/2,
+                                         /*itemBased*/true,
+                                         /*keepClosed*/false,
+                                         /*enableMerge*/false);
+    createManager();
+
+    /* Sanity check initial state */
+    EXPECT_EQ(1, manager->getNumOfCursors());
+    EXPECT_EQ(1, manager->getNumOpenChkItems());
+    EXPECT_EQ(1, manager->getNumCheckpoints());
+
+    std::vector<std::thread> threads;
+
+    // vector of pairs, first is seqno, second is CAS
+    // just do a scatter gather over n_threads
+    std::vector<std::vector<std::pair<uint64_t, uint64_t> > > threadData(n_threads);
+    for (int ii = 0; ii < n_threads; ii++) {
+        auto& threadsData = threadData[ii];
+        threads.push_back(std::thread([this, ii, n_items, &threadsData](){
+            std::string key = "key" + std::to_string(ii);
+            for (int item  = 0; item < n_items; item++) {
+                queued_item qi(new Item(key + std::to_string(item),
+                                        vbucket->getId(), queue_op_set,
+                                        /*revSeq*/0, /*bySeq*/0));
+                EXPECT_TRUE(manager->queueDirty(vbucket,
+                                                qi,
+                                                GenerateBySeqno::Yes,
+                                                GenerateCas::Yes));
+
+                // Save seqno/cas
+                threadsData.push_back(std::make_pair(qi->getBySeqno(), qi->getCas()));
+            }
+        }));
+    }
+
+    // Wait for all threads
+    for (auto& thread : threads) {
+        thread.join();
+    }
+
+    // Now combine the data and check HLC is increasing with seqno
+    std::map<uint64_t, uint64_t> finalData;
+    for (auto t : threadData) {
+        for (auto pair : t) {
+            EXPECT_EQ(finalData.end(), finalData.find(pair.first));
+            finalData[pair.first] = pair.second;
+        }
+    }
+
+    auto itr = finalData.begin();
+    EXPECT_NE(itr, finalData.end());
+    uint64_t previousCas = (itr++)->second;
+    EXPECT_NE(itr, finalData.end());
+    for (; itr != finalData.end(); itr++) {
+        EXPECT_LT(previousCas, itr->second);
+        previousCas = itr->second;
+    }
+
+    // Now a final check, iterate the checkpoint and also check for increasing
+    // HLC.
+    std::vector<queued_item> items;
+    manager->getAllItemsForCursor(CheckpointManager::pCursorName, items);
+
+    /* We should have got (n_threads*n_items + op_ckpt_start) items. */
+    EXPECT_EQ(n_threads*n_items + 1, items.size());
+
+    previousCas = items[1]->getCas();
+    for (size_t ii = 2; ii < items.size(); ii++) {
+        EXPECT_LT(previousCas, items[ii]->getCas());
+        previousCas = items[ii]->getCas();
+    }
+}