}
/* Get sequence of items (backfill) from memory */
- std::vector<UniqueItemPtr> items;
ENGINE_ERROR_CODE status;
- std::tie(status, items) = evb->inMemoryBackfill(startSeqno, endSeqno);
+ std::vector<UniqueItemPtr> items;
+ seqno_t adjustedEndSeqno;
+ std::tie(status, items, adjustedEndSeqno) =
+ evb->inMemoryBackfill(startSeqno, endSeqno);
/* Handle any failures */
if (status != ENGINE_SUCCESS) {
stream->incrBackfillRemaining(items.size());
/* Mark disk snapshot */
- stream->markDiskSnapshot(items.front()->getBySeqno(),
- items.back()->getBySeqno());
+ stream->markDiskSnapshot(startSeqno, adjustedEndSeqno);
/* Move every item to the stream */
for (auto& item : items) {
evb, stream, startSeqno, endSeqno);
}
-std::pair<ENGINE_ERROR_CODE, std::vector<UniqueItemPtr>>
+std::tuple<ENGINE_ERROR_CODE, std::vector<UniqueItemPtr>, seqno_t>
EphemeralVBucket::inMemoryBackfill(uint64_t start, uint64_t end) {
return seqList->rangeRead(start, end);
}
uint64_t endSeqno) override;
/**
- * Reads backfill items from in memory ordered data structure
+ * Reads backfill items from in memory ordered data structure.
+ *
+ * Because the backfill may have to be extended to ensure consistency (e.g.,
+ * an item in the range has been updated and the new version is
+ * outside of the original range would result in a missing item), the
+ * end of the range may be at a higher seqno than was requested; this new
+ * end value is returned.
*
* @param startSeqno requested start sequence number of the backfill
* @param endSeqno requested end sequence number of the backfill
*
- * @return ENGINE_SUCCESS and items in the snapshot
+ * @return ENGINE_SUCCESS, items in the snapshot, adjusted endSeqno
* ENGINE_ENOMEM on no memory to copy items
* ENGINE_ERANGE on incorrect start and end
*/
- std::pair<ENGINE_ERROR_CODE, std::vector<UniqueItemPtr>> inMemoryBackfill(
- uint64_t start, uint64_t end);
+ std::tuple<ENGINE_ERROR_CODE, std::vector<UniqueItemPtr>, seqno_t>
+ inMemoryBackfill(uint64_t start, uint64_t end);
void dump() const override;
return UpdateStatus::Success;
}
-std::pair<ENGINE_ERROR_CODE, std::vector<UniqueItemPtr>>
+std::tuple<ENGINE_ERROR_CODE, std::vector<UniqueItemPtr>, seqno_t>
BasicLinkedList::rangeRead(seqno_t start, seqno_t end) {
std::vector<UniqueItemPtr> empty;
if ((start > end) || (start <= 0)) {
vbid,
start,
end);
- return {ENGINE_ERANGE, std::move(empty)
- /* MSVC not happy with std::vector<UniqueItemPtr>() */};
+ return std::make_tuple(
+ ENGINE_ERANGE,
+ std::move(empty)
+ /* MSVC not happy with std::vector<UniqueItemPtr>() */,
+ 0);
}
/* Allows only 1 rangeRead for now */
static_cast<seqno_t>(highSeqno));
/* If the request is for an invalid range, return before iterating
through the list */
- return {ENGINE_ERANGE, std::move(empty)};
+ return std::make_tuple(ENGINE_ERANGE, std::move(empty), 0);
}
/* Mark the initial read range */
"item with seqno %" PRIi64 "before streaming it",
vbid,
osv.getBySeqno());
- return {ENGINE_ENOMEM, std::move(empty)};
+ return std::make_tuple(ENGINE_ENOMEM, std::move(empty), 0);
}
}
}
/* Return all the range read items */
- return {ENGINE_SUCCESS, std::move(items)};
+ return std::make_tuple(ENGINE_SUCCESS, std::move(items), end);
}
void BasicLinkedList::updateHighSeqno(
std::lock_guard<std::mutex>& seqLock,
OrderedStoredValue& v) override;
- std::pair<ENGINE_ERROR_CODE, std::vector<UniqueItemPtr>> rangeRead(
- seqno_t start, seqno_t end) override;
+ std::tuple<ENGINE_ERROR_CODE, std::vector<UniqueItemPtr>, seqno_t>
+ rangeRead(seqno_t start, seqno_t end) override;
void updateHighSeqno(std::lock_guard<std::mutex>& highSeqnoLock,
const OrderedStoredValue& v) override;
* @param start requested start seqno
* @param end requested end seqno
*
- * @return ENGINE_SUCCESS and items in the snapshot
+ * @return ENGINE_SUCCESS, items in the snapshot and adjusted endSeqNo
* ENGINE_ENOMEM on no memory to copy items
* ENGINE_ERANGE on incorrect start and end
*/
- virtual std::pair<ENGINE_ERROR_CODE, std::vector<UniqueItemPtr>> rangeRead(
- seqno_t start, seqno_t end) = 0;
+ virtual std::tuple<ENGINE_ERROR_CODE, std::vector<UniqueItemPtr>, seqno_t>
+ rangeRead(seqno_t start, seqno_t end) = 0;
/**
* Updates the highSeqno in the list. Since seqno is generated and managed
/* Now do a range read */
ENGINE_ERROR_CODE status;
std::vector<UniqueItemPtr> items;
- std::tie(status, items) = basicLL->rangeRead(1, numItems);
+ seqno_t endSeqno;
+ std::tie(status, items, endSeqno) = basicLL->rangeRead(1, numItems);
EXPECT_EQ(ENGINE_SUCCESS, status);
EXPECT_EQ(numItems, items.size());
EXPECT_EQ(numItems, items.back()->getBySeqno());
+ EXPECT_EQ(numItems, endSeqno);
}
TEST_F(BasicLinkedListTest, TestRangeReadTillInf) {
/* Now do a range read */
ENGINE_ERROR_CODE status;
std::vector<UniqueItemPtr> items;
- std::tie(status, items) =
+ seqno_t endSeqno;
+ std::tie(status, items, endSeqno) =
basicLL->rangeRead(1, std::numeric_limits<seqno_t>::max());
EXPECT_EQ(ENGINE_SUCCESS, status);
EXPECT_EQ(numItems, items.size());
EXPECT_EQ(numItems, items.back()->getBySeqno());
+ EXPECT_EQ(numItems, endSeqno);
}
TEST_F(BasicLinkedListTest, TestRangeReadFromMid) {
/* Now do a range read */
ENGINE_ERROR_CODE status;
std::vector<UniqueItemPtr> items;
- std::tie(status, items) = basicLL->rangeRead(2, numItems);
+ seqno_t endSeqno;
+ std::tie(status, items, endSeqno) = basicLL->rangeRead(2, numItems);
EXPECT_EQ(ENGINE_SUCCESS, status);
EXPECT_EQ(numItems - 1, items.size());
EXPECT_EQ(numItems, items.back()->getBySeqno());
+ EXPECT_EQ(numItems, endSeqno);
}
TEST_F(BasicLinkedListTest, TestRangeReadStopBeforeEnd) {
/* Now request for a range read of just 2 items */
ENGINE_ERROR_CODE status;
std::vector<UniqueItemPtr> items;
- std::tie(status, items) = basicLL->rangeRead(1, numItems - 1);
+ seqno_t endSeqno;
+ std::tie(status, items, endSeqno) = basicLL->rangeRead(1, numItems - 1);
EXPECT_EQ(ENGINE_SUCCESS, status);
EXPECT_EQ(numItems - 1, items.size());
EXPECT_EQ(numItems - 1, items.back()->getBySeqno());
+ EXPECT_EQ(numItems - 1, endSeqno);
}
TEST_F(BasicLinkedListTest, TestRangeReadNegatives) {
std::vector<UniqueItemPtr> items;
/* Now do a range read with start > end */
- std::tie(status, items) = basicLL->rangeRead(2, 1);
+ std::tie(status, items, std::ignore) = basicLL->rangeRead(2, 1);
EXPECT_EQ(ENGINE_ERANGE, status);
/* Now do a range read with start > highSeqno */
- std::tie(status, items) = basicLL->rangeRead(numItems + 1, numItems + 2);
+ std::tie(status, items, std::ignore) =
+ basicLL->rangeRead(numItems + 1, numItems + 2);
EXPECT_EQ(ENGINE_ERANGE, status);
}
#include "test_helpers.h"
#include <gtest/gtest.h>
+#include <dcp/backfill_memory.h>
/*
* Mock of the DcpConnMap class. Wraps the real DcpConnMap, but exposes
producer.get()->closeAllStreams();
}
+/* MB-24159 - Test to confirm a dcp stream backfill from an ephemeral bucket
+ * over a range which includes /no/ items doesn't cause the producer to
+ * segfault.
+ * */
+
+TEST_P(StreamTest, backfillGetsNoItems) {
+ if (engine->getConfiguration().getBucketType() == "ephemeral") {
+ setup_dcp_stream(DcpProducer::MutationType::KeyOnly);
+ store_item(vbid, "key", "value1");
+ store_item(vbid, "key", "value2");
+
+ active_stream_t aStream = dynamic_cast<ActiveStream *>(stream.get());
+
+ auto evb = std::shared_ptr<EphemeralVBucket>(
+ std::dynamic_pointer_cast<EphemeralVBucket>(vb0));
+ auto dcpbfm = DCPBackfillMemory(evb, aStream, 1, 1);
+ dcpbfm.run();
+ producer.get()->closeAllStreams();
+ }
+}
+
/*
* Test that when have a producer with MutationType set to KeyAndValue an active
* stream created via a streamRequest returns false for isKeyOnly.
setMany(keys, MutationStatus::WasClean);
auto res = mockEpheVB->inMemoryBackfill(1, numItems);
- EXPECT_EQ(ENGINE_SUCCESS, res.first);
- EXPECT_EQ(numItems, res.second.size());
+ EXPECT_EQ(ENGINE_SUCCESS, std::get<0>(res));
+ EXPECT_EQ(numItems, std::get<1>(res).size());
}
TEST_F(EphemeralVBucketTest, UpdateDuringBackfill) {