MB-24151: Implement memory managed backfills using Range Iterators 55/78055/9
authorManu Dhundi <manu@couchbase.com>
Wed, 17 May 2017 23:21:28 +0000 (16:21 -0700)
committerDave Rigby <daver@couchbase.com>
Thu, 18 May 2017 15:48:44 +0000 (15:48 +0000)
This commit implements memory management in the backfills of
Ephemeral buckets. It borrows the idea from disk backfills where
the backfill is suspended upon high memory usage, that is items
are not put onto the readyQ of the stream.

At the core is a Backfill state machine which is driven by
the BackfillManagerTask. It keeps track of any suspended backill
and also resumes from the suspended point. It also creates a range
iterators on the in-memory sequence list and using this iterator
it reads (backfills) items one by one.

This commit also adds certain features to the sequence list iterators
to get snapshot numbers, estimate num items in backfill etc.

Change-Id: Id92b0693763e550f842fb7fb5911cfefd8935e79
Reviewed-on: http://review.couchbase.org/78055
Tested-by: Build Bot <build@couchbase.com>
Reviewed-by: Dave Rigby <daver@couchbase.com>
16 files changed:
configuration.json
src/dcp/backfill.h
src/dcp/backfill_disk.cc
src/dcp/backfill_disk.h
src/dcp/backfill_memory.cc
src/dcp/backfill_memory.h
src/ephemeral_vb.cc
src/ephemeral_vb.h
src/linked_list.cc
src/linked_list.h
src/seqlist.cc
src/seqlist.h
tests/ep_testsuite.cc
tests/ep_testsuite_dcp.cc
tests/mock/mock_stream.h
tests/module_tests/dcp_test.cc

index e3bc23b..f9e9f03 100644 (file)
             "dynamic": false,
             "type": "size_t"
         },
+        "dcp_ephemeral_backfill_type": {
+            "default": "buffered",
+            "descr": "Type of memory backfill done in Ephemeral buckets",
+            "type": "std::string",
+            "validator": {
+                "enum": [
+                         "none",
+                         "buffered"
+                         ]
+            }
+        },
         "dcp_flow_control_policy": {
             "default": "aggressive",
             "descr": "Flow control policy used on consumer side buffer",
index 16089ab..cdcb891 100644 (file)
@@ -55,7 +55,9 @@ public:
      *
      * @return vbid
      */
-    virtual uint16_t getVBucketId() = 0;
+    uint16_t getVBucketId() const {
+        return stream->getVBucket();
+    }
 
     /**
      * Indicates if the DCP stream associated with the backfill is dead
index 31b2b28..bd5210b 100644 (file)
@@ -144,10 +144,6 @@ backfill_status_t DCPBackfillDisk::run() {
                            std::to_string(state));
 }
 
-uint16_t DCPBackfillDisk::getVBucketId() {
-    return stream->getVBucket();
-}
-
 void DCPBackfillDisk::cancel() {
     LockHolder lh(lock);
     if (state != backfill_state_done) {
index 8d97007..9291790 100644 (file)
@@ -72,8 +72,6 @@ public:
 
     backfill_status_t run() override;
 
-    uint16_t getVBucketId() override;
-
     bool isStreamDead() override {
         return !stream->isActive();
     }
index 63491ab..4e5773c 100644 (file)
@@ -102,6 +102,214 @@ backfill_status_t DCPBackfillMemory::run() {
     return backfill_finished;
 }
 
-uint16_t DCPBackfillMemory::getVBucketId() {
-    return stream->getVBucket();
+DCPBackfillMemoryBuffered::DCPBackfillMemoryBuffered(EphemeralVBucketPtr evb,
+                                                     const active_stream_t& s,
+                                                     uint64_t startSeqno,
+                                                     uint64_t endSeqno)
+    : DCPBackfill(s, startSeqno, endSeqno),
+      weakVb(evb),
+      state(BackfillState::Init),
+      rangeItr(nullptr) {
+}
+
+backfill_status_t DCPBackfillMemoryBuffered::run() {
+    auto evb = weakVb.lock();
+    if (!evb) {
+        /* We don't have to close the stream here. Task doing vbucket state
+         change should handle stream closure */
+        LOG(EXTENSION_LOG_WARNING,
+            "DCPBackfillMemoryBuffered::run(): "
+            "(vb:%d) running backfill ended prematurely as weakVb can't be "
+            "locked; start seqno:%" PRIi64 ", end seqno:%" PRIi64,
+            getVBucketId(),
+            startSeqno,
+            endSeqno);
+        cancel();
+        return backfill_finished;
+    }
+
+    switch (state) {
+    case BackfillState::Init:
+        return create(*evb);
+    case BackfillState::Scanning:
+        return scan();
+    case BackfillState::Done:
+        return backfill_finished;
+    }
+
+    throw std::logic_error("DCPBackfillDisk::run: Invalid backfill state " +
+                           backfillStateToString(state));
+}
+
+void DCPBackfillMemoryBuffered::cancel() {
+    if (state != BackfillState::Done) {
+        complete(true);
+    }
+}
+
+backfill_status_t DCPBackfillMemoryBuffered::create(EphemeralVBucket& evb) {
+    /* Create range read cursor */
+    try {
+        rangeItr = evb.makeRangeIterator();
+    } catch (const std::bad_alloc&) {
+        stream->getLogger().log(
+                EXTENSION_LOG_WARNING,
+                "Alloc error when trying to create a range iterator"
+                "on the sequence list for (vb %" PRIu16 ")",
+                getVBucketId());
+        /* Try backfilling again later; here we snooze because system has
+           hit ENOMEM */
+        return backfill_snooze;
+    }
+
+    /* Advance the cursor till start, mark snapshot and update backfill
+       remaining count */
+    while (rangeItr.curr() != rangeItr.end()) {
+        if (static_cast<uint64_t>((*rangeItr).getBySeqno()) >= startSeqno) {
+            /* Incr backfill remaining
+               [EPHE TODO]: This will be inaccurate if do not backfill till end
+                            of the iterator
+             */
+            stream->incrBackfillRemaining(rangeItr.count());
+
+            /* Determine the endSeqno of the current snapshot.
+               We want to send till requested endSeqno, but if that cannot
+               constitute a snapshot then we need to send till the point
+               which can be called as snapshot end */
+            endSeqno = std::max(
+                    endSeqno,
+                    static_cast<uint64_t>(rangeItr.getEarlySnapShotEnd()));
+
+            /* We want to send items only till the point it is necessary to do
+               so */
+            endSeqno =
+                    std::min(endSeqno, static_cast<uint64_t>(rangeItr.back()));
+
+            /* Mark disk snapshot */
+            stream->markDiskSnapshot(startSeqno, endSeqno);
+
+            /* Change the backfill state */
+            transitionState(BackfillState::Scanning);
+
+            /* Jump to scan here itself */
+            return scan();
+        }
+        ++rangeItr;
+    }
+
+    /* Backfill is not needed as startSeqno > rangeItr end seqno */
+    complete(false);
+    return backfill_success;
+}
+
+backfill_status_t DCPBackfillMemoryBuffered::scan() {
+    if (!(stream->isActive())) {
+        /* Stop prematurely if the stream state changes */
+        complete(true);
+        return backfill_success;
+    }
+
+    /* Read items */
+    UniqueItemPtr item;
+    while (static_cast<uint64_t>(rangeItr.curr()) <= endSeqno) {
+        try {
+            item = (*rangeItr).toItem(false, getVBucketId());
+        } catch (const std::bad_alloc&) {
+            stream->getLogger().log(
+                    EXTENSION_LOG_WARNING,
+                    "Alloc error when trying to create an "
+                    "item copy from hash table. Item seqno:%" PRIi64
+                    ", vb:%" PRIu16,
+                    (*rangeItr).getBySeqno(),
+                    getVBucketId());
+            /* Try backfilling again later; here we snooze because system has
+               hit ENOMEM */
+            return backfill_snooze;
+        }
+
+        if (!stream->backfillReceived(
+                    std::move(item), BACKFILL_FROM_MEMORY, /*force*/ false)) {
+            /* Try backfill again later; here we do not snooze because we
+               want to check if other backfills can be run by the
+               backfillMgr */
+            stream->getLogger().log(EXTENSION_LOG_WARNING,
+                                    "Deferring the backfill for (vb %d) as "
+                                    "scan buffer or backfill buffer is full",
+                                    getVBucketId());
+            return backfill_success;
+        }
+        ++rangeItr;
+    }
+
+    /* Backfill has ran to completion */
+    complete(false);
+
+    return backfill_success;
+}
+
+void DCPBackfillMemoryBuffered::complete(bool cancelled) {
+    uint16_t vbid = getVBucketId();
+
+    /* [EPHE TODO]: invalidate cursor sooner before it gets deleted */
+
+    stream->completeBackfill();
+
+    EXTENSION_LOG_LEVEL severity =
+            cancelled ? EXTENSION_LOG_NOTICE : EXTENSION_LOG_INFO;
+    stream->getLogger().log(severity,
+                            "(vb %d) Backfill task (%" PRIu64 " to %" PRIu64
+                            ") %s",
+                            vbid,
+                            startSeqno,
+                            endSeqno,
+                            cancelled ? "cancelled" : "finished");
+
+    transitionState(BackfillState::Done);
+}
+
+void DCPBackfillMemoryBuffered::transitionState(BackfillState newState) {
+    if (state == newState) {
+        return;
+    }
+
+    bool validTransition = false;
+    switch (newState) {
+    case BackfillState::Init:
+        /* Not valid to transition back to 'init' */
+        break;
+    case BackfillState::Scanning:
+        if (state == BackfillState::Init) {
+            validTransition = true;
+        }
+        break;
+    case BackfillState::Done:
+        if (state == BackfillState::Init || state == BackfillState::Scanning) {
+            validTransition = true;
+        }
+        break;
+    }
+
+    if (!validTransition) {
+        throw std::invalid_argument(
+                "DCPBackfillMemoryBuffered::transitionState:"
+                " newState (which is " +
+                backfillStateToString(newState) +
+                ") is not valid for current state (which is " +
+                backfillStateToString(state) + ")");
+    }
+
+    state = newState;
+}
+
+std::string DCPBackfillMemoryBuffered::backfillStateToString(
+        BackfillState state) {
+    switch (state) {
+    case BackfillState::Init:
+        return "initalizing";
+    case BackfillState::Scanning:
+        return "scanning";
+    case BackfillState::Done:
+        return "done";
+    }
+    return "Invalid state"; // dummy to avert certain compiler warnings
 }
index 6dcc768..8d20e50 100644 (file)
@@ -40,8 +40,6 @@ public:
 
     backfill_status_t run() override;
 
-    uint16_t getVBucketId() override;
-
     bool isStreamDead() override {
         return !stream->isActive();
     }
@@ -55,3 +53,74 @@ private:
      */
     std::weak_ptr<EphemeralVBucket> weakVb;
 };
+
+/**
+ * Concrete class that does backfill from in-memory ordered data strucuture and
+ * informs the DCP stream of the backfill progress.
+ *
+ * This class calls one synchronous vBucket API to read items in the sequential
+ * order from the in-memory ordered data structure and calls the DCP stream
+ * for disk snapshot, backfill items and backfill completion.
+ */
+class DCPBackfillMemoryBuffered : public DCPBackfill {
+public:
+    DCPBackfillMemoryBuffered(EphemeralVBucketPtr evb,
+                              const active_stream_t& s,
+                              uint64_t startSeqno,
+                              uint64_t endSeqno);
+
+    backfill_status_t run() override;
+
+    bool isStreamDead() override {
+        return !stream->isActive();
+    }
+
+    void cancel() override;
+
+private:
+    /* The possible states of the DCPBackfillMemoryBuffered */
+    enum class BackfillState : uint8_t { Init, Scanning, Done };
+
+    static std::string backfillStateToString(BackfillState state);
+
+    /**
+     * Creates a range iterator on Ephemeral VBucket to read items as a snapshot
+     * in sequential order. Backfill snapshot range is decided here.
+     *
+     * @param evb Ref to the ephemeral vbucket on which backfill is run
+     */
+    backfill_status_t create(EphemeralVBucket& evb);
+
+    /**
+     * Reads the items in the snapshot (iterator) one by one. In case of high
+     * memory usage postpones the reading of items, and reading can be resumed
+     * later on from that point.
+     */
+    backfill_status_t scan();
+
+    /**
+     * Indicates the completion to the stream.
+     *
+     * @param cancelled indicates if the backfill finished fully or was
+     *                  cancelled in between; for debug
+     */
+    void complete(bool cancelled);
+
+    /**
+     * Makes valid transitions on the backfill state machine
+     */
+    void transitionState(BackfillState newState);
+
+    /**
+     * Ensures there can be no cyclic dependency with VB pointers in the
+     * complex DCP slab of objects and tasks.
+     */
+    std::weak_ptr<EphemeralVBucket> weakVb;
+
+    BackfillState state;
+
+    /**
+     * Range iterator (on the vbucket) created for the backfill
+     */
+    SequenceList::RangeIterator rangeItr;
+};
index 5defa9d..f013fe9 100644 (file)
@@ -57,7 +57,13 @@ EphemeralVBucket::EphemeralVBucket(id_type i,
               purgeSeqno,
               maxCas,
               collectionsManifest),
-      seqList(std::make_unique<BasicLinkedList>(i, st)) {
+      seqList(std::make_unique<BasicLinkedList>(i, st)),
+      backfillType(BackfillType::None) {
+    /* Get the flow control policy */
+    std::string dcpBackfillType = config.getDcpEphemeralBackfillType();
+    if (!dcpBackfillType.compare("buffered")) {
+        backfillType = BackfillType::Buffered;
+    }
 }
 
 size_t EphemeralVBucket::getNumItems() const {
@@ -237,8 +243,13 @@ std::unique_ptr<DCPBackfill> EphemeralVBucket::createDCPBackfill(
     /* create a memory backfill object */
     EphemeralVBucketPtr evb =
             std::static_pointer_cast<EphemeralVBucket>(shared_from_this());
-    return std::make_unique<DCPBackfillMemory>(
-            evb, stream, startSeqno, endSeqno);
+    if (backfillType == BackfillType::Buffered) {
+        return std::make_unique<DCPBackfillMemoryBuffered>(
+                evb, stream, startSeqno, endSeqno);
+    } else {
+        return std::make_unique<DCPBackfillMemory>(
+                evb, stream, startSeqno, endSeqno);
+    }
 }
 
 std::tuple<ENGINE_ERROR_CODE, std::vector<UniqueItemPtr>, seqno_t>
@@ -246,6 +257,10 @@ EphemeralVBucket::inMemoryBackfill(uint64_t start, uint64_t end) {
     return seqList->rangeRead(start, end);
 }
 
+SequenceList::RangeIterator EphemeralVBucket::makeRangeIterator() {
+    return seqList->makeRangeIterator();
+}
+
 /* Vb level backfill queue is for items in a huge snapshot (disk backfill
    snapshots from DCP are typically huge) that could not be fit on a
    checkpoint. They update all stats, checkpoint seqno, but are not put
index 8fdb1f3..91059e6 100644 (file)
@@ -123,6 +123,11 @@ public:
     std::tuple<ENGINE_ERROR_CODE, std::vector<UniqueItemPtr>, seqno_t>
     inMemoryBackfill(uint64_t start, uint64_t end);
 
+    /**
+     * Returns a range iterator for the underlying SequenceList obj
+     */
+    SequenceList::RangeIterator makeRangeIterator();
+
     void dump() const override;
 
     uint64_t getPersistenceSeqno() const override {
@@ -230,6 +235,12 @@ private:
      *  (removed from seqList and deleted).
      */
     EPStats::Counter seqListPurgeCount;
+
+    /**
+     * Enum indicating if the backfill is memory managed or not
+     */
+    enum class BackfillType : uint8_t { None, Buffered };
+    BackfillType backfillType;
 };
 
 using EphemeralVBucketPtr = std::shared_ptr<EphemeralVBucket>;
index 56bfb7b..b7c8a9b 100644 (file)
@@ -414,7 +414,10 @@ OrderedLL::iterator BasicLinkedList::purgeListElem(OrderedLL::iterator it) {
 }
 
 BasicLinkedList::RangeIteratorLL::RangeIteratorLL(BasicLinkedList& ll)
-    : list(ll), readLockHolder(list.rangeReadLock), itrRange(0, 0) {
+    : list(ll),
+      readLockHolder(list.rangeReadLock),
+      itrRange(0, 0),
+      numRemaining(0) {
     std::lock_guard<std::mutex> listWriteLg(list.getListWriteLock());
     std::lock_guard<SpinLock> lh(list.rangeLock);
     if (list.highSeqno < 1) {
@@ -427,9 +430,17 @@ BasicLinkedList::RangeIteratorLL::RangeIteratorLL(BasicLinkedList& ll)
     /* Iterator to the beginning of linked list */
     currIt = list.seqList.begin();
 
+    /* Number of items that can be iterated over */
+    numRemaining = list.seqList.size();
+
+    /* The minimum seqno in the iterator that must be read to get a consistent
+       read snapshot */
+    earlySnapShotEndSeqno = list.highestDedupedSeqno;
+
     /* Mark the snapshot range on linked list. The range that can be read by the
        iterator is inclusive of the start and the end. */
-    list.readRange = SeqRange(currIt->getBySeqno(), list.highSeqno);
+    list.readRange =
+            SeqRange(currIt->getBySeqno(), list.seqList.back().getBySeqno());
 
     /* Keep the range in the iterator obj. We store the range end seqno as one
        higher than the end seqno that can be read by this iterator.
@@ -439,7 +450,8 @@ BasicLinkedList::RangeIteratorLL::RangeIteratorLL(BasicLinkedList& ll)
        Further, since use the class 'SeqRange' for 'itrRange' we cannot use
        curr() == end() + 1 to identify the end point because 'SeqRange' does
        not internally allow curr > end */
-    itrRange = SeqRange(currIt->getBySeqno(), list.highSeqno + 1);
+    itrRange = SeqRange(currIt->getBySeqno(),
+                        list.seqList.back().getBySeqno() + 1);
 }
 
 BasicLinkedList::RangeIteratorLL::~RangeIteratorLL() {
@@ -470,6 +482,8 @@ operator++() {
                 std::to_string(end()));
     }
 
+    --numRemaining;
+
     /* Check if the iterator is pointing to the last element. Increment beyond
        the last element indicates the end of the iteration */
     if (curr() == itrRange.getEnd() - 1) {
index c68973c..8c5133e 100644 (file)
@@ -324,6 +324,18 @@ private:
             return itrRange.getEnd();
         }
 
+        seqno_t back() const override {
+            return itrRange.getEnd() - 1;
+        }
+
+        uint64_t count() const override {
+            return numRemaining;
+        }
+
+        seqno_t getEarlySnapShotEnd() const override {
+            return earlySnapShotEndSeqno;
+        }
+
     private:
         /* Ref to BasicLinkedList object which is iterated by this iterator.
            By setting the member variables of the list obj appropriately we
@@ -338,6 +350,14 @@ private:
 
         /* Current range of the iterator */
         SeqRange itrRange;
+
+        /* Number of items that can be iterated over by this (forward only)
+           iterator at that instance */
+        uint64_t numRemaining;
+
+        /* Indicates the minimum seqno in the iterator that can give a
+           consistent read snapshot */
+        seqno_t earlySnapShotEndSeqno;
     };
 
     friend class RangeIteratorLL;
index 02d2705..b530274 100644 (file)
@@ -32,10 +32,22 @@ SequenceList::RangeIterator& SequenceList::RangeIterator::operator++() {
     return *this;
 }
 
-seqno_t SequenceList::RangeIterator::curr() {
+seqno_t SequenceList::RangeIterator::curr() const {
     return rangeIterImpl->curr();
 }
 
-seqno_t SequenceList::RangeIterator::end() {
+seqno_t SequenceList::RangeIterator::end() const {
     return rangeIterImpl->end();
 }
+
+seqno_t SequenceList::RangeIterator::back() const {
+    return rangeIterImpl->back();
+}
+
+uint64_t SequenceList::RangeIterator::count() const {
+    return rangeIterImpl->count();
+}
+
+seqno_t SequenceList::RangeIterator::getEarlySnapShotEnd() const {
+    return rangeIterImpl->getEarlySnapShotEnd();
+}
index 2efa347..ba8bd7c 100644 (file)
@@ -104,6 +104,23 @@ protected:
          * in the iterator range
          */
         virtual seqno_t end() const = 0;
+
+        /**
+         * Seqno of the last item in the iterator
+         */
+        virtual seqno_t back() const = 0;
+
+        /**
+         * Returns the number of items that can be iterated over by this
+         * (forward only) iterator at that instance
+         */
+        virtual uint64_t count() const = 0;
+
+        /**
+         * Indicates the minimum seqno in the iterator that must be read to
+         * get a consistent read snapshot
+         */
+        virtual seqno_t getEarlySnapShotEnd() const = 0;
     };
 
 public:
@@ -153,6 +170,11 @@ public:
             : rangeIterImpl(std::move(other.rangeIterImpl)) {
         }
 
+        RangeIterator& operator=(RangeIterator&& other) {
+            rangeIterImpl = std::move(other.rangeIterImpl);
+            return *this;
+        }
+
         RangeIterator(RangeIterator& other) = delete;
 
         /**
@@ -169,13 +191,30 @@ public:
          * Curr iterator position, indicated by the seqno of the item at that
          * position
          */
-        seqno_t curr();
+        seqno_t curr() const;
 
         /**
-         * Curr iterator position, indicated by the seqno of the item at that
-         * position
+         * End position of iterator, indicated by the seqno > highest_seqno
+         * in the iterator range
+         */
+        seqno_t end() const;
+
+        /**
+         * Seqno of the last item in the iterator
+         */
+        seqno_t back() const;
+
+        /**
+         * Returns the number of items that can be iterated over by this
+         * (forward only) iterator at that instance
+         */
+        uint64_t count() const;
+
+        /**
+         * Indicates the minimum seqno in the iterator that must be read to
+         * get a consistent read snapshot
          */
-        seqno_t end();
+        seqno_t getEarlySnapShotEnd() const;
 
     private:
         /* Pointer to the abstract class of range iterator implementation */
index 93df30a..8b58081 100644 (file)
@@ -6455,6 +6455,7 @@ static enum test_result test_mb19687_fixed(ENGINE_HANDLE* h,
                 "ep_dcp_conn_buffer_size_max",
                 "ep_dcp_conn_buffer_size_perc",
                 "ep_dcp_enable_noop",
+                "ep_dcp_ephemeral_backfill_type",
                 "ep_dcp_flow_control_policy",
                 "ep_dcp_max_unacked_bytes",
                 "ep_dcp_min_compression_ratio",
@@ -6629,6 +6630,7 @@ static enum test_result test_mb19687_fixed(ENGINE_HANDLE* h,
                 "ep_dcp_consumer_process_buffered_messages_batch_size",
                 "ep_dcp_consumer_process_buffered_messages_yield_limit",
                 "ep_dcp_enable_noop",
+                "ep_dcp_ephemeral_backfill_type",
                 "ep_dcp_flow_control_policy",
                 "ep_dcp_idle_timeout",
                 "ep_dcp_max_unacked_bytes",
index d840eb6..20b3293 100644 (file)
@@ -1968,11 +1968,21 @@ static enum test_result test_dcp_producer_disk_backfill_limits(ENGINE_HANDLE *h,
            backfill_state_scanning state. */
         exp_backfill_task_runs = 4 + num_items;
     } else {
-        /* In case of ephemeral bucket there are no backfill states and no
-           limitations on memory used by a single backfill. Hence we expect
-           one backfill run for reading all items + one post all backfilling
-           is complete */
-        exp_backfill_task_runs = 2;
+        if (get_str_stat(h, h1, "ep_dcp_ephemeral_backfill_type") ==
+            std::string("buffered")) {
+            /* Backfill task runs are expected as below:
+               once for backfill_state_init + once post all backfills are
+               finished. Here since we have dcp_scan_byte_limit = 100, we expect
+               the backfill task to run additional 'num_items' during
+               BackfillState::scanning state. */
+            exp_backfill_task_runs = 2 + num_items;
+        } else {
+            /* In case of ephemeral bucket there are no backfill states and no
+               limitations on memory used by a single backfill. Hence we expect
+               one backfill run for reading all items + one post all backfilling
+               is complete */
+            exp_backfill_task_runs = 2;
+        }
     }
     checkeq(exp_backfill_task_runs,
             get_histo_stat(h, h1, "BackfillManagerTask", "runtimes",
index 933b8d7..45d8519 100644 (file)
@@ -106,12 +106,6 @@ public:
         return makeResponseFromItem(item);
     }
 
-    void waitForStreamClose() {
-        while (getState() != StreamState::Dead) {
-            usleep(10);
-        }
-    }
-
     /**
      * Consumes numItems from the stream readyQ
      */
index a896948..3d9bfa9 100644 (file)
@@ -475,40 +475,6 @@ TEST_P(StreamTest, BackfillOnly) {
                items are read correctly */
 }
 
-/* Test a backfill fail scenario */
-TEST_P(StreamTest, BackfillFail) {
-    if (bucketType == "persistent") {
-        /* This test simulates a backfill failure for an ephemeral bucket
-           only. */
-        return;
-    }
-
-    /* Add 3 items */
-    int numItems = 3;
-    for (int i = 0; i < numItems; ++i) {
-        std::string key("key" + std::to_string(i));
-        store_item(vbid, key, "value");
-    }
-
-    /* Set up a DCP stream for the backfill */
-    setup_dcp_stream();
-    MockActiveStream* mock_stream =
-            dynamic_cast<MockActiveStream*>(stream.get());
-
-    /* We want the backfill task to run in a background thread */
-    ExecutorPool::get()->setNumAuxIO(1);
-
-    /* Schedule a backfill task with an incorrect param:
-       here backfill_start > vb_highSeqno */
-    producer->scheduleBackfillManager(*vb0,
-                                      mock_stream,
-                                      /*backfillStart*/numItems + 5,
-                                      /*backfillEnd*/numItems + 10);
-
-    /* Expect stream to be closed */
-    mock_stream->waitForStreamClose();
-}
-
 /* Stream items from a DCP backfill with very small backfill buffer.
    However small the backfill buffer is, backfill must not stop, it must
    proceed to completion eventually */