MB-24246: update highestDedupedSeqno when an existing value is changed 79/77779/12
authorJames Harrison <00jamesh@gmail.com>
Fri, 5 May 2017 13:22:21 +0000 (14:22 +0100)
committerDave Rigby <daver@couchbase.com>
Wed, 10 May 2017 16:40:43 +0000 (16:40 +0000)
Without this, rangeReads are allowed to stop "too early" - potentially
missing out on items that have been updated, i.e.,

^ = HDDS (HighestDedupedSeqno)

 A₁   B₂   C₃            Initial items
[A₁   B₂   C₃]           rangeRead 1-3
[A₁   B₂   C₃]  B'₄       Update B
 A₁   B₂   C₃   B'₄       RR ends
[A₁   B₂   C₃]  B'₄       new rangeRead, still consistent
 A₁   B₂   C₃   B'₄       RR ends
 A₁        C₃   B'₄       purger removes stale B
!A₁        C₃!  B'₄       RR 1-3 could be requested, but would be
inconsistent. The HDDS serves to extend the end of a rangeRead to the
most recently deduped item, in this case B' replaced an older B.

Currently, this updates the HDDS too soon in the case of a stale item,
forcing the rangeRead to include the new version immediately even though
the stale item is still present and would still allow a valid range.
This is to be improved in a coming patch, in which the TombstonePurger
will update the HDDS when removing the stale item.

Change-Id: If6c57a86bab56ccc007b0fe17c9229218bb0c2c7
Reviewed-on: http://review.couchbase.org/77779
Reviewed-by: Dave Rigby <daver@couchbase.com>
Tested-by: Build Bot <build@couchbase.com>
src/ephemeral_vb.cc
src/linked_list.cc
src/linked_list.h
src/seqlist.h
tests/ep_testsuite_dcp.cc
tests/mock/mock_ephemeral_vb.h
tests/module_tests/basic_ll_test.cc
tests/module_tests/ephemeral_bucket_test.cc
tests/module_tests/ephemeral_vb_test.cc

index 92fdd6c..2379690 100644 (file)
@@ -299,51 +299,63 @@ EphemeralVBucket::updateStoredValue(const HashTable::HashBucketLock& hbl,
     const bool oldValueDeleted = v.isDeleted();
     const bool recreatingDeletedItem = v.isDeleted() && !itm.isDeleted();
 
-    /* Update the OrderedStoredValue in hash table + Ordered data structure
-       (list) */
-    auto res = seqList->updateListElem(lh, *(v.toOrderedStoredValue()));
-
+    SequenceList::UpdateStatus res;
+    VBNotifyCtx notifyCtx;
     StoredValue* newSv = &v;
     StoredValue::UniquePtr ownedSv;
     MutationStatus status(MutationStatus::WasClean);
 
-    switch (res) {
-    case SequenceList::UpdateStatus::Success:
-        /* OrderedStoredValue moved to end of the list, just update its
-           value */
-        status = ht.unlocked_updateStoredValue(hbl.getHTLock(), v, itm);
-        break;
-
-    case SequenceList::UpdateStatus::Append: {
-        /* OrderedStoredValue cannot be moved to end of the list,
-           due to a range read. Hence, release the storedvalue from the
-           hash table, indicate the list to mark the OrderedStoredValue
-           stale (old duplicate) and add a new StoredValue for the itm.
-
-           Note: It is important to remove item from hash table before
-                 marking stale because once marked stale list assumes the
-                 ownership of the item and may delete it anytime. */
-        /* Release current storedValue from hash table */
-        /* [EPHE TODO]: Write a HT func to release the StoredValue directly
-                        than taking key as a param and deleting
-                        (MB-23184) */
-        ownedSv = ht.unlocked_release(hbl, v.getKey());
-
-        /* Add a new storedvalue for the item */
-        newSv = ht.unlocked_addNewStoredValue(hbl, itm);
-
-        seqList->appendToList(lh, *(newSv->toOrderedStoredValue()));
-    } break;
-    }
+    {
+        // Once we update the seqList, there is a short period where the
+        // highSeqno and highestDedupedSeqno are both incorrect. We have to hold
+        // this lock to prevent a new rangeRead starting, and covering an
+        // inconsistent range.
+        std::lock_guard<std::mutex> highSeqnoLh(seqList->getHighSeqnosLock());
+
+        /* Update the OrderedStoredValue in hash table + Ordered data structure
+           (list) */
+        res = seqList->updateListElem(lh, *(v.toOrderedStoredValue()));
+
+        switch (res) {
+        case SequenceList::UpdateStatus::Success:
+            /* OrderedStoredValue moved to end of the list, just update its
+               value */
+            status = ht.unlocked_updateStoredValue(hbl.getHTLock(), v, itm);
+            break;
+
+        case SequenceList::UpdateStatus::Append: {
+            /* OrderedStoredValue cannot be moved to end of the list,
+               due to a range read. Hence, release the storedvalue from the
+               hash table, indicate the list to mark the OrderedStoredValue
+               stale (old duplicate) and add a new StoredValue for the itm.
+
+               Note: It is important to remove item from hash table before
+                     marking stale because once marked stale list assumes the
+                     ownership of the item and may delete it anytime. */
+            /* Release current storedValue from hash table */
+            /* [EPHE TODO]: Write a HT func to release the StoredValue directly
+                            than taking key as a param and deleting
+                            (MB-23184) */
+            ownedSv = ht.unlocked_release(hbl, v.getKey());
+
+            /* Add a new storedvalue for the item */
+            newSv = ht.unlocked_addNewStoredValue(hbl, itm);
+
+            seqList->appendToList(lh, *(newSv->toOrderedStoredValue()));
+        } break;
+        }
 
-    VBNotifyCtx notifyCtx;
-    if (queueItmCtx) {
-        /* Put on checkpoint mgr */
-        notifyCtx = queueDirty(*newSv, *queueItmCtx);
+        if (queueItmCtx) {
+            /* Put on checkpoint mgr */
+            notifyCtx = queueDirty(*newSv, *queueItmCtx);
+        }
+
+        /* Update the high seqno in the sequential storage */
+        auto& osv = *(newSv->toOrderedStoredValue());
+        seqList->updateHighSeqno(highSeqnoLh, osv);
+        seqList->updateHighestDedupedSeqno(highSeqnoLh, osv);
     }
 
-    /* Update the high seqno in the sequential storage */
-    seqList->updateHighSeqno(*(newSv->toOrderedStoredValue()));
     if (recreatingDeletedItem) {
         ++opsCreate;
     } else {
@@ -391,7 +403,8 @@ std::pair<StoredValue*, VBNotifyCtx> EphemeralVBucket::addNewStoredValue(
     }
 
     /* Update the high seqno in the sequential storage */
-    seqList->updateHighSeqno(*(v->toOrderedStoredValue()));
+    std::lock_guard<std::mutex> highSeqnoLh(seqList->getHighSeqnosLock());
+    seqList->updateHighSeqno(highSeqnoLh, *(v->toOrderedStoredValue()));
     ++opsCreate;
 
     seqList->updateNumDeletedItems(false, itm.isDeleted());
@@ -412,45 +425,59 @@ std::tuple<StoredValue*, VBNotifyCtx> EphemeralVBucket::softDeleteStoredValue(
 
     const bool oldValueDeleted = v.isDeleted();
 
-    /* Update the OrderedStoredValue in hash table + Ordered data structure
-       (list) */
-    auto res = seqList->updateListElem(lh, *(v.toOrderedStoredValue()));
-
-    switch (res) {
-    case SequenceList::UpdateStatus::Success:
-        /* OrderedStoredValue is moved to end of the list, do nothing */
-        break;
-
-    case SequenceList::UpdateStatus::Append: {
-        /* OrderedStoredValue cannot be moved to end of the list,
-           due to a range read. Hence, replace the storedvalue in the
-           hash table with its copy and indicate the list to mark the
-           OrderedStoredValue stale (old duplicate).
-
-           Note: It is important to remove item from hash table before
-                 marking stale because once marked stale list assumes the
-                 ownership of the item and may delete it anytime. */
+    SequenceList::UpdateStatus res;
+    VBNotifyCtx notifyCtx;
+    {
+        // Once we update the seqList, there is a short period where the
+        // highSeqno and highestDedupedSeqno are both incorrect. We have to hold
+        // this lock to prevent a new rangeRead starting, and covering an
+        // inconsistent range.
+        std::lock_guard<std::mutex> highSeqnoLh(seqList->getHighSeqnosLock());
+
+        /* Update the OrderedStoredValue in hash table + Ordered data structure
+           (list) */
+        res = seqList->updateListElem(lh, *(v.toOrderedStoredValue()));
+
+        switch (res) {
+        case SequenceList::UpdateStatus::Success:
+            /* OrderedStoredValue is moved to end of the list, do nothing */
+            break;
+
+        case SequenceList::UpdateStatus::Append: {
+            /* OrderedStoredValue cannot be moved to end of the list,
+               due to a range read. Hence, replace the storedvalue in the
+               hash table with its copy and indicate the list to mark the
+               OrderedStoredValue stale (old duplicate).
+
+               Note: It is important to remove item from hash table before
+                     marking stale because once marked stale list assumes the
+                     ownership of the item and may delete it anytime. */
+
+            /* Release current storedValue from hash table */
+            /* [EPHE TODO]: Write a HT func to replace the StoredValue directly
+                            than taking key as a param and deleting (MB-23184)
+               */
+            std::tie(newSv, ownedSv) = ht.unlocked_replaceByCopy(hbl, v);
+
+            seqList->appendToList(lh, *(newSv->toOrderedStoredValue()));
+        } break;
+        }
 
-        /* Release current storedValue from hash table */
-        /* [EPHE TODO]: Write a HT func to replace the StoredValue directly
-                        than taking key as a param and deleting (MB-23184) */
-        std::tie(newSv, ownedSv) = ht.unlocked_replaceByCopy(hbl, v);
+        /* Delete the storedvalue */
+        ht.unlocked_softDelete(hbl.getHTLock(), *newSv, onlyMarkDeleted);
 
-        seqList->appendToList(lh, *(newSv->toOrderedStoredValue()));
-    } break;
-    }
+        if (queueItmCtx.genBySeqno == GenerateBySeqno::No) {
+            newSv->setBySeqno(bySeqno);
+        }
 
-    /* Delete the storedvalue */
-    ht.unlocked_softDelete(hbl.getHTLock(), *newSv, onlyMarkDeleted);
+        notifyCtx = queueDirty(*newSv, queueItmCtx);
 
-    if (queueItmCtx.genBySeqno == GenerateBySeqno::No) {
-        newSv->setBySeqno(bySeqno);
+        /* Update the high seqno in the sequential storage */
+        auto& osv = *(newSv->toOrderedStoredValue());
+        seqList->updateHighSeqno(highSeqnoLh, osv);
+        seqList->updateHighestDedupedSeqno(highSeqnoLh, osv);
     }
 
-    VBNotifyCtx notifyCtx = queueDirty(*newSv, queueItmCtx);
-
-    /* Update the high seqno in the sequential storage */
-    seqList->updateHighSeqno(*(newSv->toOrderedStoredValue()));
     ++opsDelete;
 
     seqList->updateNumDeletedItems(oldValueDeleted, true);
index 05c2141..aa1dcda 100644 (file)
@@ -166,10 +166,16 @@ BasicLinkedList::rangeRead(seqno_t start, seqno_t end) {
     return {ENGINE_SUCCESS, std::move(items)};
 }
 
-void BasicLinkedList::updateHighSeqno(const OrderedStoredValue& v) {
-    std::lock_guard<SpinLock> lh(rangeLock);
+void BasicLinkedList::updateHighSeqno(
+        std::lock_guard<std::mutex>& highSeqnoLock,
+        const OrderedStoredValue& v) {
     highSeqno = v.getBySeqno();
 }
+void BasicLinkedList::updateHighestDedupedSeqno(
+        std::lock_guard<std::mutex>& highSeqnoLock,
+        const OrderedStoredValue& v) {
+    highestDedupedSeqno = v.getBySeqno();
+}
 
 void BasicLinkedList::markItemStale(StoredValue::UniquePtr ownedSv) {
     /* Release the StoredValue as BasicLinkedList does not want it to be of
@@ -320,12 +326,12 @@ uint64_t BasicLinkedList::getNumItems() const {
 }
 
 uint64_t BasicLinkedList::getHighSeqno() const {
-    std::lock_guard<SpinLock> lh(rangeLock);
+    std::lock_guard<std::mutex> lckGd(highSeqnosLock);
     return highSeqno;
 }
 
 uint64_t BasicLinkedList::getHighestDedupedSeqno() const {
-    std::lock_guard<std::mutex> lckGd(writeLock);
+    std::lock_guard<std::mutex> lckGd(highSeqnosLock);
     return highestDedupedSeqno;
 }
 
@@ -342,6 +348,9 @@ uint64_t BasicLinkedList::getRangeReadEnd() const {
     std::lock_guard<SpinLock> lh(rangeLock);
     return readRange.getEnd();
 }
+std::mutex& BasicLinkedList::getHighSeqnosLock() const {
+    return highSeqnosLock;
+}
 
 void BasicLinkedList::dump() const {
     std::cerr << *this << std::endl;
index 9fbbcee..0c437c9 100644 (file)
@@ -146,7 +146,11 @@ public:
     std::pair<ENGINE_ERROR_CODE, std::vector<UniqueItemPtr>> rangeRead(
             seqno_t start, seqno_t end) override;
 
-    void updateHighSeqno(const OrderedStoredValue& v) override;
+    void updateHighSeqno(std::lock_guard<std::mutex>& highSeqnoLock,
+                         const OrderedStoredValue& v) override;
+
+    void updateHighestDedupedSeqno(std::lock_guard<std::mutex>& highSeqnoLock,
+                                   const OrderedStoredValue& v) override;
 
     void markItemStale(StoredValue::UniquePtr ownedSv) override;
 
@@ -174,6 +178,8 @@ public:
 
     uint64_t getRangeReadEnd() const override;
 
+    std::mutex& getHighSeqnosLock() const override;
+
     void dump() const override;
 
 protected:
@@ -201,6 +207,11 @@ protected:
     mutable SpinLock rangeLock;
 
     /**
+     * Lock protecting the highSeqno and highestDedupedSeqno.
+     */
+    mutable std::mutex highSeqnosLock;
+
+    /**
      * Lock that serializes range reads on the 'seqList' - i.e. serializes
      * the addition / removal of range reads from the set in-flight.
      * We need to serialize range reads because, range reads set a list level
index 8cefdbe..6e359bb 100644 (file)
@@ -125,10 +125,25 @@ public:
      * Updates the highSeqno in the list. Since seqno is generated and managed
      * outside the list, the module managing it must update this after the seqno
      * is generated for the item already put in the list.
-     *
+     * @param highSeqnoLock The lock protecting the high seqnos the caller is
+     * expected to hold
      * @param v Ref to orderedStoredValue
      */
-    virtual void updateHighSeqno(const OrderedStoredValue& v) = 0;
+    virtual void updateHighSeqno(std::lock_guard<std::mutex>& highSeqnoLock,
+                                 const OrderedStoredValue& v) = 0;
+
+    /**
+     * Updates the highestDedupedSeqno in the list. Since seqno is generated and
+     * managed outside the list, the module managing it must update this after
+     * the seqno is generated for the item already put in the list.
+     * @param highSeqnoLock The lock protecting the high seqnos the caller is
+     * expected to hold
+     * @param v Ref to orderedStoredValue
+     *
+     */
+    virtual void updateHighestDedupedSeqno(
+            std::lock_guard<std::mutex>& highSeqnoLock,
+            const OrderedStoredValue& v) = 0;
 
     /**
      * Mark an OrderedStoredValue stale and assumes its ownership.
@@ -217,6 +232,12 @@ public:
     virtual uint64_t getRangeReadEnd() const = 0;
 
     /**
+     * Returns the lock which must be held to modify the highSeqno or the
+     * highestDedupedSeqno
+     */
+    virtual std::mutex& getHighSeqnosLock() const = 0;
+
+    /**
      * Debug - prints a representation of the list to stderr.
      */
     virtual void dump() const = 0;
index ef2784a..77c9688 100644 (file)
@@ -1770,7 +1770,14 @@ static enum test_result test_dcp_producer_stream_req_partial(ENGINE_HANDLE *h,
     ctx.snapshot = {105, 105};
     ctx.exp_mutations = 95; // 105 to 200
     ctx.exp_deletions = 100; // 201 to 300
-    ctx.exp_markers = 2;
+
+    if (isPersistentBucket(h, h1)) {
+        ctx.exp_markers = 2;
+    } else {
+        // the ephemeral stream request won't be broken into two snapshots of
+        // backfill from disk vs the checkpoint in memory
+        ctx.exp_markers = 1;
+    }
 
     TestDcpConsumer tdc("unittest", cookie);
     tdc.addStreamCtx(ctx);
index eab4511..bc546b9 100644 (file)
@@ -65,6 +65,10 @@ public:
         mockLL->registerFakeReadRange(start, end);
     }
 
+    void resetReadRange() {
+        mockLL->resetReadRange();
+    }
+
     int public_getNumStaleItems() {
         return mockLL->getNumStaleItems();
     }
index ff93136..52d290d 100644 (file)
@@ -87,7 +87,9 @@ protected:
             sv = ht.find(key, TrackReference::Yes, WantsDeleted::No)
                          ->toOrderedStoredValue();
             basicLL->appendToList(lg, *sv);
-            basicLL->updateHighSeqno(*sv);
+            std::lock_guard<std::mutex> highSeqnoLh(
+                    basicLL->getHighSeqnosLock());
+            basicLL->updateHighSeqno(highSeqnoLh, *sv);
             expectedSeqno.push_back(i);
         }
         return expectedSeqno;
@@ -109,7 +111,8 @@ protected:
         EXPECT_EQ(SequenceList::UpdateStatus::Success,
                   basicLL->updateListElem(lg, *osv));
         osv->setBySeqno(highSeqno + 1);
-        basicLL->updateHighSeqno(*osv);
+        std::lock_guard<std::mutex> highSeqnoLh(basicLL->getHighSeqnosLock());
+        basicLL->updateHighSeqno(highSeqnoLh, *osv);
     }
 
     /**
index 3df0a50..482a267 100644 (file)
@@ -46,7 +46,7 @@ TEST_F(EphemeralBucketStatTest, VBSeqlistStats) {
         << "Expected both current and deleted documents";
     EXPECT_EQ("1", stats.at("vb_0:seqlist_deleted_count"));
     EXPECT_EQ("4", stats.at("vb_0:seqlist_high_seqno"));
-    EXPECT_EQ("0", stats.at("vb_0:seqlist_highest_deduped_seqno"));
+    EXPECT_EQ("4", stats.at("vb_0:seqlist_highest_deduped_seqno"));
     EXPECT_EQ("0", stats.at("vb_0:seqlist_range_read_begin"));
     EXPECT_EQ("0", stats.at("vb_0:seqlist_range_read_end"));
     EXPECT_EQ("0", stats.at("vb_0:seqlist_range_read_count"));
index f224358..1a32c84 100644 (file)
@@ -485,3 +485,48 @@ TEST_F(EphTombstoneTest, DoubleDeleteTimeCorrect) {
     auto secondDelTime = delOSV->getDeletedTime();
     EXPECT_GE(secondDelTime, initialDelTime + timeJump);
 }
+
+TEST_F(EphemeralVBucketTest, UpdateUpdatesHighestDedupedSeqno) {
+    /* Add 3 items and then update all of them */
+    const int numItems = 3;
+
+    auto keys = generateKeys(numItems);
+    setMany(keys, MutationStatus::WasClean);
+
+    ASSERT_EQ(0, mockEpheVB->getLL()->getHighestDedupedSeqno());
+
+    /* Update the items */
+    setMany(keys, MutationStatus::WasDirty);
+
+    EXPECT_EQ(6, mockEpheVB->getLL()->getHighestDedupedSeqno());
+}
+
+TEST_F(EphemeralVBucketTest, AppendUpdatesHighestDedupedSeqno) {
+    /* Add 3 items and then update all of them */
+    const int numItems = 3;
+
+    auto keys = generateKeys(numItems);
+    setMany(keys, MutationStatus::WasClean);
+
+    ASSERT_EQ(0, mockEpheVB->getLL()->getHighestDedupedSeqno());
+
+    /* Set up a mock backfill by setting the range of the backfill */
+    mockEpheVB->registerFakeReadRange(1, numItems);
+
+    /* Update the items */
+    setMany(keys, MutationStatus::WasClean);
+
+    mockEpheVB->resetReadRange();
+
+    // TODO: expect 0 once the purger handles updating the HDDS
+    // Updating the HDDS at append time is only a temporary measure - this will
+    // be changed to having the tombstone purger update the HDDS shortly. Once
+    // that is done, we would expect HDDS == 0 at this point, and HDDS == 6 only
+    // after the purger runs.
+
+    EXPECT_EQ(6, mockEpheVB->getLL()->getHighestDedupedSeqno());
+
+    EXPECT_EQ(3, mockEpheVB->purgeTombstones(0));
+
+    EXPECT_EQ(6, mockEpheVB->getLL()->getHighestDedupedSeqno());
+}
\ No newline at end of file