std::vector<UniqueItemPtr> items;
for (const auto& osv : seqList) {
- if (osv.getBySeqno() > end) {
- /* we are done */
+ int64_t currSeqno(osv.getBySeqno());
+
+ if (currSeqno > end || currSeqno < 0) {
+ /* We have read all the items in the requested range, or the osv
+ * does not yet have a valid seqno; either way we are done */
break;
}
{
std::lock_guard<SpinLock> lh(rangeLock);
- readRange.setBegin(osv.getBySeqno()); /* [EPHE TODO]: should we
+ readRange.setBegin(currSeqno); /* [EPHE TODO]: should we
update the min every time ?
*/
}
- if (osv.getBySeqno() < start) {
+ if (currSeqno < start) {
/* skip this item */
continue;
}
"(vb %d) ENOMEM while trying to copy "
"item with seqno %" PRIi64 "before streaming it",
vbid,
- osv.getBySeqno());
+ currSeqno);
return std::make_tuple(ENGINE_ENOMEM, std::move(empty), 0);
}
}
EXPECT_EQ(numItems * (updateIterations + 1),
std::get<2>(res)); // extended end of readRange
}
+
+TEST_F(EphemeralVBucketTest, RangeReadStopsOnInvalidSeqno) {
+ /* MB-24376: rangeRead has to stop if it encounters an OSV with a seqno of
+ * -1; this item is definitely past the end of the rangeRead, and has not
+ * yet had its seqno updated in queueDirty */
+ const int numItems = 2;
+
+ // store two items
+ auto keys = generateKeys(numItems);
+ setMany(keys, MutationStatus::WasClean);
+
+ auto lastKey = makeStoredDocKey(std::to_string(numItems + 1));
+ Item i(lastKey, 0, 0, lastKey.data(), lastKey.size());
+
+ // set item with no queueItemCtx - will not be queueDirty'd, and will
+ // keep seqno -1
+ EXPECT_EQ(MutationStatus::WasClean,
+ public_processSet(i, i.getCas(), false));
+
+ EXPECT_EQ(-1, mockEpheVB->getLL()->getSeqList().back().getBySeqno());
+
+ auto res = mockEpheVB->inMemoryBackfill(
+ 1, std::numeric_limits<seqno_t>::max());
+
+ EXPECT_EQ(ENGINE_SUCCESS, std::get<0>(res));
+ EXPECT_EQ(numItems, std::get<1>(res).size());
+ EXPECT_EQ(numItems, std::get<2>(res));
+}
\ No newline at end of file
return std::make_pair(std::move(hbl), storedVal);
}
-MutationStatus VBucketTest::public_processSet(Item& itm, const uint64_t cas) {
+MutationStatus VBucketTest::public_processSet(Item& itm,
+ const uint64_t cas,
+ bool withCtx) {
auto hbl_sv = lockAndFind(itm.getKey());
VBQueueItemCtx queueItmCtx(GenerateBySeqno::Yes,
GenerateCas::No,
cas,
true,
false,
- &queueItmCtx)
+ withCtx ? &queueItmCtx : nullptr)
.first;
}
std::pair<HashTable::HashBucketLock, StoredValue*> lockAndFind(
const StoredDocKey& key);
- MutationStatus public_processSet(Item& itm, const uint64_t cas);
+ MutationStatus public_processSet(Item& itm,
+ const uint64_t cas,
+ bool withCtx = true);
AddStatus public_processAdd(Item& itm);