"dynamic": false,
"type": "size_t"
},
+ "dcp_ephemeral_backfill_type": {
+ "default": "buffered",
+ "descr": "Type of memory backfill done in Ephemeral buckets",
+ "type": "std::string",
+ "validator": {
+ "enum": [
+ "none",
+ "buffered"
+ ]
+ }
+ },
"dcp_flow_control_policy": {
"default": "aggressive",
"descr": "Flow control policy used on consumer side buffer",
*
* @return vbid
*/
- virtual uint16_t getVBucketId() = 0;
+ uint16_t getVBucketId() const {
+ // The id is always taken from the owning stream, so a single shared
+ // implementation replaces the former per-subclass overrides.
+ return stream->getVBucket();
+ }
/**
* Indicates if the DCP stream associated with the backfill is dead
std::to_string(state));
}
-uint16_t DCPBackfillDisk::getVBucketId() {
- return stream->getVBucket();
-}
-
void DCPBackfillDisk::cancel() {
LockHolder lh(lock);
if (state != backfill_state_done) {
backfill_status_t run() override;
- uint16_t getVBucketId() override;
-
bool isStreamDead() override {
return !stream->isActive();
}
return backfill_finished;
}
-uint16_t DCPBackfillMemory::getVBucketId() {
- return stream->getVBucket();
+/**
+ * Sets up a buffered in-memory backfill for the given stream and seqno range.
+ *
+ * The vbucket is retained only through a weak_ptr, the state machine starts
+ * in Init, and the range iterator is constructed from nullptr; create()
+ * assigns the real iterator later.
+ */
+DCPBackfillMemoryBuffered::DCPBackfillMemoryBuffered(EphemeralVBucketPtr evb,
+ const active_stream_t& s,
+ uint64_t startSeqno,
+ uint64_t endSeqno)
+ : DCPBackfill(s, startSeqno, endSeqno),
+ weakVb(evb),
+ state(BackfillState::Init),
+ rangeItr(nullptr) {
+}
+
+/**
+ * Runs one step of the backfill state machine.
+ *
+ * If the vbucket can no longer be locked from the weak pointer the backfill
+ * is cancelled and reported as finished. Otherwise the step matching the
+ * current state is executed: create() for Init, scan() for Scanning, and
+ * nothing for Done.
+ *
+ * @return status of the step that was run; backfill_finished when the
+ *         vbucket is gone or the backfill is already Done
+ * @throws std::logic_error if the state holds a value outside the enum
+ */
+backfill_status_t DCPBackfillMemoryBuffered::run() {
+    auto evb = weakVb.lock();
+    if (!evb) {
+        /* We don't have to close the stream here. Task doing vbucket state
+           change should handle stream closure */
+        LOG(EXTENSION_LOG_WARNING,
+            "DCPBackfillMemoryBuffered::run(): "
+            "(vb:%d) running backfill ended prematurely as weakVb can't be "
+            "locked; start seqno:%" PRIu64 ", end seqno:%" PRIu64,
+            getVBucketId(),
+            startSeqno,
+            endSeqno);
+        cancel();
+        return backfill_finished;
+    }
+
+    switch (state) {
+    case BackfillState::Init:
+        return create(*evb);
+    case BackfillState::Scanning:
+        return scan();
+    case BackfillState::Done:
+        return backfill_finished;
+    }
+
+    /* Only reachable if 'state' holds a value that is not a valid enumerator */
+    throw std::logic_error(
+            "DCPBackfillMemoryBuffered::run: Invalid backfill state " +
+            backfillStateToString(state));
+}
+
+/**
+ * Cancels the backfill. A backfill that has already reached the Done state
+ * is left untouched; any other state is completed as 'cancelled'.
+ */
+void DCPBackfillMemoryBuffered::cancel() {
+    if (state == BackfillState::Done) {
+        /* Nothing left to cancel */
+        return;
+    }
+    complete(/*cancelled*/ true);
+}
+
+/**
+ * Init-state step: creates the range iterator, advances it to the first item
+ * whose seqno is >= startSeqno, decides the snapshot range, marks the
+ * snapshot on the stream and continues straight into scan().
+ *
+ * @param evb Ref to the ephemeral vbucket on which backfill is run
+ * @return backfill_snooze if creating the iterator hit std::bad_alloc;
+ *         the result of scan() when a start item was found;
+ *         backfill_success (after completing the backfill) when the iterator
+ *         holds no item at or beyond startSeqno
+ */
+backfill_status_t DCPBackfillMemoryBuffered::create(EphemeralVBucket& evb) {
+    /* Create range read cursor */
+    try {
+        rangeItr = evb.makeRangeIterator();
+    } catch (const std::bad_alloc&) {
+        stream->getLogger().log(
+                EXTENSION_LOG_WARNING,
+                "Alloc error when trying to create a range iterator "
+                "on the sequence list for (vb %" PRIu16 ")",
+                getVBucketId());
+        /* Try backfilling again later; here we snooze because system has
+           hit ENOMEM */
+        return backfill_snooze;
+    }
+
+    /* Advance the cursor till start, mark snapshot and update backfill
+       remaining count */
+    while (rangeItr.curr() != rangeItr.end()) {
+        if (static_cast<uint64_t>((*rangeItr).getBySeqno()) >= startSeqno) {
+            /* Incr backfill remaining
+               [EPHE TODO]: This will be inaccurate if we do not backfill
+               till the end of the iterator
+             */
+            stream->incrBackfillRemaining(rangeItr.count());
+
+            /* Determine the endSeqno of the current snapshot.
+               We want to send till requested endSeqno, but if that cannot
+               constitute a snapshot then we need to send till the point
+               which can be called as snapshot end */
+            endSeqno = std::max(
+                    endSeqno,
+                    static_cast<uint64_t>(rangeItr.getEarlySnapShotEnd()));
+
+            /* We want to send items only till the point it is necessary to do
+               so, i.e. never beyond the last item held by the iterator */
+            endSeqno =
+                    std::min(endSeqno, static_cast<uint64_t>(rangeItr.back()));
+
+            /* Mark disk snapshot */
+            stream->markDiskSnapshot(startSeqno, endSeqno);
+
+            /* Change the backfill state */
+            transitionState(BackfillState::Scanning);
+
+            /* Jump to scan here itself */
+            return scan();
+        }
+        ++rangeItr;
+    }
+
+    /* Backfill is not needed as startSeqno > rangeItr end seqno */
+    complete(false);
+    return backfill_success;
+}
+
+/**
+ * Scanning-state step: hands items from the range iterator to the stream
+ * until the snapshot end seqno has been passed.
+ *
+ * The iterator is advanced only after the stream has accepted an item, so an
+ * early return leaves it on the item that still has to be sent.
+ *
+ * @return backfill_snooze when copying an item hit std::bad_alloc;
+ *         backfill_success in every other case (stream no longer active,
+ *         stream declined an item, or all items were handed over)
+ */
+backfill_status_t DCPBackfillMemoryBuffered::scan() {
+    if (!stream->isActive()) {
+        /* Stop prematurely if the stream state changes */
+        complete(true);
+        return backfill_success;
+    }
+
+    /* Read items */
+    for (; static_cast<uint64_t>(rangeItr.curr()) <= endSeqno; ++rangeItr) {
+        UniqueItemPtr itemCopy;
+        try {
+            itemCopy = (*rangeItr).toItem(false, getVBucketId());
+        } catch (const std::bad_alloc&) {
+            stream->getLogger().log(
+                    EXTENSION_LOG_WARNING,
+                    "Alloc error when trying to create an "
+                    "item copy from hash table. Item seqno:%" PRIi64
+                    ", vb:%" PRIu16,
+                    (*rangeItr).getBySeqno(),
+                    getVBucketId());
+            /* Try backfilling again later; here we snooze because system has
+               hit ENOMEM */
+            return backfill_snooze;
+        }
+
+        const bool accepted = stream->backfillReceived(
+                std::move(itemCopy), BACKFILL_FROM_MEMORY, /*force*/ false);
+        if (!accepted) {
+            /* Try backfill again later; here we do not snooze because we
+               want to check if other backfills can be run by the
+               backfillMgr */
+            stream->getLogger().log(EXTENSION_LOG_WARNING,
+                                    "Deferring the backfill for (vb %d) as "
+                                    "scan buffer or backfill buffer is full",
+                                    getVBucketId());
+            return backfill_success;
+        }
+    }
+
+    /* Backfill has run to completion */
+    complete(false);
+
+    return backfill_success;
+}
+
+/**
+ * Tells the stream that the backfill is over, logs the outcome and moves the
+ * state machine to Done.
+ *
+ * @param cancelled true if the backfill was stopped before running to the
+ *                  end; only affects the log severity and wording
+ */
+void DCPBackfillMemoryBuffered::complete(bool cancelled) {
+    /* Read the vbucket id before notifying the stream */
+    const uint16_t vbid = getVBucketId();
+
+    /* [EPHE TODO]: invalidate cursor sooner before it gets deleted */
+
+    stream->completeBackfill();
+
+    EXTENSION_LOG_LEVEL severity = EXTENSION_LOG_INFO;
+    const char* outcome = "finished";
+    if (cancelled) {
+        severity = EXTENSION_LOG_NOTICE;
+        outcome = "cancelled";
+    }
+
+    stream->getLogger().log(
+            severity,
+            "(vb %d) Backfill task (%" PRIu64 " to %" PRIu64 ") %s",
+            vbid,
+            startSeqno,
+            endSeqno,
+            outcome);
+
+    transitionState(BackfillState::Done);
+}
+
+/**
+ * Moves the state machine to newState, enforcing the allowed transitions:
+ *   Init -> Scanning, Init -> Done, Scanning -> Done.
+ * Requesting the state the backfill is already in is a no-op.
+ *
+ * @param newState state to move to
+ * @throws std::invalid_argument if the transition is not one of the above
+ */
+void DCPBackfillMemoryBuffered::transitionState(BackfillState newState) {
+    if (newState == state) {
+        return;
+    }
+
+    const bool fromInit = (state == BackfillState::Init);
+    const bool fromScanning = (state == BackfillState::Scanning);
+
+    /* It is never valid to transition back to 'Init' */
+    const bool allowed =
+            (newState == BackfillState::Scanning && fromInit) ||
+            (newState == BackfillState::Done && (fromInit || fromScanning));
+
+    if (!allowed) {
+        std::string msg(
+                "DCPBackfillMemoryBuffered::transitionState: newState (which "
+                "is ");
+        msg += backfillStateToString(newState);
+        msg += ") is not valid for current state (which is ";
+        msg += backfillStateToString(state);
+        msg += ")";
+        throw std::invalid_argument(msg);
+    }
+
+    state = newState;
+}
+
+/**
+ * Gives the human readable name of a backfill state, as used in log and
+ * exception messages.
+ *
+ * @param state state to describe
+ * @return "initializing", "scanning" or "done"; "Invalid state" for a value
+ *         that is not a valid enumerator
+ */
+std::string DCPBackfillMemoryBuffered::backfillStateToString(
+        BackfillState state) {
+    switch (state) {
+    case BackfillState::Init:
+        return "initializing";
+    case BackfillState::Scanning:
+        return "scanning";
+    case BackfillState::Done:
+        return "done";
+    }
+    return "Invalid state"; // dummy to avert certain compiler warnings
}
backfill_status_t run() override;
- uint16_t getVBucketId() override;
-
bool isStreamDead() override {
return !stream->isActive();
}
*/
std::weak_ptr<EphemeralVBucket> weakVb;
};
+
+/**
+ * Concrete class that does backfill from in-memory ordered data structure and
+ * informs the DCP stream of the backfill progress.
+ *
+ * This class creates a range iterator on the vBucket's in-memory ordered data
+ * structure and reads items through it in sequential order. Reading can be
+ * spread over several run() calls when the stream does not accept an item.
+ * It calls the DCP stream for disk snapshot, backfill items and backfill
+ * completion.
+ */
+class DCPBackfillMemoryBuffered : public DCPBackfill {
+public:
+ DCPBackfillMemoryBuffered(EphemeralVBucketPtr evb,
+ const active_stream_t& s,
+ uint64_t startSeqno,
+ uint64_t endSeqno);
+
+ /* Runs the step that matches the current backfill state */
+ backfill_status_t run() override;
+
+ /* The stream counts as dead as soon as it is no longer active */
+ bool isStreamDead() override {
+ return !stream->isActive();
+ }
+
+ /* Completes the backfill as cancelled unless it is already done */
+ void cancel() override;
+
+private:
+ /* The possible states of the DCPBackfillMemoryBuffered */
+ enum class BackfillState : uint8_t { Init, Scanning, Done };
+
+ /* Human readable name of a state, for log and exception messages */
+ static std::string backfillStateToString(BackfillState state);
+
+ /**
+ * Creates a range iterator on Ephemeral VBucket to read items as a snapshot
+ * in sequential order. Backfill snapshot range is decided here.
+ *
+ * @param evb Ref to the ephemeral vbucket on which backfill is run
+ */
+ backfill_status_t create(EphemeralVBucket& evb);
+
+ /**
+ * Reads the items in the snapshot (iterator) one by one. In case of high
+ * memory usage postpones the reading of items, and reading can be resumed
+ * later on from that point.
+ */
+ backfill_status_t scan();
+
+ /**
+ * Indicates the completion to the stream.
+ *
+ * @param cancelled indicates if the backfill finished fully or was
+ * cancelled in between; for debug
+ */
+ void complete(bool cancelled);
+
+ /**
+ * Makes valid transitions on the backfill state machine
+ */
+ void transitionState(BackfillState newState);
+
+ /**
+ * Ensures there can be no cyclic dependency with VB pointers in the
+ * complex DCP slab of objects and tasks.
+ */
+ std::weak_ptr<EphemeralVBucket> weakVb;
+
+ /* Current position in the backfill state machine */
+ BackfillState state;
+
+ /**
+ * Range iterator (on the vbucket) created for the backfill
+ */
+ SequenceList::RangeIterator rangeItr;
+};
purgeSeqno,
maxCas,
collectionsManifest),
- seqList(std::make_unique<BasicLinkedList>(i, st)) {
+ seqList(std::make_unique<BasicLinkedList>(i, st)),
+ backfillType(BackfillType::None) {
+ /* Get the configured DCP ephemeral backfill type */
+ std::string dcpBackfillType = config.getDcpEphemeralBackfillType();
+ if (!dcpBackfillType.compare("buffered")) {
+ backfillType = BackfillType::Buffered;
+ }
}
size_t EphemeralVBucket::getNumItems() const {
/* create a memory backfill object */
EphemeralVBucketPtr evb =
std::static_pointer_cast<EphemeralVBucket>(shared_from_this());
- return std::make_unique<DCPBackfillMemory>(
- evb, stream, startSeqno, endSeqno);
+ if (backfillType == BackfillType::Buffered) {
+ return std::make_unique<DCPBackfillMemoryBuffered>(
+ evb, stream, startSeqno, endSeqno);
+ } else {
+ return std::make_unique<DCPBackfillMemory>(
+ evb, stream, startSeqno, endSeqno);
+ }
}
std::tuple<ENGINE_ERROR_CODE, std::vector<UniqueItemPtr>, seqno_t>
return seqList->rangeRead(start, end);
}
+/* Returns a range iterator obtained from the underlying sequence list */
+SequenceList::RangeIterator EphemeralVBucket::makeRangeIterator() {
+ return seqList->makeRangeIterator();
+}
+
/* Vb level backfill queue is for items in a huge snapshot (disk backfill
snapshots from DCP are typically huge) that could not be fit on a
checkpoint. They update all stats, checkpoint seqno, but are not put
std::tuple<ENGINE_ERROR_CODE, std::vector<UniqueItemPtr>, seqno_t>
inMemoryBackfill(uint64_t start, uint64_t end);
+ /**
+ * Returns a range iterator for the underlying SequenceList obj
+ */
+ SequenceList::RangeIterator makeRangeIterator();
+
void dump() const override;
uint64_t getPersistenceSeqno() const override {
* (removed from seqList and deleted).
*/
EPStats::Counter seqListPurgeCount;
+
+ /**
+ * Enum indicating if the backfill is memory managed or not
+ */
+ enum class BackfillType : uint8_t { None, Buffered };
+ BackfillType backfillType;
};
using EphemeralVBucketPtr = std::shared_ptr<EphemeralVBucket>;
}
BasicLinkedList::RangeIteratorLL::RangeIteratorLL(BasicLinkedList& ll)
- : list(ll), readLockHolder(list.rangeReadLock), itrRange(0, 0) {
+ : list(ll),
+ readLockHolder(list.rangeReadLock),
+ itrRange(0, 0),
+ numRemaining(0) {
std::lock_guard<std::mutex> listWriteLg(list.getListWriteLock());
std::lock_guard<SpinLock> lh(list.rangeLock);
if (list.highSeqno < 1) {
/* Iterator to the beginning of linked list */
currIt = list.seqList.begin();
+ /* Number of items that can be iterated over */
+ numRemaining = list.seqList.size();
+
+ /* The minimum seqno in the iterator that must be read to get a consistent
+ read snapshot */
+ earlySnapShotEndSeqno = list.highestDedupedSeqno;
+
/* Mark the snapshot range on linked list. The range that can be read by the
iterator is inclusive of the start and the end. */
- list.readRange = SeqRange(currIt->getBySeqno(), list.highSeqno);
+ list.readRange =
+ SeqRange(currIt->getBySeqno(), list.seqList.back().getBySeqno());
/* Keep the range in the iterator obj. We store the range end seqno as one
higher than the end seqno that can be read by this iterator.
Further, since use the class 'SeqRange' for 'itrRange' we cannot use
curr() == end() + 1 to identify the end point because 'SeqRange' does
not internally allow curr > end */
- itrRange = SeqRange(currIt->getBySeqno(), list.highSeqno + 1);
+ itrRange = SeqRange(currIt->getBySeqno(),
+ list.seqList.back().getBySeqno() + 1);
}
BasicLinkedList::RangeIteratorLL::~RangeIteratorLL() {
std::to_string(end()));
}
+ --numRemaining;
+
/* Check if the iterator is pointing to the last element. Increment beyond
the last element indicates the end of the iteration */
if (curr() == itrRange.getEnd() - 1) {
return itrRange.getEnd();
}
+ /* Seqno of the last item in the iterator. 'itrRange' stores its end as
+ one higher than the last readable seqno, hence the - 1 */
+ seqno_t back() const override {
+ return itrRange.getEnd() - 1;
+ }
+
+ /* Current value of the 'numRemaining' item counter */
+ uint64_t count() const override {
+ return numRemaining;
+ }
+
+ /* Minimum seqno that must be read to get a consistent read snapshot,
+ as recorded in 'earlySnapShotEndSeqno' */
+ seqno_t getEarlySnapShotEnd() const override {
+ return earlySnapShotEndSeqno;
+ }
+
private:
/* Ref to BasicLinkedList object which is iterated by this iterator.
By setting the member variables of the list obj appropriately we
/* Current range of the iterator */
SeqRange itrRange;
+
+ /* Number of items that can be iterated over by this (forward only)
+ iterator at that instance */
+ uint64_t numRemaining;
+
+ /* Indicates the minimum seqno in the iterator that can give a
+ consistent read snapshot */
+ seqno_t earlySnapShotEndSeqno;
};
friend class RangeIteratorLL;
return *this;
}
-seqno_t SequenceList::RangeIterator::curr() {
+seqno_t SequenceList::RangeIterator::curr() const {
return rangeIterImpl->curr();
}
-seqno_t SequenceList::RangeIterator::end() {
+seqno_t SequenceList::RangeIterator::end() const {
return rangeIterImpl->end();
}
+
+/* Forwards to the range iterator implementation object */
+seqno_t SequenceList::RangeIterator::back() const {
+ return rangeIterImpl->back();
+}
+
+/* Forwards to the range iterator implementation object */
+uint64_t SequenceList::RangeIterator::count() const {
+ return rangeIterImpl->count();
+}
+
+/* Forwards to the range iterator implementation object */
+seqno_t SequenceList::RangeIterator::getEarlySnapShotEnd() const {
+ return rangeIterImpl->getEarlySnapShotEnd();
+}
* in the iterator range
*/
virtual seqno_t end() const = 0;
+
+ /**
+ * Seqno of the last item in the iterator
+ */
+ virtual seqno_t back() const = 0;
+
+ /**
+ * Returns the number of items that can be iterated over by this
+ * (forward only) iterator at that instance
+ */
+ virtual uint64_t count() const = 0;
+
+ /**
+ * Indicates the minimum seqno in the iterator that must be read to
+ * get a consistent read snapshot
+ */
+ virtual seqno_t getEarlySnapShotEnd() const = 0;
};
public:
: rangeIterImpl(std::move(other.rangeIterImpl)) {
}
+ /* Move assignment: takes over the implementation object held by 'other' */
+ RangeIterator& operator=(RangeIterator&& other) {
+ rangeIterImpl = std::move(other.rangeIterImpl);
+ return *this;
+ }
+
RangeIterator(RangeIterator& other) = delete;
/**
* Curr iterator position, indicated by the seqno of the item at that
* position
*/
- seqno_t curr();
+ seqno_t curr() const;
/**
- * Curr iterator position, indicated by the seqno of the item at that
- * position
+ * End position of iterator, indicated by the seqno > highest_seqno
+ * in the iterator range
+ */
+ seqno_t end() const;
+
+ /**
+ * Seqno of the last item in the iterator
+ */
+ seqno_t back() const;
+
+ /**
+ * Returns the number of items that can be iterated over by this
+ * (forward only) iterator at that instance
+ */
+ uint64_t count() const;
+
+ /**
+ * Indicates the minimum seqno in the iterator that must be read to
+ * get a consistent read snapshot
*/
- seqno_t end();
+ seqno_t getEarlySnapShotEnd() const;
private:
/* Pointer to the abstract class of range iterator implementation */
"ep_dcp_conn_buffer_size_max",
"ep_dcp_conn_buffer_size_perc",
"ep_dcp_enable_noop",
+ "ep_dcp_ephemeral_backfill_type",
"ep_dcp_flow_control_policy",
"ep_dcp_max_unacked_bytes",
"ep_dcp_min_compression_ratio",
"ep_dcp_consumer_process_buffered_messages_batch_size",
"ep_dcp_consumer_process_buffered_messages_yield_limit",
"ep_dcp_enable_noop",
+ "ep_dcp_ephemeral_backfill_type",
"ep_dcp_flow_control_policy",
"ep_dcp_idle_timeout",
"ep_dcp_max_unacked_bytes",
backfill_state_scanning state. */
exp_backfill_task_runs = 4 + num_items;
} else {
- /* In case of ephemeral bucket there are no backfill states and no
- limitations on memory used by a single backfill. Hence we expect
- one backfill run for reading all items + one post all backfilling
- is complete */
- exp_backfill_task_runs = 2;
+ if (get_str_stat(h, h1, "ep_dcp_ephemeral_backfill_type") ==
+ std::string("buffered")) {
+ /* Backfill task runs are expected as below:
+ once for backfill_state_init + once post all backfills are
+ finished. Here since we have dcp_scan_byte_limit = 100, we expect
+ the backfill task to run additional 'num_items' during
+ BackfillState::scanning state. */
+ exp_backfill_task_runs = 2 + num_items;
+ } else {
+ /* In case of ephemeral bucket there are no backfill states and no
+ limitations on memory used by a single backfill. Hence we expect
+ one backfill run for reading all items + one post all backfilling
+ is complete */
+ exp_backfill_task_runs = 2;
+ }
}
checkeq(exp_backfill_task_runs,
get_histo_stat(h, h1, "BackfillManagerTask", "runtimes",
return makeResponseFromItem(item);
}
- void waitForStreamClose() {
- while (getState() != StreamState::Dead) {
- usleep(10);
- }
- }
-
/**
* Consumes numItems from the stream readyQ
*/
items are read correctly */
}
-/* Test a backfill fail scenario */
-TEST_P(StreamTest, BackfillFail) {
- if (bucketType == "persistent") {
- /* This test simulates a backfill failure for an ephemeral bucket
- only. */
- return;
- }
-
- /* Add 3 items */
- int numItems = 3;
- for (int i = 0; i < numItems; ++i) {
- std::string key("key" + std::to_string(i));
- store_item(vbid, key, "value");
- }
-
- /* Set up a DCP stream for the backfill */
- setup_dcp_stream();
- MockActiveStream* mock_stream =
- dynamic_cast<MockActiveStream*>(stream.get());
-
- /* We want the backfill task to run in a background thread */
- ExecutorPool::get()->setNumAuxIO(1);
-
- /* Schedule a backfill task with an incorrect param:
- here backfill_start > vb_highSeqno */
- producer->scheduleBackfillManager(*vb0,
- mock_stream,
- /*backfillStart*/numItems + 5,
- /*backfillEnd*/numItems + 10);
-
- /* Expect stream to be closed */
- mock_stream->waitForStreamClose();
-}
-
/* Stream items from a DCP backfill with very small backfill buffer.
However small the backfill buffer is, backfill must not stop, it must
proceed to completion eventually */