/* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
/*
 *     Copyright 2017 Couchbase, Inc
 *
 *   Licensed under the Apache License, Version 2.0 (the "License");
 *   you may not use this file except in compliance with the License.
 *   You may obtain a copy of the License at
 *
 *       http://www.apache.org/licenses/LICENSE-2.0
 *
 *   Unless required by applicable law or agreed to in writing, software
 *   distributed under the License is distributed on an "AS IS" BASIS,
 *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 *   See the License for the specific language governing permissions and
 *   limitations under the License.
 */
#include "linked_list.h"

#include <algorithm>
#include <iostream>
#include <mutex>
BasicLinkedList::BasicLinkedList(uint16_t vbucketId, EPStats& st)
    : SequenceList(),
      readRange(0, 0),
      staleSize(0),
      staleMetaDataSize(0),
      highSeqno(0),
      highestDedupedSeqno(0),
      highestPurgedDeletedSeqno(0),
      numStaleItems(0),
      numDeletedItems(0),
      vbid(vbucketId),
      st(st) {
}
BasicLinkedList::~BasicLinkedList() {
    /* Delete stale items here; other items are deleted by the hash
       table */
    std::lock_guard<std::mutex> writeGuard(getListWriteLock());
    seqList.remove_and_dispose_if(
            [&writeGuard](const OrderedStoredValue& v) {
                return v.isStale(writeGuard);
            },
            [this](OrderedStoredValue* v) {
                this->st.currentSize.fetch_sub(v->metaDataSize());
                delete v;
            });

    /* Erase all the list elements (does not destroy elements, just removes
       them from the list) */
    seqList.clear();
}
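
/* Editor's note - illustrative sketch, not part of this file's build: the
   remove_and_dispose_if() call above is the boost::intrusive idiom for
   destroying elements. An intrusive list never owns its elements, so erase()
   and clear() only unlink; a disposer callback is needed to actually free
   them. A minimal standalone example (the 'Node' type is hypothetical):

       #include <boost/intrusive/list.hpp>

       struct Node : public boost::intrusive::list_base_hook<> {
           bool stale = false;
       };
       using NodeList = boost::intrusive::list<Node>;

       void purgeStale(NodeList& nodes) {
           // Unlink every stale node, then delete it via the disposer.
           // Nodes must be heap-allocated for the disposer to delete them.
           nodes.remove_and_dispose_if(
                   [](const Node& n) { return n.stale; },
                   [](Node* n) { delete n; });
       }
*/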
void BasicLinkedList::appendToList(std::lock_guard<std::mutex>& seqLock,
                                   std::lock_guard<std::mutex>& writeLock,
                                   OrderedStoredValue& v) {
    seqList.push_back(v);
}
SequenceList::UpdateStatus BasicLinkedList::updateListElem(
        std::lock_guard<std::mutex>& seqLock,
        std::lock_guard<std::mutex>& writeLock,
        OrderedStoredValue& v) {
    /* Lock needed for a consistent read of the SeqRange 'readRange' */
    std::lock_guard<SpinLock> lh(rangeLock);

    if (readRange.fallsInRange(v.getBySeqno())) {
        /* A range read is in the middle of a point-in-time snapshot, hence we
           cannot move the element to the end of the list. Indicate that the
           caller must append a new version instead */
        return UpdateStatus::Append;
    }

    /* Since there are no other reads or writes happening in this range, we
       can move the item to the end of the list */
    auto it = seqList.iterator_to(v);
    seqList.erase(it);
    seqList.push_back(v);

    return UpdateStatus::Success;
}
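
/* Editor's note - illustrative sketch, not part of this file's build: how a
   caller might act on the two UpdateStatus values. The names 'makeNewVersion'
   and 'releaseFromHashTable' are hypothetical stand-ins for the real call
   sites in the ephemeral VBucket code:

       switch (list.updateListElem(seqLock, writeLock, v)) {
       case SequenceList::UpdateStatus::Success:
           // 'v' was moved to the list tail in place; nothing more to do.
           break;
       case SequenceList::UpdateStatus::Append: {
           // A snapshot (range read) covers v's seqno, so v must stay where
           // it is. Append a brand-new version, then mark the old one stale
           // so purgeTombstones() can reclaim it once readers move on.
           StoredValue* newV = makeNewVersion(v);
           list.appendToList(seqLock, writeLock, *newV->toOrderedStoredValue());
           list.markItemStale(writeLock, releaseFromHashTable(v), newV);
           break;
       }
       }
*/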
std::tuple<ENGINE_ERROR_CODE, std::vector<UniqueItemPtr>, seqno_t>
BasicLinkedList::rangeRead(seqno_t start, seqno_t end) {
    std::vector<UniqueItemPtr> empty;
    if ((start > end) || (start <= 0)) {
        LOG(EXTENSION_LOG_WARNING,
            "BasicLinkedList::rangeRead(): "
            "(vb:%d) ERANGE: start %" PRIi64 " > end %" PRIi64,
            vbid,
            start,
            end);
        return std::make_tuple(
                ENGINE_ERANGE,
                std::move(empty)
                /* MSVC not happy with std::vector<UniqueItemPtr>() */,
                0);
    }
    /* Allows only 1 rangeRead for now */
    std::lock_guard<std::mutex> lckGd(rangeReadLock);

    {
        std::lock_guard<std::mutex> listWriteLg(getListWriteLock());
        std::lock_guard<SpinLock> lh(rangeLock);
        if (start > highSeqno) {
            LOG(EXTENSION_LOG_WARNING,
                "BasicLinkedList::rangeRead(): "
                "(vb:%d) ERANGE: start %" PRIi64 " > highSeqno %" PRIi64,
                vbid,
                start,
                static_cast<seqno_t>(highSeqno));
            /* If the request is for an invalid range, return before iterating
               through the list */
            return std::make_tuple(ENGINE_ERANGE, std::move(empty), 0);
        }

        /* Mark the initial read range. We must read at least up to the
           highest deduped seqno to get a consistent snapshot */
        end = std::min(end, static_cast<seqno_t>(highSeqno));
        end = std::max(end, static_cast<seqno_t>(highestDedupedSeqno));
        readRange = SeqRange(1, end);
    }
    /* Read items in the range */
    std::vector<UniqueItemPtr> items;

    for (const auto& osv : seqList) {
        if (osv.getBySeqno() > end) {
            /* We have read all the items in the requested range */
            break;
        }

        {
            std::lock_guard<SpinLock> lh(rangeLock);
            readRange.setBegin(osv.getBySeqno()); /* [EPHE TODO]: should we
                                                     update the min every time?
                                                   */
        }

        if (osv.getBySeqno() < start) {
            /* Skip this item, it is below the requested range */
            continue;
        }

        /* Check if this OSV has been made stale and has been superseded by a
         * newer version. If it has, and the replacement is /also/ in the range
         * we are reading, we should skip this item to avoid duplicates */
        StoredValue* replacement;
        {
            std::lock_guard<std::mutex> writeGuard(getListWriteLock());
            replacement = osv.getReplacementIfStale(writeGuard);
        }
        if (replacement &&
            replacement->toOrderedStoredValue()->getBySeqno() <= end) {
            continue;
        }

        try {
            items.push_back(UniqueItemPtr(osv.toItem(false, vbid)));
        } catch (const std::bad_alloc&) {
            /* [EPHE TODO]: Do we handle backfill in a better way?
               Perhaps do backfilling partially (that is
               backfill ==> stream; backfill ==> stream ..and so on)?
             */
            LOG(EXTENSION_LOG_WARNING,
                "BasicLinkedList::rangeRead(): "
                "(vb %d) ENOMEM while trying to copy "
                "item with seqno %" PRIi64 " before streaming it",
                vbid,
                osv.getBySeqno());
            return std::make_tuple(ENGINE_ENOMEM, std::move(empty), 0);
        }
    }
    /* Done with the range read, reset the range */
    {
        std::lock_guard<SpinLock> lh(rangeLock);
        readRange.reset();
    }

    /* Return all the range read items */
    return std::make_tuple(ENGINE_SUCCESS, std::move(items), end);
}
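
/* Editor's note - illustrative sketch, not part of this file's build: how a
   consumer (for example a DCP backfill task) might drive rangeRead(). The
   'streamItem' callback is hypothetical:

       ENGINE_ERROR_CODE status;
       std::vector<UniqueItemPtr> items;
       seqno_t endSeqno;
       std::tie(status, items, endSeqno) = list.rangeRead(1, 1000);
       if (status != ENGINE_SUCCESS) {
           return status; // ENGINE_ERANGE or ENGINE_ENOMEM, as above
       }
       for (auto& item : items) {
           streamItem(std::move(item)); // snapshot is complete up to endSeqno
       }
*/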
void BasicLinkedList::updateHighSeqno(std::lock_guard<std::mutex>& listWriteLg,
                                      const OrderedStoredValue& v) {
    highSeqno = v.getBySeqno();
}
void BasicLinkedList::updateHighestDedupedSeqno(
        std::lock_guard<std::mutex>& listWriteLg, const OrderedStoredValue& v) {
    highestDedupedSeqno = v.getBySeqno();
}
void BasicLinkedList::markItemStale(std::lock_guard<std::mutex>& listWriteLg,
                                    StoredValue::UniquePtr ownedSv,
                                    StoredValue* newSv) {
    /* Release the StoredValue as BasicLinkedList does not want it to be of
       owned type */
    StoredValue* v = ownedSv.release();

    /* Update the stats tracking the memory owned by the list */
    staleSize.fetch_add(v->size());
    staleMetaDataSize.fetch_add(v->metaDataSize());
    st.currentSize.fetch_add(v->metaDataSize());

    ++numStaleItems;
    v->toOrderedStoredValue()->markStale(listWriteLg, newSv);
}
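
/* Editor's note - illustrative sketch, not part of this file's build: the
   ownership handoff markItemStale() performs. The caller gives up its owning
   pointer; the list keeps the element linked via its intrusive hook and holds
   the raw pointer until purgeListElem() re-wraps it in a UniquePtr and
   destroys it ('releaseFromHashTable' is a hypothetical source of the
   owning pointer):

       StoredValue::UniquePtr owned = releaseFromHashTable(v);
       list.markItemStale(writeLg, std::move(owned), newV);
       // ...later: purgeTombstones() -> purgeListElem() finally deletes it.
*/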
size_t BasicLinkedList::purgeTombstones() {
    // Purge items marked as stale from the seqList.
    //
    // Strategy - we try to ensure that this function does not block
    // front-end writes (adding new OrderedStoredValues (OSVs) to the seqList).
    // To achieve this (safely), we (try to) acquire the rangeReadLock and set
    // up a 'read' range covering the whole of the seqList. This prevents any
    // other readers from iterating the list (and accessing stale items) while
    // we purge on it; but it permits front-end operations to continue as they:
    //   a) only read/modify non-stale items (we only change stale items), and
    //   b) do not change the list membership of anything within the read-range.
    // However, we do need to be careful about which members of OSVs we access
    // here - the only OSVs we can safely access are ones marked stale, as they
    // are no longer in the HashTable (and hence no longer subject to HashTable
    // locks). To check if an item is stale we need to acquire the writeLock
    // (OSV::stale is guarded by it) for each list item. While this isn't
    // ideal (that's the same lock needed by front-end operations), we
    // release the lock between each element so front-end operations have
    // the opportunity to acquire it.

    // Attempt to acquire the rangeReadLock, to block anyone else from
    // concurrently reading from the list while we remove elements from it.
    std::unique_lock<std::mutex> rrGuard(rangeReadLock, std::try_to_lock);
    if (!rrGuard) {
        // If we cannot acquire the lock then another thread is
        // running a range read. Given these are typically long-running,
        // return without blocking.
        return 0;
    }
    // Determine the start and end iterators.
    OrderedLL::iterator startIt;
    OrderedLL::iterator endIt;
    {
        std::lock_guard<std::mutex> writeGuard(getListWriteLock());
        if (seqList.empty()) {
            // Nothing in the sequence list - nothing to purge.
            return 0;
        }
        // Determine the {start} and {end} iterators (inclusive). Note
        // that (at most) one item is added to the seqList before its
        // sequence number is set (as the seqno comes from the Ckpt
        // manager); if that is the case (such an item is guaranteed
        // not to be stale), then move end to the previous item
        // (i.e. we don't consider this "in-flight" item), as long as
        // there are at least two elements.
        startIt = seqList.begin();
        endIt = std::prev(seqList.end());
        // Need rangeLock for highSeqno & readRange
        std::lock_guard<SpinLock> rangeGuard(rangeLock);
        if ((startIt != endIt) && (!endIt->isStale(writeGuard))) {
            endIt = std::prev(endIt);
        }
        readRange = SeqRange(startIt->getBySeqno(), endIt->getBySeqno());
    }
    // Iterate across all but the last item in the seqList, looking
    // for stale items.
    // Note the for() loop terminates one element before endIt - we
    // actually want an inclusive iteration, but as we are comparing
    // essentially random addresses (and we don't want to 'touch' the
    // element after endIt), we loop to one before endIt, then handle
    // endIt explicitly at the end.
    // Note(2): The iterator is manually advanced inside the loop body (not
    // in the for() header) as it is invalidated when we erase items.
    size_t purgedCount = 0;
    bool stale;
    for (auto it = startIt; it != endIt;) {
        {
            std::lock_guard<std::mutex> writeGuard(getListWriteLock());
            stale = it->isStale(writeGuard);
        }
        // Only stale items are purged.
        if (!stale) {
            ++it;
            continue;
        }

        // Checks pass; remove from the list and delete.
        it = purgeListElem(it);
        ++purgedCount;
    }
    // Handle the last element.
    {
        std::lock_guard<std::mutex> writeGuard(getListWriteLock());
        stale = endIt->isStale(writeGuard);
    }
    if (stale) {
        purgeListElem(endIt);
        ++purgedCount;
    }
    // Complete; reset the readRange.
    {
        std::lock_guard<SpinLock> lh(rangeLock);
        readRange.reset();
    }

    return purgedCount;
}
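
/* Editor's note - illustrative sketch, not part of this file's build: the
   std::try_to_lock pattern used at the top of purgeTombstones(). Passing
   std::try_to_lock to a unique_lock attempts the acquisition without
   blocking, and the lock's bool conversion reports whether it succeeded:

       #include <mutex>

       std::mutex m;

       bool tryDoWork() {
           std::unique_lock<std::mutex> guard(m, std::try_to_lock);
           if (!guard) {
               return false; // lock is busy; back off rather than block
           }
           // ...critical section...
           return true;
       }
*/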
void BasicLinkedList::updateNumDeletedItems(bool oldDeleted, bool newDeleted) {
    if (oldDeleted && !newDeleted) {
        --numDeletedItems;
    } else if (!oldDeleted && newDeleted) {
        ++numDeletedItems;
    }
}
uint64_t BasicLinkedList::getNumStaleItems() const {
    return numStaleItems;
}

size_t BasicLinkedList::getStaleValueBytes() const {
    return staleSize;
}

size_t BasicLinkedList::getStaleMetadataBytes() const {
    return staleMetaDataSize;
}
uint64_t BasicLinkedList::getNumDeletedItems() const {
    std::lock_guard<std::mutex> lckGd(getListWriteLock());
    return numDeletedItems;
}

uint64_t BasicLinkedList::getNumItems() const {
    std::lock_guard<std::mutex> lckGd(getListWriteLock());
    return seqList.size();
}
uint64_t BasicLinkedList::getHighSeqno() const {
    std::lock_guard<std::mutex> lckGd(getListWriteLock());
    return highSeqno;
}

uint64_t BasicLinkedList::getHighestDedupedSeqno() const {
    std::lock_guard<std::mutex> lckGd(getListWriteLock());
    return highestDedupedSeqno;
}
seqno_t BasicLinkedList::getHighestPurgedDeletedSeqno() const {
    return highestPurgedDeletedSeqno;
}

uint64_t BasicLinkedList::getRangeReadBegin() const {
    std::lock_guard<SpinLock> lh(rangeLock);
    return readRange.getBegin();
}

uint64_t BasicLinkedList::getRangeReadEnd() const {
    std::lock_guard<SpinLock> lh(rangeLock);
    return readRange.getEnd();
}
std::mutex& BasicLinkedList::getListWriteLock() const {
    return writeLock;
}
SequenceList::RangeIterator BasicLinkedList::makeRangeIterator() {
    return SequenceList::RangeIterator(
            std::make_unique<RangeIteratorLL>(*this));
}
void BasicLinkedList::dump() const {
    std::cerr << *this << std::endl;
}
std::ostream& operator<<(std::ostream& os, const BasicLinkedList& ll) {
    os << "BasicLinkedList[" << &ll << "] with numItems:" << ll.seqList.size()
       << " deletedItems:" << ll.numDeletedItems
       << " staleItems:" << ll.getNumStaleItems()
       << " highPurgeSeqno:" << ll.getHighestPurgedDeletedSeqno()
       << " elements:[" << std::endl;
    size_t count = 0;
    for (const auto& val : ll.seqList) {
        os << "    " << val << std::endl;
        ++count;
    }
    os << "] (count:" << count << ")";
    return os;
}
OrderedLL::iterator BasicLinkedList::purgeListElem(OrderedLL::iterator it) {
    StoredValue::UniquePtr purged(&*it);
    {
        std::lock_guard<std::mutex> lckGd(getListWriteLock());
        it = seqList.erase(it);
    }

    /* Update the stats tracking the memory owned by the list */
    staleSize.fetch_sub(purged->size());
    staleMetaDataSize.fetch_sub(purged->metaDataSize());
    st.currentSize.fetch_sub(purged->metaDataSize());

    // Similarly for the item counts:
    --numStaleItems;
    if (purged->isDeleted()) {
        --numDeletedItems;
        highestPurgedDeletedSeqno = std::max(seqno_t(highestPurgedDeletedSeqno),
                                             purged->getBySeqno());
    }

    return it;
}
BasicLinkedList::RangeIteratorLL::RangeIteratorLL(BasicLinkedList& ll)
    : list(ll),
      readLockHolder(list.rangeReadLock),
      itrRange(0, 0) {
    std::lock_guard<std::mutex> listWriteLg(list.getListWriteLock());
    std::lock_guard<SpinLock> lh(list.rangeLock);
    if (list.highSeqno < 1) {
        /* No need to hold a lock for the snapshot as there are no items;
           also the iterator range is at its default (0, 0) */
        readLockHolder.unlock();
        return;
    }

    /* Iterator to the beginning of the linked list */
    currIt = list.seqList.begin();

    /* Number of items that can be iterated over */
    numRemaining = list.seqList.size();

    /* The minimum seqno in the iterator that must be read to get a consistent
       read snapshot */
    earlySnapShotEndSeqno = list.highestDedupedSeqno;

    /* Mark the snapshot range on the linked list. The range that can be read
       by the iterator is inclusive of both the start and the end. */
    list.readRange =
            SeqRange(currIt->getBySeqno(), list.seqList.back().getBySeqno());

    /* Keep the range in the iterator object. We store the range end seqno as
       one higher than the end seqno that can be read by this iterator.
       This is because we must identify the end point of the iterator, and
       the read is inclusive of the end points of list.readRange.

       Further, since we use the class 'SeqRange' for 'itrRange' we cannot use
       curr() == end() + 1 to identify the end point, because 'SeqRange' does
       not internally allow curr > end */
    itrRange = SeqRange(currIt->getBySeqno(),
                        list.seqList.back().getBySeqno() + 1);
}
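
/* Editor's note - worked example of the "end + 1" convention above. For a
   list holding seqnos {10, 11, 12}, itrRange becomes SeqRange(10, 13):

       curr() == 10, 11, 12  -> valid, dereferenceable positions
       curr() == 13 == end() -> iteration complete; operator* and operator++
                                throw std::out_of_range

   Storing back() + 1 as the range end lets the iterator represent
   "one past the last element" even though SeqRange itself never allows
   begin > end. */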
BasicLinkedList::RangeIteratorLL::~RangeIteratorLL() {
    std::lock_guard<SpinLock> lh(list.rangeLock);
    list.readRange.reset();
    /* As readLockHolder goes out of scope here, it will automatically release
       the snapshot read lock on the linked list */
}
OrderedStoredValue& BasicLinkedList::RangeIteratorLL::operator*() const {
    if (curr() >= end()) {
        /* We can't read beyond the range end */
        throw std::out_of_range(
                "BasicLinkedList::RangeIteratorLL::operator*()"
                ": Trying to read beyond range end seqno " +
                std::to_string(end()));
    }
    return *currIt;
}
BasicLinkedList::RangeIteratorLL& BasicLinkedList::RangeIteratorLL::
operator++() {
    if (curr() >= end()) {
        throw std::out_of_range(
                "BasicLinkedList::RangeIteratorLL::operator++()"
                ": Trying to move the iterator beyond range end seqno " +
                std::to_string(end()));
    }

    --numRemaining;

    /* Check if the iterator is pointing to the last element. Incrementing
       beyond the last element indicates the end of the iteration */
    if (curr() == itrRange.getEnd() - 1) {
        std::lock_guard<SpinLock> lh(list.rangeLock);
        /* Release the snapshot read lock on the linked list */
        list.readRange.reset();
        readLockHolder.unlock();

        /* Update the begin to end() so the client can see that the iteration
           has ended */
        itrRange.setBegin(end());
        return *this;
    }

    ++currIt;
    {
        /* As the iterator moves we reduce the snapshot range being read on
           the linked list. This helps reduce the stale items in the list
           during heavy update load from the front end */
        std::lock_guard<SpinLock> lh(list.rangeLock);
        list.readRange.setBegin(currIt->getBySeqno());
    }

    /* Also update the current range stored in the iterator object */
    itrRange.setBegin(currIt->getBySeqno());
    return *this;
}
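
/* Editor's note - illustrative sketch, not part of this file's build: how a
   client might consume the iterator built by makeRangeIterator(), assuming
   the SequenceList::RangeIterator wrapper forwards curr(), end(),
   operator*() and operator++() to RangeIteratorLL ('sendToStream' is a
   hypothetical callback):

       auto itr = list.makeRangeIterator();
       while (itr.curr() != itr.end()) {
           const OrderedStoredValue& osv = *itr;
           sendToStream(osv); // snapshot read; list.readRange shrinks as we go
           ++itr;
       }
*/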