MB-24244: Prevent duplicate items in an inMemoryBackfill 14/78014/9
authorJames Harrison <00jamesh@gmail.com>
Tue, 9 May 2017 11:07:40 +0000 (12:07 +0100)
committerDave Rigby <daver@couchbase.com>
Mon, 15 May 2017 17:10:59 +0000 (17:10 +0000)
Have rangeRead ignore stale items if their replacement also appears in
the requested range, for example

 A₁  B₂  C₃           Initial items
[A₁  B₂  C₃]          rangeRead 1-3
[A₁  B₂  C₃] B₄       Update B
 A₁  B₂  C₃  B₄       RR ends
[A₁  B₂  C₃  B₄]      new RR 1-4, B₂ should be ignored because B₄ is
also in the snapshot
[A₁  B₂  C₃  B₄] B₅   Another update, but B₄ must still be sent as B₅ is
not in the range
 A₁  B₂  C₃  B₄  B₅   RR ends

To achieve this, when marking items as stale, we store in it a pointer
to its replacement. To avoid bloating the OSV, we reuse the UniquePtr
which points to the items successor when in the HashTable. Once the item
is stale, it will never be used for that purpose again, leaving it free
for reuse.

This is a basic solution. It would be better to not have to take the
writeLock, but improvements of that form will be done in a separate
patch.

Change-Id: Id743da606e009d17f5e5af6f9344223a95aa4a38
Reviewed-on: http://review.couchbase.org/78014
Reviewed-by: Dave Rigby <daver@couchbase.com>
Tested-by: Build Bot <build@couchbase.com>
src/ephemeral_tombstone_purger.cc
src/ephemeral_vb.cc
src/hash_table.cc
src/hash_table.h
src/linked_list.cc
src/linked_list.h
src/seqlist.h
src/stored-value.h
tests/module_tests/basic_ll_test.cc
tests/module_tests/ephemeral_vb_test.cc
tests/module_tests/vbucket_test.cc

index 034353e..431d39f 100644 (file)
@@ -49,7 +49,8 @@ void EphemeralVBucket::HTTombstonePurger::visit(
     {
         std::lock_guard<std::mutex> listWriteLg(
                 vbucket.seqList->getListWriteLock());
-        vbucket.seqList->markItemStale(listWriteLg, std::move(ownedSV));
+        // Mark the item stale, with no replacement item
+        vbucket.seqList->markItemStale(listWriteLg, std::move(ownedSV), nullptr);
     }
     ++numPurgedItems;
 }
index a69ff4d..a1b8fc3 100644 (file)
@@ -359,13 +359,12 @@ EphemeralVBucket::updateStoredValue(const HashTable::HashBucketLock& hbl,
 
         if (res == SequenceList::UpdateStatus::Append) {
             /* Mark the un-updated storedValue as stale. This must be done after
-             the new storedvalue for the item is visible for range read in the
-             list. This is because we do not want the seqlist to delete the
-             stale
-             item before its latest copy is added to the list.
-             (item becomes visible for range read only after updating the list
-             with the seqno of the item) */
-            seqList->markItemStale(listWriteLg, std::move(ownedSv));
+               the new storedvalue for the item is visible for range read in the
+               list. This is because we do not want the seqlist to delete the
+               stale item before its latest copy is added to the list.
+               (item becomes visible for range read only after updating the list
+                with the seqno of the item) */
+            seqList->markItemStale(listWriteLg, std::move(ownedSv), newSv);
         }
     }
 
@@ -491,13 +490,12 @@ std::tuple<StoredValue*, VBNotifyCtx> EphemeralVBucket::softDeleteStoredValue(
 
         if (res == SequenceList::UpdateStatus::Append) {
             /* Mark the un-updated storedValue as stale. This must be done after
-             the new storedvalue for the item is visible for range read in the
-             list. This is because we do not want the seqlist to delete the
-             stale
-             item before its latest copy is added to the list.
-             (item becomes visible for range read only after updating the list
-             with the seqno of the item) */
-            seqList->markItemStale(listWriteLg, std::move(ownedSv));
+               the new storedvalue for the item is visible for range read in the
+               list. This is because we do not want the seqlist to delete the
+               stale item before its latest copy is added to the list.
+               (item becomes visible for range read only after updating the list
+               with the seqno of the item) */
+            seqList->markItemStale(listWriteLg, std::move(ownedSv), newSv);
         }
     }
 
index d02d8f0..f9f27ed 100644 (file)
@@ -101,7 +101,7 @@ void HashTable::clear_UNLOCKED(bool deactivate) {
             auto v = std::move(values[i]);
             clearedMemSize += v->size();
             clearedValSize += v->valuelen();
-            values[i] = std::move(v->next);
+            values[i] = std::move(v->getNext());
         }
     }
 
@@ -201,11 +201,11 @@ void HashTable::resize(size_t newSize) {
         while (values[i]) {
             // unlink the front element from the hash chain at values[i].
             auto v = std::move(values[i]);
-            values[i] = std::move(v->next);
+            values[i] = std::move(v->getNext());
 
             // And re-link it into the correct place in newValues.
             int newBucket = getBucketForHash(v->getKey().hash());
-            v->next = std::move(newValues[newBucket]);
+            v->setNext(std::move(newValues[newBucket]));
             newValues[newBucket] = std::move(v);
         }
     }
@@ -400,7 +400,7 @@ StoredValue* HashTable::unlocked_find(const DocKey& key,
                                       int bucket_num,
                                       WantsDeleted wantsDeleted,
                                       TrackReference trackReference) {
-    for (StoredValue* v = values[bucket_num].get(); v; v = v->next.get()) {
+    for (StoredValue* v = values[bucket_num].get(); v; v = v->getNext().get()) {
         if (v->hasKey(key)) {
             if (trackReference == TrackReference::Yes && !v->isDeleted()) {
                 v->referenced();
@@ -497,7 +497,7 @@ void HashTable::visit(HashTableVisitor &visitor) {
                 }
             }
             while (v) {
-                StoredValue* tmp = v->next.get();
+                StoredValue* tmp = v->getNext().get();
                 visitor.visit(lh, v);
                 v = tmp;
             }
@@ -535,7 +535,7 @@ void HashTable::visitDepth(HashTableDepthVisitor &visitor) {
             while (p) {
                 depth++;
                 mem += p->size();
-                p = p->next.get();
+                p = p->getNext().get();
             }
             visitor.visit(i, depth, mem);
             ++visited;
@@ -591,7 +591,7 @@ HashTable::pauseResumeVisit(PauseResumeHashTableVisitor& visitor,
 
             StoredValue* v = values[hash_bucket].get();
             while (!paused && v) {
-                StoredValue* tmp = v->next.get();
+                StoredValue* tmp = v->getNext().get();
                 paused = !visitor.visit(*v);
                 v = tmp;
             }
@@ -668,7 +668,7 @@ bool HashTable::unlocked_ejectItem(StoredValue*& vptr,
 
 std::unique_ptr<Item> HashTable::getRandomKeyFromSlot(int slot) {
     auto lh = getLockedBucket(slot);
-    for (StoredValue* v = values[slot].get(); v; v = v->next.get()) {
+    for (StoredValue* v = values[slot].get(); v; v = v->getNext().get()) {
         if (!v->isTempItem() && !v->isDeleted() && v->isResident()) {
             return v->toItem(false, 0);
         }
@@ -734,7 +734,7 @@ std::ostream& operator<<(std::ostream& os, const HashTable& ht) {
     for (const auto& chain : ht.values) {
         if (chain) {
             for (StoredValue* sv = chain.get(); sv != nullptr;
-                 sv = sv->next.get()) {
+                 sv = sv->getNext().get()) {
                 os << "    " << *sv << std::endl;
             }
         }
index ce17847..f9eea91 100644 (file)
@@ -641,17 +641,17 @@ private:
         if (p(chain.get())) {
             // Head element:
             auto removed = std::move(chain);
-            chain = std::move(removed->next);
+            chain = std::move(removed->getNext());
             return removed;
         }
 
         // Not head element, start searching.
-        for (StoredValue::UniquePtr* curr = &chain; curr->get()->next;
-             curr = &curr->get()->next) {
-            if (p(curr->get()->next.get())) {
+        for (StoredValue::UniquePtr* curr = &chain; curr->get()->getNext();
+             curr = &curr->get()->getNext()) {
+            if (p(curr->get()->getNext().get())) {
                 // next element matches predicate - splice it out of the list.
-                auto removed = std::move(curr->get()->next);
-                curr->get()->next = std::move(removed->next);
+                auto removed = std::move(curr->get()->getNext());
+                curr->get()->setNext(std::move(removed->getNext()));
                 return removed;
             }
         }
index f021915..56bfb7b 100644 (file)
@@ -140,6 +140,20 @@ BasicLinkedList::rangeRead(seqno_t start, seqno_t end) {
             continue;
         }
 
+        /* Check if this OSV has been made stale and has been superseded by a
+         * newer version. If it has, and the replacement is /also/ in the range
+         * we are reading, we should skip this item to avoid duplicates */
+        StoredValue* replacement;
+        {
+            std::lock_guard<std::mutex> writeGuard(getListWriteLock());
+            replacement = osv.getReplacementIfStale(writeGuard);
+        }
+
+        if (replacement &&
+            replacement->toOrderedStoredValue()->getBySeqno() <= end) {
+            continue;
+        }
+
         try {
             items.push_back(UniqueItemPtr(osv.toItem(false, vbid)));
         } catch (const std::bad_alloc&) {
@@ -177,7 +191,8 @@ void BasicLinkedList::updateHighestDedupedSeqno(
 }
 
 void BasicLinkedList::markItemStale(std::lock_guard<std::mutex>& listWriteLg,
-                                    StoredValue::UniquePtr ownedSv) {
+                                    StoredValue::UniquePtr ownedSv,
+                                    StoredValue* newSv) {
     /* Release the StoredValue as BasicLinkedList does not want it to be of
        owned type */
     StoredValue* v = ownedSv.release();
@@ -188,7 +203,7 @@ void BasicLinkedList::markItemStale(std::lock_guard<std::mutex>& listWriteLg,
     st.currentSize.fetch_add(v->metaDataSize());
 
     ++numStaleItems;
-    v->toOrderedStoredValue()->markStale(listWriteLg);
+    v->toOrderedStoredValue()->markStale(listWriteLg, newSv);
 }
 
 size_t BasicLinkedList::purgeTombstones() {
index 8662a6c..c68973c 100644 (file)
@@ -170,7 +170,8 @@ public:
                                    const OrderedStoredValue& v) override;
 
     void markItemStale(std::lock_guard<std::mutex>& listWriteLg,
-                       StoredValue::UniquePtr ownedSv) override;
+                       StoredValue::UniquePtr ownedSv,
+                       StoredValue* newSv) override;
 
     size_t purgeTombstones() override;
 
index bf5cc1e..2efa347 100644 (file)
@@ -263,7 +263,9 @@ public:
             const OrderedStoredValue& v) = 0;
 
     /**
-     * Mark an OrderedStoredValue stale and assumes its ownership.
+     * Mark an OrderedStoredValue stale and assumes its ownership. Stores ptr to
+     * the newer version in the OSV, or nullptr if there is no newer version
+     * (i.e., expired Tombstone)
      * Note: It is upto the sequential data structure implementation how it
      *       wants to own the OrderedStoredValue (as owned type vs non-owned
      *       type)
@@ -271,9 +273,11 @@ public:
      * @param listWriteLg Write lock of the sequenceList from getListWriteLock()
      * @param ownedSv StoredValue whose ownership is passed to the sequential
      *                data structure.
+     * @param replacement StoredValue which supersedes ownedSv, or nullptr.
      */
     virtual void markItemStale(std::lock_guard<std::mutex>& listWriteLg,
-                               StoredValue::UniquePtr ownedSv) = 0;
+                               StoredValue::UniquePtr ownedSv,
+                               StoredValue* replacement) = 0;
 
     /**
      * Remove from sequence list and delete all OSVs which are purgable.
index 86ee89c..5921345 100644 (file)
@@ -497,6 +497,24 @@ public:
      */
     std::unique_ptr<Item> toItemWithNoValue(uint16_t vbucket) const;
 
+    void setNext(UniquePtr&& nextSv) {
+        if (stale) {
+            throw std::logic_error(
+                    "StoredValue::setNext: StoredValue is stale,"
+                    "cannot set chain next value");
+        }
+        chain_next_or_replacement = std::move(nextSv);
+    }
+
+    UniquePtr& getNext() {
+        if (stale) {
+            throw std::logic_error(
+                    "StoredValue::getNext: StoredValue is stale,"
+                    "cannot get chain next value");
+        }
+        return chain_next_or_replacement;
+    }
+
     /**
      * Set the memory threshold on the current bucket quota for accepting a new mutation
      */
@@ -590,7 +608,7 @@ protected:
                 HashTable& ht,
                 bool isOrdered)
         : value(itm.getValue()),
-          next(std::move(n)),
+          chain_next_or_replacement(std::move(n)),
           cas(itm.getCas()),
           revSeqno(itm.getRevSeqno()),
           bySeqno(itm.getBySeqno()),
@@ -646,7 +664,7 @@ protected:
                 EPStats& stats,
                 HashTable& ht)
         : value(other.value),
-          next(std::move(n)),
+          chain_next_or_replacement(std::move(n)),
           cas(other.cas),
           revSeqno(other.revSeqno),
           bySeqno(other.bySeqno),
@@ -696,9 +714,16 @@ protected:
     friend std::ostream& operator<<(std::ostream& os, const HashTable& ht);
 
     value_t            value;          // 8 bytes
-    // Used to implement HashTable chaining (for elements hashing to the same
+
+    // Serves two purposes -
+    // 1. Used to implement HashTable chaining (for elements hashing to the same
     // bucket).
-    UniquePtr next; // 8 bytes
+    // 2. Once the stored value has been marked stale, this is used to point at
+    // the replacement stored value. In this case, *we do not have ownership*,
+    // so we release the ptr in the destructor. The replacement is needed to
+    // determine if it would also appear in a given rangeRead - we should return
+    // only the newer version if so.
+    UniquePtr chain_next_or_replacement; // 8 bytes
     uint64_t           cas;            //!< CAS identifier.
     uint64_t           revSeqno;       //!< Revision id sequence number
     int64_t            bySeqno;        //!< By sequence id number
@@ -749,6 +774,15 @@ public:
     // Guarded by the SequenceList's writeLock.
     boost::intrusive::list_member_hook<> seqno_hook;
 
+    ~OrderedStoredValue() {
+        if (stale) {
+            // This points to the replacement OSV which we do not actually own.
+            // We are reusing a unique_ptr so we explicitly release it in this
+            // case. We /do/ own the chain_next if we are not stale.
+            chain_next_or_replacement.release();
+        }
+    }
+
     /**
      * True if a newer version of the same key exists in the HashTable.
      * Note: Only true for OrderedStoredValues which are no longer in the
@@ -764,10 +798,26 @@ public:
      * Marks that newer instance of this item is added in the HashTable
      * @param writeLock The SeqList writeLock which guards the stale param.
      */
-    void markStale(std::lock_guard<std::mutex>& writeGuard) {
+    void markStale(std::lock_guard<std::mutex>& writeGuard,
+                   StoredValue* newSv) {
+        // next is a UniquePtr which is up to this point was used for chaining
+        // in the HashTable. Now this item is stale, we are reusing this to
+        // point to the updated version of this StoredValue. _BUT_ we do not
+        // own the new SV. At destruction, we must release this ptr if
+        // we are stale.
+        chain_next_or_replacement.reset(newSv);
         stale = true;
     }
 
+    StoredValue* getReplacementIfStale(
+            std::lock_guard<std::mutex>& writeGuard) const {
+        if (!stale) {
+            return nullptr;
+        }
+
+        return chain_next_or_replacement.get();
+    }
+
     /**
      * Check if the contents of the StoredValue is same as that of the other
      * one. Does not consider the intrusive hash bucket link.
index 75ee162..6f2eef8 100644 (file)
@@ -141,7 +141,6 @@ protected:
         StoredDocKey sKey = makeStoredDocKey(key);
         auto hbl = ht.getLockedBucket(sKey);
         auto ownedSv = ht.unlocked_release(hbl, osv->getKey());
-        basicLL->markItemStale(listWriteLg, std::move(ownedSv));
 
         /* Add a new storedvalue for the append */
         Item itm(sKey,
@@ -153,7 +152,8 @@ protected:
                  /*ext_len*/ 0,
                  /*theCas*/ 0,
                  /*bySeqno*/ highSeqno + 1);
-        auto newSv = ht.unlocked_addNewStoredValue(hbl, itm);
+        auto* newSv = ht.unlocked_addNewStoredValue(hbl, itm);
+        basicLL->markItemStale(listWriteLg, std::move(ownedSv), newSv);
 
         basicLL->appendToList(
                 lg, listWriteLg, *(newSv->toOrderedStoredValue()));
@@ -413,10 +413,18 @@ TEST_F(BasicLinkedListTest, MarkStale) {
     size_t svSize = ownedSv->size();
     size_t svMetaDataSize = ownedSv->metaDataSize();
 
+    // obtain a replacement SV
+    addNewItemsToList(numItems + 1, keyPrefix, 1);
+    OrderedStoredValue* replacement =
+            ht.find(makeStoredDocKey(keyPrefix + std::to_string(numItems + 1)),
+                    TrackReference::No,
+                    WantsDeleted::Yes)
+                    ->toOrderedStoredValue();
+
     /* Mark the item stale */
     {
         std::lock_guard<std::mutex> writeGuard(basicLL->getListWriteLock());
-        basicLL->markItemStale(writeGuard, std::move(ownedSv));
+        basicLL->markItemStale(writeGuard, std::move(ownedSv), replacement);
     }
 
     /* Check if the StoredValue is marked stale */
@@ -428,8 +436,8 @@ TEST_F(BasicLinkedListTest, MarkStale) {
     /* Check if the stale count incremented to 1 */
     EXPECT_EQ(1, basicLL->getNumStaleItems());
 
-    /* Check if the total item count in the linked list is 1 */
-    EXPECT_EQ(1, basicLL->getNumItems());
+    /* Check if the total item count in the linked list is 2 */
+    EXPECT_EQ(2, basicLL->getNumItems());
 
     /* Check memory usage of the list as it owns the stale item */
     EXPECT_EQ(svSize, basicLL->getStaleValueBytes());
index da60b55..30cc445 100644 (file)
@@ -510,23 +510,114 @@ TEST_F(EphemeralVBucketTest, AppendUpdatesHighestDedupedSeqno) {
 
     ASSERT_EQ(0, mockEpheVB->getLL()->getHighestDedupedSeqno());
 
-    /* Set up a mock backfill by setting the range of the backfill */
-    mockEpheVB->registerFakeReadRange(1, numItems);
+    {
+        auto itr = mockEpheVB->getLL()->makeRangeIterator();
 
-    /* Update the items */
+        /* Update the items */
+        setMany(keys, MutationStatus::WasClean);
+    }
+
+    ASSERT_EQ(6, mockEpheVB->getLL()->getHighestDedupedSeqno());
+}
+
+TEST_F(EphemeralVBucketTest, SnapshotHasNoDuplicates) {
+    /* Add 2 items and then update all of them */
+    const int numItems = 2;
+
+    auto keys = generateKeys(numItems);
     setMany(keys, MutationStatus::WasClean);
 
-    mockEpheVB->resetReadRange();
+    {
+        auto itr = mockEpheVB->getLL()->makeRangeIterator();
+
+        /* Update the items  */
+        setMany(keys, MutationStatus::WasClean);
+    }
+
+    // backfill to infinity would include both the stale and updated versions,
+    // ensure we receive the right number of items
+    auto res = mockEpheVB->inMemoryBackfill(
+            1, std::numeric_limits<seqno_t>::max());
+    EXPECT_EQ(ENGINE_SUCCESS, std::get<0>(res));
+    EXPECT_EQ(numItems, std::get<1>(res).size());
+    EXPECT_EQ(numItems * 2, std::get<2>(res));
+}
 
-    // TODO: expect 0 once the purger handles updating the HDDS
-    // Updating the HDDS at append time is only a temporary measure - this will
-    // be changed to having the tombstone purger update the HDDS shortly. Once
-    // that is done, we would expect HDDS == 0 at this point, and HDDS == 6 only
-    // after the purger runs.
+TEST_F(EphemeralVBucketTest, SnapshotIncludesNonDuplicateStaleItems) {
+    /* Add 2 items and then update all of them */
+    const int numItems = 2;
 
-    EXPECT_EQ(6, mockEpheVB->getLL()->getHighestDedupedSeqno());
+    auto keys = generateKeys(numItems);
+    setMany(keys, MutationStatus::WasClean);
 
-    EXPECT_EQ(3, mockEpheVB->purgeTombstones(0));
+    {
+        auto itr = mockEpheVB->getLL()->makeRangeIterator();
 
-    EXPECT_EQ(6, mockEpheVB->getLL()->getHighestDedupedSeqno());
+        /* Update the items  */
+        setMany(keys, MutationStatus::WasClean);
+    }
+
+    // backfill to numItems would /not/ include both the stale and updated
+    // versions, ensure we receive only one copy
+    auto res = mockEpheVB->inMemoryBackfill(1, numItems);
+    EXPECT_EQ(ENGINE_SUCCESS, std::get<0>(res));
+    EXPECT_EQ(numItems, std::get<1>(res).size());
+    EXPECT_EQ(numItems * 2, std::get<2>(res));
+}
+
+TEST_F(EphemeralVBucketTest, SnapshotHasNoDuplicatesWithInterveningItems) {
+    // Add 3 items, begin a rangeRead, add 1 more item, then update the first 2
+    const int numItems = 2;
+
+    auto keysToUpdate = generateKeys(numItems);
+    auto firstFillerKey = makeStoredDocKey(std::to_string(numItems + 1));
+    auto secondFillerKey = makeStoredDocKey(std::to_string(numItems + 2));
+
+    setMany(keysToUpdate, MutationStatus::WasClean);
+    EXPECT_EQ(MutationStatus::WasClean, setOne(firstFillerKey));
+
+    {
+        auto itr = mockEpheVB->getLL()->makeRangeIterator();
+
+        EXPECT_EQ(MutationStatus::WasClean, setOne(secondFillerKey));
+
+        /* Update the items  */
+        setMany(keysToUpdate, MutationStatus::WasClean);
+    }
+
+    // backfill to infinity would include both the stale and updated versions,
+    // ensure we receive the right number of items
+    auto res = mockEpheVB->inMemoryBackfill(
+            1, std::numeric_limits<seqno_t>::max());
+    EXPECT_EQ(ENGINE_SUCCESS, std::get<0>(res));
+    EXPECT_EQ(numItems * 2, std::get<1>(res).size()); // how many items returned
+    EXPECT_EQ(numItems * 3, std::get<2>(res)); // extended end of readRange
+}
+
+TEST_F(EphemeralVBucketTest, SnapshotHasNoDuplicatesWithMultipleStale) {
+    /* repeatedly update two items, ensure the backfill ignores all stale
+     * versions */
+    const int numItems = 2;
+    const int updateIterations = 10;
+
+    auto keys = generateKeys(numItems);
+
+    setMany(keys, MutationStatus::WasClean);
+    for (int i = 0; i < updateIterations; ++i) {
+        /* Set up a mock backfill, cover all items */
+        {
+            auto itr = mockEpheVB->getLL()->makeRangeIterator();
+            /* Update the items  */
+            setMany(keys, MutationStatus::WasClean);
+        }
+    }
+
+    // backfill to infinity would include both the stale and updated versions,
+    // ensure we receive the right number of items
+    auto res = mockEpheVB->inMemoryBackfill(
+            1, std::numeric_limits<seqno_t>::max());
+    EXPECT_EQ(ENGINE_SUCCESS, std::get<0>(res));
+    EXPECT_EQ(numItems, std::get<1>(res).size()); // how many items returned
+    EXPECT_EQ(numItems * (updateIterations + 1),
+              std::get<2>(res)); // extended end of readRange
 }
index 704c5b4..ea52bef 100644 (file)
@@ -51,7 +51,7 @@ void VBucketTest::TearDown() {
 std::vector<StoredDocKey> VBucketTest::generateKeys(int num, int start) {
     std::vector<StoredDocKey> rv;
 
-    for (int i = start; i < num; i++) {
+    for (int i = start; i < num + start; i++) {
         rv.push_back(makeStoredDocKey(std::to_string(i)));
     }