MB-23795: Ephemeral Tombstone purging (VBucket-level) 08/76408/25
authorDave Rigby <daver@couchbase.com>
Thu, 6 Apr 2017 16:14:42 +0000 (17:14 +0100)
committerDave Rigby <daver@couchbase.com>
Thu, 13 Apr 2017 16:27:50 +0000 (16:27 +0000)
Add Ephemeral Tombstone purging for Ephemeral VBuckets. This patch
adds two purge operations - one for the HashTable, and one for the
sequenceList:

a) The HashTable visitor (EphemeralVBucket::HTTombstonePurger) visits
   all items in the HashTable, and identifies any deletes which are
   old enough to be purged (age > ephemeral_metadata_purge_age). Such
   items are marked as stale, and transferred from the HashTable to
   the SequenceList.

b) The SequenceList purger (BasicLinkedList::purgeTombstones()) then
   iterates over the sequencelist, and hard-deletes any items marked
   stale - both Alive and Deleted items. It is at this stage that the
   OSVs are actually deleted.

   Note the SequenceList purging is somewhat delicate, to ensure
   correctness while not blocking front-end writes. See the inline
   comments in BasicLinkedList::purgeTombstones() for further details.

Note this isn't yet driven by any tasks - only by the unit
tests. Subsequent patches will connect into tasks.

Change-Id: I937ed23317826c84cbdd0bb0b3749a99ff446497
Reviewed-on: http://review.couchbase.org/76408
Reviewed-by: Jim Walker <jim@couchbase.com>
Tested-by: Build Bot <build@couchbase.com>
13 files changed:
CMakeLists.txt
src/ephemeral_tombstone_purger.cc [new file with mode: 0644]
src/ephemeral_tombstone_purger.h [new file with mode: 0644]
src/ephemeral_vb.cc
src/ephemeral_vb.h
src/linked_list.cc
src/linked_list.h
src/seqlist.h
tests/mock/mock_basic_ll.h
tests/mock/mock_ephemeral_vb.h
tests/module_tests/ephemeral_vb_test.cc
tests/module_tests/vbucket_test.cc
tests/module_tests/vbucket_test.h

index c9243a3..19f6b3a 100644 (file)
@@ -178,6 +178,7 @@ ADD_LIBRARY(ep_objs OBJECT
             src/ep_time.cc
             src/ep_types.cc
             src/ephemeral_bucket.cc
+            src/ephemeral_tombstone_purger.cc
             src/ephemeral_vb.cc
             src/ephemeral_vb_count_visitor.cc
             src/executorpool.cc
diff --git a/src/ephemeral_tombstone_purger.cc b/src/ephemeral_tombstone_purger.cc
new file mode 100644 (file)
index 0000000..7d9b75b
--- /dev/null
@@ -0,0 +1,51 @@
+/* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
+/*
+ *     Copyright 2017 Couchbase, Inc
+ *
+ *   Licensed under the Apache License, Version 2.0 (the "License");
+ *   you may not use this file except in compliance with the License.
+ *   You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *   Unless required by applicable law or agreed to in writing, software
+ *   distributed under the License is distributed on an "AS IS" BASIS,
+ *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *   See the License for the specific language governing permissions and
+ *   limitations under the License.
+ */
+
+#include "ephemeral_tombstone_purger.h"
+
+#include "atomic.h"
+#include "ep_engine.h"
+#include "ephemeral_vb.h"
+#include "seqlist.h"
+
+EphemeralVBucket::HTTombstonePurger::HTTombstonePurger(
+        EphemeralVBucket& vbucket, rel_time_t purgeAge)
+    : vbucket(vbucket),
+      now(ep_current_time()),
+      purgeAge(purgeAge),
+      numPurgedItems(0) {
+}
+
+void EphemeralVBucket::HTTombstonePurger::visit(
+        const HashTable::HashBucketLock& hbl, StoredValue* v) {
+    auto* osv = v->toOrderedStoredValue();
+
+    if (!osv->isDeleted()) {
+        return;
+    }
+
+    // Skip if deleted item is too young.
+    if (now - osv->getDeletedTime() < purgeAge) {
+        return;
+    }
+
+    // This item should be purged. Remove from the HashTable and move over to
+    // being owned by the sequence list.
+    auto ownedSV = vbucket.ht.unlocked_release(hbl, v->getKey());
+    vbucket.seqList->markItemStale(std::move(ownedSV));
+    ++numPurgedItems;
+}
diff --git a/src/ephemeral_tombstone_purger.h b/src/ephemeral_tombstone_purger.h
new file mode 100644 (file)
index 0000000..cee0aff
--- /dev/null
@@ -0,0 +1,60 @@
+/* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
+/*
+ *     Copyright 2017 Couchbase, Inc
+ *
+ *   Licensed under the Apache License, Version 2.0 (the "License");
+ *   you may not use this file except in compliance with the License.
+ *   You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *   Unless required by applicable law or agreed to in writing, software
+ *   distributed under the License is distributed on an "AS IS" BASIS,
+ *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *   See the License for the specific language governing permissions and
+ *   limitations under the License.
+ */
+
+#pragma once
+
+#include "config.h"
+
+#include "ephemeral_vb.h"
+#include "kv_bucket_iface.h"
+
+/**
+ * HashTable Tombstone Purger visitor
+ *
+ * Visitor which is responsible for removing deleted items from the HashTable
+ * which are past their permitted lifetime.
+ *
+ * Ownership of such items is transferred to the SequenceList as 'stale' items;
+ * cleanup of the SequenceList is handled seperately (see
+ * SequenceList::purgeTombstones).
+*/
+class EphemeralVBucket::HTTombstonePurger : public HashTableVisitor {
+public:
+    HTTombstonePurger(EphemeralVBucket& vbucket, rel_time_t purgeAge);
+
+    void visit(const HashTable::HashBucketLock& lh, StoredValue* v) override;
+
+    /// Return the number of items purged from the HashTable.
+    size_t getNumPurged() const {
+        return numPurgedItems;
+    }
+
+protected:
+    /// VBucket being visited.
+    EphemeralVBucket& vbucket;
+
+    /// Time point the purge is running at. Set to ep_current_time in object
+    /// creation.
+    const rel_time_t now;
+
+    /// Items older than this age are purged. "Age" is defined as:
+    ///    now - delete_time.
+    const rel_time_t purgeAge;
+
+    /// Count of how many items have been purged.
+    size_t numPurgedItems;
+};
index a170221..e2f570f 100644 (file)
@@ -16,7 +16,9 @@
  */
 
 #include "ephemeral_vb.h"
+
 #include "dcp/backfill_memory.h"
+#include "ephemeral_tombstone_purger.h"
 #include "failover-table.h"
 #include "linked_list.h"
 #include "stored_value_factories.h"
@@ -54,8 +56,7 @@ EphemeralVBucket::EphemeralVBucket(id_type i,
               purgeSeqno,
               maxCas,
               collectionsManifest),
-      seqList(std::make_unique<BasicLinkedList>(i, st)),
-      autoDeleteCount(0) {
+      seqList(std::make_unique<BasicLinkedList>(i, st)) {
 }
 
 size_t EphemeralVBucket::getNumItems() const {
@@ -267,6 +268,24 @@ void EphemeralVBucket::queueBackfillItem(
     stats.memOverhead->fetch_add(sizeof(queued_item));
 }
 
+size_t EphemeralVBucket::purgeTombstones(rel_time_t purgeAge) {
+    // First mark all deleted items in the HashTable which can be purged as
+    // Stale - this removes them from the HashTable, transferring ownership to
+    // SequenceList.
+    HTTombstonePurger purger(*this, purgeAge);
+    ht.visit(purger);
+
+    // Secondly iterate over the sequence list and delete any stale items
+    auto seqListPurged = seqList->purgeTombstones();
+
+    // Update stats and return.
+    htDeletedPurgeCount += purger.getNumPurged();
+    seqListPurgeCount += seqListPurged;
+    setPurgeSeqno(seqList->getHighestPurgedDeletedSeqno());
+
+    return seqListPurged;
+}
+
 std::tuple<StoredValue*, MutationStatus, VBNotifyCtx>
 EphemeralVBucket::updateStoredValue(const HashTable::HashBucketLock& hbl,
                                     StoredValue& v,
index ca9801b..b0c65d3 100644 (file)
@@ -26,6 +26,7 @@ class SequenceList;
 class EphemeralVBucket : public VBucket {
 public:
     class CountVisitor;
+    class HTTombstonePurger;
 
     EphemeralVBucket(id_type i,
                      vbucket_state_t newState,
@@ -131,6 +132,13 @@ public:
     void queueBackfillItem(queued_item& qi,
                            const GenerateBySeqno generateBySeqno) override;
 
+    /** Purge the Tombstones in this VBucket which are older than the specified
+     *  duration.
+     * @param purgeAge Items older than this should be purged.
+     * @return Number of items purged.
+     */
+    size_t purgeTombstones(rel_time_t purgeAge);
+
 protected:
     /* Data structure for in-memory sequential storage */
     std::unique_ptr<SequenceList> seqList;
@@ -194,4 +202,15 @@ private:
      * Count of how many items have been deleted via the 'auto_delete' policy
      */
     EPStats::Counter autoDeleteCount;
+
+    /**
+     * Count of how many deleted items have been purged from the HashTable
+     * (marked as stale and transferred from HT to sequence list).
+     */
+    EPStats::Counter htDeletedPurgeCount;
+
+    /** Count of how many items have been purged from the sequence list
+     *  (removed from seqList and deleted).
+     */
+    EPStats::Counter seqListPurgeCount;
 };
index 50d5ed8..da0acfe 100644 (file)
@@ -25,6 +25,7 @@ BasicLinkedList::BasicLinkedList(uint16_t vbucketId, EPStats& st)
       staleMetaDataSize(0),
       highSeqno(0),
       highestDedupedSeqno(0),
+      highestPurgedDeletedSeqno(0),
       numStaleItems(0),
       numDeletedItems(0),
       vbid(vbucketId),
@@ -190,8 +191,108 @@ void BasicLinkedList::markItemStale(StoredValue::UniquePtr ownedSv) {
     }
 }
 
+size_t BasicLinkedList::purgeTombstones() {
+    // Purge items marked as stale from the seqList.
+    //
+    // Strategy - we try to ensure that this function does not block
+    // frontend-writes (adding new OrderedStoredValues (OSVs) to the seqList).
+    // To achieve this (safely),
+    // we (try to) acquire the rangeReadLock and setup a 'read' range for the
+    // whole of the seqList. This prevents any other readers from iterating
+    // the list (and accessing stale items) while we purge on it; but permits
+    // front-end operations to continue as they:
+    //   a) Only read/modify non-stale items (we only change stale items) and
+    //   b) Do not change the list membership of anything within the read-range.
+    // However, we do need to be careful about what members of OSVs we access
+    // here - the only OSVs we can safely access are ones marked stale as they
+    // are no longer in the HashTable (and hence subject to HashTable locks).
+    // To check if an item is stale we need to acquire the writeLock
+    // (OSV::stale is guarded by it) for each list item. While this isn't
+    // ideal (that's the same lock needed by front-end operations), we can
+    // release the lock between each element so front-end operations can
+    // have the opportunity to acquire it.
+    //
+    // Attempt to acquire the readRangeLock, to block anyone else concurrently
+    // reading from the list while we remove elements from it.
+    std::unique_lock<std::mutex> rrGuard(rangeReadLock, std::try_to_lock);
+    if (!rrGuard) {
+        // If we cannot acquire the lock then another thread is
+        // running a range read. Given these are typically long-running,
+        // return without blocking.
+        return 0;
+    }
+
+    // Determine the start and end iterators.
+    OrderedLL::iterator startIt;
+    OrderedLL::iterator endIt;
+    {
+        std::lock_guard<std::mutex> writeGuard(writeLock);
+        if (seqList.empty()) {
+            // Nothing in sequence list - nothing to purge.
+            return 0;
+        }
+        // Determine the {start} and {end} iterator (inclusive). Note
+        // that (at most) one item is added to the seqList before its
+        // sequence number is set (as the seqno comes from Ckpt
+        // manager); if that is the case (such an item is guaranteed
+        // to not be stale), then move end to the previous item
+        // (i.e. we don't consider this "in-flight" item), as long as
+        // there is at least two elements.
+        startIt = seqList.begin();
+        endIt = std::prev(seqList.end());
+        // Need rangeLock for highSeqno & readRange
+        std::lock_guard<SpinLock> rangeGuard(rangeLock);
+        if ((startIt != endIt) && (!endIt->isStale(writeGuard))) {
+            endIt = std::prev(endIt);
+        }
+        readRange = SeqRange(startIt->getBySeqno(), endIt->getBySeqno());
+    }
+
+    // Iterate across all but the last item in the seqList, looking
+    // for stale items.
+    // Note the for() loop terminates one element before endIt - we
+    // actually want an inclusive iteration but as we are comparing
+    // essentially random addresses (and we don't want to 'touch' the
+    // element after endIt), we loop to one before endIt, then handle
+    // endIt explicilty at the end.
+    // Note(2): Iterator is manually incremented outside the for() loop as it
+    // is invalidated when we erase items.
+    size_t purgedCount = 0;
+    bool stale;
+    for (auto it = startIt; it != endIt;) {
+        {
+            std::lock_guard<std::mutex> writeGuard(writeLock);
+            stale = it->isStale(writeGuard);
+        }
+        // Only stale items are purged.
+        if (!stale) {
+            ++it;
+            continue;
+        }
+
+        // Checks pass, remove from list and delete.
+        it = purgeListElem(it);
+        ++purgedCount;
+    }
+    // Handle the last element.
+    {
+        std::lock_guard<std::mutex> writeGuard(writeLock);
+        stale = endIt->isStale(writeGuard);
+    }
+    if (stale) {
+        purgeListElem(endIt);
+        ++purgedCount;
+    }
+
+    // Complete; reset the readRange.
+    {
+        std::lock_guard<SpinLock> lh(rangeLock);
+        readRange.reset();
+    }
+    return purgedCount;
+}
+
 uint64_t BasicLinkedList::getNumStaleItems() const {
-    std::lock_guard<std::mutex> lckGd(writeLock);
     return numStaleItems;
 }
 
@@ -223,6 +324,10 @@ uint64_t BasicLinkedList::getHighestDedupedSeqno() const {
     return highestDedupedSeqno;
 }
 
+seqno_t BasicLinkedList::getHighestPurgedDeletedSeqno() const {
+    return highestPurgedDeletedSeqno;
+}
+
 uint64_t BasicLinkedList::getRangeReadBegin() const {
     std::lock_guard<SpinLock> lh(rangeLock);
     return readRange.getBegin();
@@ -238,13 +343,39 @@ void BasicLinkedList::dump() const {
 }
 
 std::ostream& operator <<(std::ostream& os, const BasicLinkedList& ll) {
-    os << "BasicLinkedList[" << &ll << "] with numItems:"
-       << ll.getNumItems() << " deletedItems:" << ll.getNumDeletedItems()
-       << " staleItems:" << ll.getNumStaleItems() << " elements:["
-       << std::endl;
+    os << "BasicLinkedList[" << &ll << "] with numItems:" << ll.seqList.size()
+       << " deletedItems:" << ll.numDeletedItems
+       << " staleItems:" << ll.getNumStaleItems()
+       << " highPurgeSeqno:" << ll.getHighestPurgedDeletedSeqno()
+       << " elements:[" << std::endl;
     for (const auto& val : ll.seqList) {
         os << "    " << val << std::endl;
     }
     os << "]" << std::endl;
     return os;
 }
+
+OrderedLL::iterator BasicLinkedList::purgeListElem(OrderedLL::iterator it) {
+    StoredValue::UniquePtr purged(&*it);
+    {
+        std::lock_guard<std::mutex> lckGd(writeLock);
+        it = seqList.erase(it);
+    }
+
+    /* Update the stats tracking the memory owned by the list */
+    staleSize.fetch_sub(purged->size());
+    staleMetaDataSize.fetch_sub(purged->metaDataSize());
+    st.currentSize.fetch_sub(purged->metaDataSize());
+
+    // Similary for the item counts:
+    --numStaleItems;
+    if (purged->isDeleted()) {
+        --numDeletedItems;
+    }
+
+    if (purged->isDeleted()) {
+        highestPurgedDeletedSeqno = std::max(seqno_t(highestPurgedDeletedSeqno),
+                                             purged->getBySeqno());
+    }
+    return it;
+}
index d86667a..6ecf898 100644 (file)
@@ -28,8 +28,9 @@
 #include "seqlist.h"
 #include "stored-value.h"
 
-#include <relaxed_atomic.h>
 #include <boost/intrusive/list.hpp>
+#include <platform/non_negative_counter.h>
+#include <relaxed_atomic.h>
 
 /* This option will configure "list" to use the member hook */
 using MemberHookOption =
@@ -149,6 +150,8 @@ public:
 
     void markItemStale(StoredValue::UniquePtr ownedSv) override;
 
+    size_t purgeTombstones() override;
+
     uint64_t getNumStaleItems() const override;
 
     size_t getStaleValueBytes() const override;
@@ -163,6 +166,8 @@ public:
 
     uint64_t getHighestDedupedSeqno() const override;
 
+    seqno_t getHighestPurgedDeletedSeqno() const override;
+
     uint64_t getRangeReadBegin() const override;
 
     uint64_t getRangeReadEnd() const override;
@@ -174,7 +179,7 @@ protected:
     OrderedLL seqList;
 
     /**
-     * Lock that serializes writes (append, update, cleanStaleItems) on
+     * Lock that serializes writes (append, update, purgeTombstones) on
      * 'seqList'
      */
     mutable std::mutex writeLock;
@@ -193,6 +198,21 @@ protected:
      */
     mutable SpinLock rangeLock;
 
+    /**
+     * Lock that serializes range reads on the 'seqList' - i.e. serializes
+     * the addition / removal of range reads from the set in-flight.
+     * We need to serialize range reads because, range reads set a list level
+     * range in which items are read. If we have multiple range reads then we
+     * must handle the races in the updation of the range to have most inclusive
+     * range.
+     * For now we use this lock to allow only one range read at a time.
+     *
+     * It is also additionally used in purgeTombstones() to prevent the
+     * creation of any new rangeReads while purge is in-progress - see
+     * detailed comments there.
+     */
+    std::mutex rangeReadLock;
+
     /* Overall memory consumed by (stale) OrderedStoredValues owned by the
        list */
     Couchbase::RelaxedAtomic<size_t> staleSize;
@@ -202,15 +222,7 @@ protected:
     Couchbase::RelaxedAtomic<size_t> staleMetaDataSize;
 
 private:
-    /**
-     * Lock that serializes range reads on the 'seqList'.
-     * We need to serialize range reads because, range reads set a list level
-     * range in which items are read. If we have multiple range reads then we
-     * must handle the races in the updation of the range to have most inclusive
-     * range.
-     * For now we use this lock to allow only one range read at a time.
-     */
-    std::mutex rangeReadLock;
+    OrderedLL::iterator purgeListElem(OrderedLL::iterator it);
 
     /**
      * We need to keep track of the highest seqno separately because there is a
@@ -233,11 +245,19 @@ private:
     Monotonic<seqno_t> highestDedupedSeqno;
 
     /**
+     * The sequence number of the highest purged element.
+     *
+     * This should be non-decrementing, apart from a rollback where it will be
+     * reset.
+     */
+    Monotonic<seqno_t> highestPurgedDeletedSeqno;
+
+    /**
      * Indicates the number of elements in the list that are stale (old,
      * duplicate values). Stale items are owned by the list and hence must
      * periodically clean them up.
      */
-    uint64_t numStaleItems;
+    cb::NonNegativeCounter<uint64_t> numStaleItems;
 
     /**
      * Indicates the number of logically deleted items in the list.
@@ -245,7 +265,7 @@ private:
      * replication, we need to keep deleted items for while and periodically
      * purge them
      */
-    uint64_t numDeletedItems;
+    cb::NonNegativeCounter<uint64_t> numDeletedItems;
 
     /* Used only to log debug messages */
     const uint16_t vbid;
index c2de697..10174a9 100644 (file)
@@ -139,11 +139,21 @@ public:
      *       type)
      *
      * @param ownedSv StoredValue whose ownership is passed to the sequential
-     *                data strucuture.
+     *                data structure.
      */
     virtual void markItemStale(StoredValue::UniquePtr ownedSv) = 0;
 
     /**
+     * Remove from sequence list and delete all OSVs which are purgable.
+     * OSVs which can be purged are items which are outside the ReadRange and
+     * are Stale.
+     *
+     * @return The number of items purged from the sequence list (and hence
+     *         deleted).
+     */
+    virtual size_t purgeTombstones() = 0;
+
+    /**
      * Returns the number of stale items in the list.
      *
      * @return count of stale items
@@ -185,6 +195,11 @@ public:
     virtual uint64_t getHighestDedupedSeqno() const = 0;
 
     /**
+     * Returns the highest purged Deleted sequence number in the list.
+     */
+    virtual seqno_t getHighestPurgedDeletedSeqno() const = 0;
+
+    /**
      * Returns the current range read begin sequence number.
      */
     virtual uint64_t getRangeReadBegin() const = 0;
index ef21855..cd9f691 100644 (file)
@@ -32,6 +32,10 @@ public:
     MockBasicLinkedList(EPStats& st) : BasicLinkedList(0, st) {
     }
 
+    OrderedLL& getSeqList() {
+        return seqList;
+    }
+
     std::vector<seqno_t> getAllSeqnoForVerification() const {
         std::vector<seqno_t> allSeqnos;
         std::lock_guard<std::mutex> lckGd(writeLock);
@@ -42,6 +46,11 @@ public:
         return allSeqnos;
     }
 
+    /// Expose the rangeReadLock for testing.
+    std::mutex& getRangeReadLock() {
+        return rangeReadLock;
+    }
+
     /// Expose the writeLock for testins.
     std::mutex& getWriteLock() {
         return writeLock;
@@ -52,4 +61,9 @@ public:
         std::lock_guard<SpinLock> lh(rangeLock);
         readRange = SeqRange(start, end);
     }
+
+    void resetReadRange() {
+        std::lock_guard<SpinLock> lh(rangeLock);
+        readRange.reset();
+    }
 };
index a2e36cd..eab4511 100644 (file)
@@ -73,6 +73,10 @@ public:
         return mockLL->getNumItems();
     }
 
+    MockBasicLinkedList* getLL() {
+        return mockLL;
+    }
+
 private:
     /* non owning ptr to the linkedlist in the ephemeral vbucket obj */
     MockBasicLinkedList* mockLL;
index 042da58..62ee806 100644 (file)
 #include "../mock/mock_ephemeral_vb.h"
 #include "failover-table.h"
 #include "test_helpers.h"
+#include "thread_gate.h"
 #include "vbucket_test.h"
 
+#include <thread>
+
 class EphemeralVBucketTest : public VBucketTest {
 protected:
     void SetUp() {
@@ -189,3 +192,210 @@ TEST_F(EphemeralVBucketTest, SoftDeleteDuringBackfill) {
     EXPECT_EQ(numItems * 2 - /* since 2 items are deduped*/ 2,
               mockEpheVB->public_getNumListItems());
 }
+
+// EphemeralVB Tombstone Purging //////////////////////////////////////////////
+
+class EphTombstoneTest : public EphemeralVBucketTest {
+protected:
+    void SetUp() override {
+        EphemeralVBucketTest::SetUp();
+
+        // Store three items to work with.
+        keys = generateKeys(3);
+        setMany(keys, MutationStatus::WasClean);
+        ASSERT_EQ(3, vbucket->getNumItems());
+    }
+    std::vector<StoredDocKey> keys;
+};
+
+// Check an empty seqList is handled correctly.
+TEST_F(EphTombstoneTest, ZeroElementPurge) {
+    // Create a new empty VB (using parent class SetUp).
+    EphemeralVBucketTest::SetUp();
+    ASSERT_EQ(0, mockEpheVB->public_getNumListItems());
+
+    EXPECT_EQ(0, mockEpheVB->purgeTombstones(0));
+}
+
+// Check a seqList with one element is handled correctly.
+TEST_F(EphTombstoneTest, OneElementPurge) {
+    // Create a new empty VB (using parent class SetUp).
+    EphemeralVBucketTest::SetUp();
+    setOne(makeStoredDocKey("one"), MutationStatus::WasClean);
+    ASSERT_EQ(1, mockEpheVB->public_getNumListItems());
+
+    EXPECT_EQ(0, mockEpheVB->purgeTombstones(0));
+}
+
+// Check that nothing is purged if no items are stale.
+TEST_F(EphTombstoneTest, NoPurgeIfNoneStale) {
+    // Run purger - nothing should be removed.
+    EXPECT_EQ(0, mockEpheVB->purgeTombstones(0));
+    EXPECT_EQ(keys.size(), vbucket->getNumItems());
+}
+
+// Check that deletes are not purged if they are not old enough.
+TEST_F(EphTombstoneTest, NoPurgeIfNoneOldEnough) {
+    // Delete the first item "now"
+    softDeleteOne(keys.at(0), MutationStatus::WasDirty);
+    ASSERT_EQ(2, vbucket->getNumItems());
+    ASSERT_EQ(1, vbucket->getNumInMemoryDeletes());
+
+    // Advance time by 5 seconds and run the EphTombstonePurger specifying a
+    // purge_age of 10s - nothing
+    // should be purged.
+    TimeTraveller theTerminator(5);
+    EXPECT_EQ(0, mockEpheVB->purgeTombstones(10));
+
+    EXPECT_EQ(2, vbucket->getNumItems());
+    EXPECT_EQ(1, vbucket->getNumInMemoryDeletes());
+}
+
+// Check that items should be purged when they are old enough.
+TEST_F(EphTombstoneTest, OnePurgeIfDeletedItemOld) {
+    // Delete the first item "now"
+    softDeleteOne(keys.at(0), MutationStatus::WasDirty);
+    ASSERT_EQ(2, vbucket->getNumItems());
+    ASSERT_EQ(1, vbucket->getNumInMemoryDeletes());
+
+    // Delete the second item at time 30.
+    TimeTraveller looper(30);
+    softDeleteOne(keys.at(1), MutationStatus::WasDirty);
+    ASSERT_EQ(1, vbucket->getNumItems());
+    ASSERT_EQ(2, vbucket->getNumInMemoryDeletes());
+
+    // and the third at time 60.
+    TimeTraveller looper2(30);
+    softDeleteOne(keys.at(2), MutationStatus::WasDirty);
+    ASSERT_EQ(0, vbucket->getNumItems());
+    ASSERT_EQ(3, vbucket->getNumInMemoryDeletes());
+
+    // Run the EphTombstonePurger specifying a purge_age of 60s - only key0
+    // should be purged.
+    mockEpheVB->purgeTombstones(60);
+
+    EXPECT_EQ(0, vbucket->getNumItems());
+    EXPECT_EQ(2, vbucket->getNumInMemoryDeletes());
+    EXPECT_EQ(4, vbucket->getPurgeSeqno())
+            << "Should have purged up to 4th update (1st delete, after 3 sets)";
+    EXPECT_EQ(nullptr, findValue(keys.at(0)));
+    EXPECT_NE(nullptr, findValue(keys.at(1)));
+    EXPECT_NE(nullptr, findValue(keys.at(2)));
+}
+
+// Check that deleted items can be purged immediately.
+TEST_F(EphTombstoneTest, ImmediateDeletedPurge) {
+    // Advance to non-zero time.
+    TimeTraveller jamesCole(10);
+
+    // Delete the first item at 10s
+    softDeleteOne(keys.at(0), MutationStatus::WasDirty);
+    ASSERT_EQ(2, vbucket->getNumItems());
+    ASSERT_EQ(1, vbucket->getNumInMemoryDeletes());
+
+    // Run the EphTombstonePurger specifying a purge_age of 0s - key0 should
+    // be immediately purged.
+    mockEpheVB->purgeTombstones(0);
+    EXPECT_EQ(2, vbucket->getNumItems());
+    EXPECT_EQ(0, vbucket->getNumInMemoryDeletes());
+    EXPECT_EQ(4, vbucket->getPurgeSeqno())
+            << "Should have purged up to 4th update (1st delete, after 3 sets)";
+    EXPECT_EQ(nullptr, findValue(keys.at(0)));
+    EXPECT_NE(nullptr, findValue(keys.at(1)));
+    EXPECT_NE(nullptr, findValue(keys.at(2)));
+}
+
+// Check that alive, stale items have no constraint on age.
+TEST_F(EphTombstoneTest, ImmediatePurgeOfAliveStale) {
+    // Perform a mutation on the second element, with a (fake) Range Read in
+    // place; causing the initial OSV to be marked as stale and a new OSV to
+    // be added for that key.
+    auto& seqList = mockEpheVB->getLL()->getSeqList();
+    {
+        std::lock_guard<std::mutex> rrGuard(
+                mockEpheVB->getLL()->getRangeReadLock());
+        mockEpheVB->registerFakeReadRange(1, 2);
+        setOne(keys.at(1), MutationStatus::WasClean);
+
+        // Sanity check - our state is as expected:
+        ASSERT_EQ(3, vbucket->getNumItems());
+        ASSERT_EQ(4, seqList.size());
+        auto staleIt = std::next(seqList.begin());
+        auto newIt = seqList.rbegin();
+        ASSERT_EQ(staleIt->getKey(), newIt->getKey());
+        {
+            std::lock_guard<std::mutex> writeGuard(
+                    mockEpheVB->getLL()->getWriteLock());
+            ASSERT_TRUE(staleIt->isStale(writeGuard));
+            ASSERT_FALSE(newIt->isStale(writeGuard));
+        }
+
+        // Attempt a purge - should not remove anything as read range is in
+        // place.
+        EXPECT_EQ(0, mockEpheVB->purgeTombstones(0));
+        EXPECT_EQ(3, vbucket->getNumItems());
+        EXPECT_EQ(4, seqList.size());
+        EXPECT_EQ(0, vbucket->getPurgeSeqno());
+
+        // Clear the ReadRange (so we can actually purge items) and retry the
+        // purge which should now succeed.
+        mockEpheVB->getLL()->resetReadRange();
+    } // END rrGuard.
+
+    EXPECT_EQ(1, mockEpheVB->purgeTombstones(0));
+    EXPECT_EQ(3, vbucket->getNumItems());
+    EXPECT_EQ(3, seqList.size());
+}
+
+// Test that deleted items purged out of order are handled correctly (and
+// highestDeletedPurged is updated).
+TEST_F(EphTombstoneTest, PurgeOutOfOrder) {
+    // Delete the 3rd item.
+    softDeleteOne(keys.at(2), MutationStatus::WasDirty);
+
+    // Run the tombstone purger.
+    mockEpheVB->getLL()->resetReadRange();
+    ASSERT_EQ(1, mockEpheVB->purgeTombstones(0));
+    ASSERT_EQ(2, vbucket->getNumItems());
+    EXPECT_EQ(4, vbucket->getPurgeSeqno());
+
+    // Delete the 1st item
+    softDeleteOne(keys.at(0), MutationStatus::WasDirty);
+
+    // Run the tombstone purger. This should succeed, but with
+    // highestDeletedPurged unchanged.
+    ASSERT_EQ(1, mockEpheVB->purgeTombstones(0));
+    ASSERT_EQ(1, vbucket->getNumItems());
+    EXPECT_EQ(5, vbucket->getPurgeSeqno());
+}
+
+// Thread-safety test (intended to run via Valgrind / ASan / TSan) -
+// perform sets and deletes on 2 additional threads while the purger
+// runs constantly in the main thread.
+TEST_F(EphTombstoneTest, ConcurrentPurge) {
+    ThreadGate started(2);
+    std::atomic<size_t> completed(0);
+
+    auto writer = [this](
+            ThreadGate& started, std::atomic<size_t>& completed, size_t id) {
+        started.threadUp();
+        for (size_t ii = 0; ii < 5000; ++ii) {
+            auto key = makeStoredDocKey(std::to_string(id) + ":key_" +
+                                        std::to_string(ii));
+            Item item(key, /*flags*/ 0, /*expiry*/ 0, key.data(), key.size());
+            public_processSet(item, item.getCas());
+            softDeleteOne(key, MutationStatus::WasDirty);
+        }
+        ++completed;
+    };
+    std::thread fe1{writer, std::ref(started), std::ref(completed), 1};
+    std::thread fe2{writer, std::ref(started), std::ref(completed), 2};
+
+    size_t purged = 0;
+    do {
+        purged += mockEpheVB->purgeTombstones(0);
+    } while (completed != 2);
+
+    fe1.join();
+    fe2.join();
+}
index 80b3294..5068eee 100644 (file)
@@ -100,6 +100,10 @@ void VBucketTest::softDeleteMany(std::vector<StoredDocKey>& keys,
     }
 }
 
+StoredValue* VBucketTest::findValue(StoredDocKey& key) {
+    return vbucket->ht.find(key, TrackReference::Yes, WantsDeleted::Yes);
+}
+
 void VBucketTest::verifyValue(StoredDocKey& key,
                               const char* value,
                               TrackReference trackReference,
index 4d4f2b9..edb2de7 100644 (file)
@@ -63,6 +63,8 @@ protected:
 
     void softDeleteMany(std::vector<StoredDocKey>& keys, MutationStatus expect);
 
+    StoredValue* findValue(StoredDocKey& key);
+
     void verifyValue(StoredDocKey& key,
                      const char* value,
                      TrackReference trackReference,