1 /* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
3 * Copyright 2017 Couchbase, Inc
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
9 * http://www.apache.org/licenses/LICENSE-2.0
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
18 #include "linked_list.h"
// Constructor. Takes a vBucket id (vbucketId) and a reference to an EPStats
// object (st).
// The visible member initializers set highestDedupedSeqno and
// highestPurgedDeletedSeqno to 0.
// NOTE(review): the remaining initializers and the constructor body are not
// visible in this excerpt.
21 BasicLinkedList::BasicLinkedList(uint16_t vbucketId, EPStats& st)
27 highestDedupedSeqno(0),
28 highestPurgedDeletedSeqno(0),
// Destructor. Acquires writeLock and calls seqList.remove_and_dispose_if()
// with two lambdas:
//  - the predicate selects elements for which isStale(writeGuard) is true;
//  - the disposer subtracts metaDataSize() of each selected element from
//    st.currentSize.
// NOTE(review): the rest of the disposer body, and the statement announced by
// the final comment (erasing the remaining list elements), are not visible in
// this excerpt.
35 BasicLinkedList::~BasicLinkedList() {
36 /* Delete stale items here, other items are deleted by the hash
38 std::lock_guard<std::mutex> writeGuard(writeLock);
39 seqList.remove_and_dispose_if(
40 [&writeGuard](const OrderedStoredValue& v) {
41 return v.isStale(writeGuard);
43 [this](OrderedStoredValue* v) {
44 this->st.currentSize.fetch_sub(v->metaDataSize());
48 /* Erase all the list elements (does not destroy elements, just removes
49 them from the list) */
// Appends to the sequence list while holding writeLock, so that only one
// writer touches the list at a time.
//  seqLock - lock_guard supplied by the caller; not referenced in the visible
//            lines (presumably proves the caller holds the sequence lock;
//            confirm against callers).
//  v       - presumably the element that is appended.
// NOTE(review): the statement performing the append is not visible in this
// excerpt.
53 void BasicLinkedList::appendToList(std::lock_guard<std::mutex>& seqLock,
54 OrderedStoredValue& v) {
55 /* Allow only one write to the list at a time */
56 std::lock_guard<std::mutex> lckGd(writeLock);
// Tries to relocate element v to the end of the sequence list.
//
// Locks taken: writeLock (serialises writers), then rangeLock (consistent
// read of readRange).
//
// Returns:
//  - UpdateStatus::Append when the seqno of v falls inside readRange, i.e. a
//    range read currently covers the element and it must stay in place.
//    Note that the code returns Append here, although the inline comment
//    speaks of a temp failure.
//  - UpdateStatus::Success otherwise.
//
//  seqLock - lock_guard supplied by the caller; not referenced in the visible
//            lines.
// NOTE(review): the statements between iterator_to() and the Success return
// (the actual move to the list tail) are not visible in this excerpt.
61 SequenceList::UpdateStatus BasicLinkedList::updateListElem(
62 std::lock_guard<std::mutex>& seqLock, OrderedStoredValue& v) {
63 /* Allow only one write to the list at a time */
64 std::lock_guard<std::mutex> lckGd(writeLock);
66 /* Lock that is needed for consistent read of SeqRange 'readRange' */
67 std::lock_guard<SpinLock> lh(rangeLock);
69 if (readRange.fallsInRange(v.getBySeqno())) {
70 /* Range read is in middle of a point-in-time snapshot, hence we cannot
71 move the element to the end of the list. Return a temp failure */
72 return UpdateStatus::Append;
75 /* Since there are no other reads or writes happening in this range, we can
76 move the item to the end of the list */
77 auto it = seqList.iterator_to(v);
81 return UpdateStatus::Success;
// Copies the items whose seqno lies between start and end out of the sequence
// list and returns them together with a status code and a seqno.
//
// Flow, as far as the visible lines show:
//  1. If start > end or start <= 0, a warning is logged and the function
//     returns early (the tuple arguments of that return are not visible).
//  2. rangeReadLock is held for the rest of the call, so only one range read
//     runs at a time.
//  3. Under rangeLock: if start > highSeqno the function logs a warning and
//     returns (ENGINE_ERANGE, empty vector, 0). Otherwise end is clamped to
//     highSeqno, then raised to at least highestDedupedSeqno, and readRange
//     is set to SeqRange(1, end).
//  4. The list is iterated; for each element readRange begin is advanced to
//     the seqno of that element (under rangeLock) and the element is copied
//     with toItem(false, vbid) into the result vector.
//  5. If std::bad_alloc is caught while copying, a warning is logged and
//     (ENGINE_ENOMEM, empty vector, 0) is returned.
//  6. On completion (ENGINE_SUCCESS, items, end) is returned, where end is
//     the adjusted value from step 3.
//
// NOTE(review): the bodies of the two seqno comparisons inside the loop
// (seqno > end, seqno < start) are not visible here; presumably they stop the
// iteration and skip the element respectively. Confirm.
// NOTE(review): the warning in step 1 always reports start > end, even when
// the failing condition is start <= 0.
// NOTE(review): in the ENOMEM warning the PRIi64 conversion is concatenated
// directly with the word before, so the logged seqno runs into the following
// text; a leading space in that literal looks intended.
// NOTE(review): no reset of readRange is visible ahead of the ENOMEM return,
// whereas the success path has a dedicated reset section. Confirm the range
// is not left set on that error path.
84 std::tuple<ENGINE_ERROR_CODE, std::vector<UniqueItemPtr>, seqno_t>
85 BasicLinkedList::rangeRead(seqno_t start, seqno_t end) {
86 std::vector<UniqueItemPtr> empty;
87 if ((start > end) || (start <= 0)) {
88 LOG(EXTENSION_LOG_WARNING,
89 "BasicLinkedList::rangeRead(): "
90 "(vb:%d) ERANGE: start %" PRIi64 " > end %" PRIi64,
94 return std::make_tuple(
97 /* MSVC not happy with std::vector<UniqueItemPtr>() */,
101 /* Allows only 1 rangeRead for now */
102 std::lock_guard<std::mutex> lckGd(rangeReadLock);
105 std::lock_guard<SpinLock> lh(rangeLock);
106 if (start > highSeqno) {
107 LOG(EXTENSION_LOG_WARNING,
108 "BasicLinkedList::rangeRead(): "
109 "(vb:%d) ERANGE: start %" PRIi64 " > highSeqno %" PRIi64,
112 static_cast<seqno_t>(highSeqno));
113 /* If the request is for an invalid range, return before iterating
115 return std::make_tuple(ENGINE_ERANGE, std::move(empty), 0);
118 /* Mark the initial read range */
119 end = std::min(end, static_cast<seqno_t>(highSeqno));
120 end = std::max(end, static_cast<seqno_t>(highestDedupedSeqno));
121 readRange = SeqRange(1, end);
124 /* Read items in the range */
125 std::vector<UniqueItemPtr> items;
127 for (const auto& osv : seqList) {
128 if (osv.getBySeqno() > end) {
134 std::lock_guard<SpinLock> lh(rangeLock);
135 readRange.setBegin(osv.getBySeqno()); /* [EPHE TODO]: should we
136 update the min every time ?
140 if (osv.getBySeqno() < start) {
146 items.push_back(UniqueItemPtr(osv.toItem(false, vbid)));
147 } catch (const std::bad_alloc&) {
148 /* [EPHE TODO]: Do we handle backfill in a better way ?
149 Perhaps do backfilling partially (that is
150 backfill ==> stream; backfill ==> stream ..so on )?
152 LOG(EXTENSION_LOG_WARNING,
153 "BasicLinkedList::rangeRead(): "
154 "(vb %d) ENOMEM while trying to copy "
155 "item with seqno %" PRIi64 "before streaming it",
158 return std::make_tuple(ENGINE_ENOMEM, std::move(empty), 0);
162 /* Done with range read, reset the range */
164 std::lock_guard<SpinLock> lh(rangeLock);
168 /* Return all the range read items */
169 return std::make_tuple(ENGINE_SUCCESS, std::move(items), end);
// Sets highSeqno to the by-seqno value of v.
//  highSeqnoLock - lock_guard supplied by the caller; it is not used in the
//                  body (presumably it demonstrates that the caller already
//                  holds the high-seqnos lock; confirm against callers).
172 void BasicLinkedList::updateHighSeqno(
173 std::lock_guard<std::mutex>& highSeqnoLock,
174 const OrderedStoredValue& v) {
175 highSeqno = v.getBySeqno();
// Sets highestDedupedSeqno to the by-seqno value of v.
//  highSeqnoLock - lock_guard supplied by the caller; it is not used in the
//                  body (presumably it demonstrates that the caller already
//                  holds the high-seqnos lock; confirm against callers).
177 void BasicLinkedList::updateHighestDedupedSeqno(
178 std::lock_guard<std::mutex>& highSeqnoLock,
179 const OrderedStoredValue& v) {
180 highestDedupedSeqno = v.getBySeqno();
// Takes over the StoredValue held by ownedSv and marks it stale.
//
// Steps visible here:
//  - release() detaches the raw pointer from the unique pointer, so the value
//    is no longer destroyed automatically;
//  - size() of the value is added to staleSize, and metaDataSize() is added
//    to both staleMetaDataSize and st.currentSize;
//  - under writeLock the value is converted with toOrderedStoredValue() and
//    markStale(writeGuard) is called on it.
// NOTE(review): some statements between the stats update and the locked
// section are not visible in this excerpt.
183 void BasicLinkedList::markItemStale(StoredValue::UniquePtr ownedSv) {
184 /* Release the StoredValue as BasicLinkedList does not want it to be of
186 StoredValue* v = ownedSv.release();
188 /* Update the stats tracking the memory owned by the list */
189 staleSize.fetch_add(v->size());
190 staleMetaDataSize.fetch_add(v->metaDataSize());
191 st.currentSize.fetch_add(v->metaDataSize());
195 std::lock_guard<std::mutex> writeGuard(writeLock);
196 v->toOrderedStoredValue()->markStale(writeGuard);
// Walks the sequence list and purges elements that are marked stale, using
// purgeListElem() for each removal. A local counter (purgedCount) is kept;
// presumably it is the return value, but the return statements are not
// visible in this excerpt.
//
// Locking summary (from the visible lines):
//  - rangeReadLock is acquired with try_to_lock, so the call does not block
//    behind a running range read;
//  - writeLock is taken briefly to pick the start/end iterators and again,
//    per element, around each isStale() check;
//  - rangeLock is taken while readRange is assigned.
200 size_t BasicLinkedList::purgeTombstones() {
201 // Purge items marked as stale from the seqList.
203 // Strategy - we try to ensure that this function does not block
204 // frontend-writes (adding new OrderedStoredValues (OSVs) to the seqList).
205 // To achieve this (safely),
206 // we (try to) acquire the rangeReadLock and setup a 'read' range for the
207 // whole of the seqList. This prevents any other readers from iterating
208 // the list (and accessing stale items) while we purge on it; but permits
209 // front-end operations to continue as they:
210 // a) Only read/modify non-stale items (we only change stale items) and
211 // b) Do not change the list membership of anything within the read-range.
212 // However, we do need to be careful about what members of OSVs we access
213 // here - the only OSVs we can safely access are ones marked stale as they
214 // are no longer in the HashTable (and hence subject to HashTable locks).
215 // To check if an item is stale we need to acquire the writeLock
216 // (OSV::stale is guarded by it) for each list item. While this isn't
217 // ideal (that's the same lock needed by front-end operations), we can
218 // release the lock between each element so front-end operations can
219 // have the opportunity to acquire it.
221 // Attempt to acquire the rangeReadLock, to block anyone else concurrently
222 // reading from the list while we remove elements from it.
223 std::unique_lock<std::mutex> rrGuard(rangeReadLock, std::try_to_lock);
225 // If we cannot acquire the lock then another thread is
226 // running a range read. Given these are typically long-running,
227 // return without blocking.
231 // Determine the start and end iterators.
232 OrderedLL::iterator startIt;
233 OrderedLL::iterator endIt;
235 std::lock_guard<std::mutex> writeGuard(writeLock);
236 if (seqList.empty()) {
237 // Nothing in sequence list - nothing to purge.
240 // Determine the {start} and {end} iterator (inclusive). Note
241 // that (at most) one item is added to the seqList before its
242 // sequence number is set (as the seqno comes from Ckpt
243 // manager); if that is the case (such an item is guaranteed
244 // to not be stale), then move end to the previous item
245 // (i.e. we don't consider this "in-flight" item), as long as
246 // there is at least two elements.
247 startIt = seqList.begin();
248 endIt = std::prev(seqList.end());
249 // Need rangeLock for highSeqno & readRange
250 std::lock_guard<SpinLock> rangeGuard(rangeLock);
251 if ((startIt != endIt) && (!endIt->isStale(writeGuard))) {
252 endIt = std::prev(endIt);
254 readRange = SeqRange(startIt->getBySeqno(), endIt->getBySeqno());
257 // Iterate across all but the last item in the seqList, looking
259 // Note the for() loop terminates one element before endIt - we
260 // actually want an inclusive iteration but as we are comparing
261 // essentially random addresses (and we don't want to 'touch' the
262 // element after endIt), we loop to one before endIt, then handle
263 // endIt explicitly at the end.
264 // Note(2): Iterator is manually incremented outside the for() loop as it
265 // is invalidated when we erase items.
266 size_t purgedCount = 0;
268 for (auto it = startIt; it != endIt;) {
270 std::lock_guard<std::mutex> writeGuard(writeLock);
271 stale = it->isStale(writeGuard);
273 // Only stale items are purged.
279 // Checks pass, remove from list and delete.
280 it = purgeListElem(it);
283 // Handle the last element.
285 std::lock_guard<std::mutex> writeGuard(writeLock);
286 stale = endIt->isStale(writeGuard);
289 purgeListElem(endIt);
293 // Complete; reset the readRange.
295 std::lock_guard<SpinLock> lh(rangeLock);
// Reacts to a change in the deleted state of an item.
// Two transitions are distinguished: deleted -> not deleted (first branch)
// and not deleted -> deleted (second branch); nothing happens when the state
// is unchanged.
// NOTE(review): the statements inside both branches are not visible in this
// excerpt; presumably they decrement and increment numDeletedItems. Confirm.
301 void BasicLinkedList::updateNumDeletedItems(bool oldDeleted, bool newDeleted) {
302 if (oldDeleted && !newDeleted) {
304 } else if (!oldDeleted && newDeleted) {
// Returns the current value of numStaleItems. No lock is taken here.
309 uint64_t BasicLinkedList::getNumStaleItems() const {
310 return numStaleItems;
// Accessor for the number of bytes held by stale values.
// NOTE(review): the body is not visible in this excerpt; presumably it
// returns staleSize. Confirm.
313 size_t BasicLinkedList::getStaleValueBytes() const {
// Returns the current value of staleMetaDataSize. No lock is taken here.
317 size_t BasicLinkedList::getStaleMetadataBytes() const {
318 return staleMetaDataSize;
// Returns numDeletedItems, read while holding writeLock.
321 uint64_t BasicLinkedList::getNumDeletedItems() const {
322 std::lock_guard<std::mutex> lckGd(writeLock);
323 return numDeletedItems;
// Returns the number of elements in seqList, read while holding writeLock.
326 uint64_t BasicLinkedList::getNumItems() const {
327 std::lock_guard<std::mutex> lckGd(writeLock);
328 return seqList.size();
// Accessor for the high seqno; acquires highSeqnosLock first.
// NOTE(review): the return statement is not visible in this excerpt;
// presumably it returns highSeqno. Confirm.
331 uint64_t BasicLinkedList::getHighSeqno() const {
332 std::lock_guard<std::mutex> lckGd(highSeqnosLock);
// Returns highestDedupedSeqno, read while holding highSeqnosLock.
336 uint64_t BasicLinkedList::getHighestDedupedSeqno() const {
337 std::lock_guard<std::mutex> lckGd(highSeqnosLock);
338 return highestDedupedSeqno;
// Returns the current value of highestPurgedDeletedSeqno. No lock is taken
// here.
341 seqno_t BasicLinkedList::getHighestPurgedDeletedSeqno() const {
342 return highestPurgedDeletedSeqno;
// Returns the begin of readRange, read while holding rangeLock.
345 uint64_t BasicLinkedList::getRangeReadBegin() const {
346 std::lock_guard<SpinLock> lh(rangeLock);
347 return readRange.getBegin();
// Returns the end of readRange, read while holding rangeLock.
350 uint64_t BasicLinkedList::getRangeReadEnd() const {
351 std::lock_guard<SpinLock> lh(rangeLock);
352 return readRange.getEnd();
// Returns a reference to the highSeqnosLock mutex so that callers can lock
// it themselves.
354 std::mutex& BasicLinkedList::getHighSeqnosLock() const {
355 return highSeqnosLock;
// Creates a SequenceList::RangeIterator that wraps a newly allocated
// RangeIteratorLL bound to this list.
358 SequenceList::RangeIterator BasicLinkedList::makeRangeIterator() {
359 return SequenceList::RangeIterator(
360 std::make_unique<RangeIteratorLL>(*this));
// Debug helper: streams this list to std::cerr via operator<< and ends the
// line with std::endl (which also flushes the stream).
363 void BasicLinkedList::dump() const {
364 std::cerr << *this << std::endl;
// Streams a textual description of ll to os: a header line with the object
// address, the item count, the deleted and stale item counts and the highest
// purged deleted seqno, followed by one line per list element and a closing
// line with an element count.
// NOTE(review): the declaration and update of the count variable and the
// return statement are not visible in this excerpt.
// NOTE(review): seqList and numDeletedItems are read here without taking
// writeLock, unlike getNumItems() and getNumDeletedItems(). Confirm this is
// acceptable for the intended (debug) use.
367 std::ostream& operator <<(std::ostream& os, const BasicLinkedList& ll) {
368 os << "BasicLinkedList[" << &ll << "] with numItems:" << ll.seqList.size()
369 << " deletedItems:" << ll.numDeletedItems
370 << " staleItems:" << ll.getNumStaleItems()
371 << " highPurgeSeqno:" << ll.getHighestPurgedDeletedSeqno()
372 << " elements:[" << std::endl;
374 for (const auto& val : ll.seqList) {
375 os << " " << val << std::endl;
378 os << "] (count:" << count << ")";
// Removes the element referenced by it from seqList and updates the
// bookkeeping for it.
//
// Steps visible here:
//  - the element is wrapped in a StoredValue::UniquePtr (purged), which takes
//    ownership of it for the rest of the function;
//  - under writeLock the element is erased from seqList and it is replaced by
//    the iterator that erase() returns;
//  - size() is subtracted from staleSize, and metaDataSize() from both
//    staleMetaDataSize and st.currentSize;
//  - if the element is deleted, highestPurgedDeletedSeqno is raised to at
//    least the seqno of the purged element.
// NOTE(review): the body of the first isDeleted() branch, any other count
// updates and the return statement are not visible in this excerpt;
// presumably the iterator from erase() is returned. Confirm.
382 OrderedLL::iterator BasicLinkedList::purgeListElem(OrderedLL::iterator it) {
383 StoredValue::UniquePtr purged(&*it);
385 std::lock_guard<std::mutex> lckGd(writeLock);
386 it = seqList.erase(it);
389 /* Update the stats tracking the memory owned by the list */
390 staleSize.fetch_sub(purged->size());
391 staleMetaDataSize.fetch_sub(purged->metaDataSize());
392 st.currentSize.fetch_sub(purged->metaDataSize());
394 // Similarly for the item counts:
396 if (purged->isDeleted()) {
400 if (purged->isDeleted()) {
401 highestPurgedDeletedSeqno = std::max(seqno_t(highestPurgedDeletedSeqno),
402 purged->getBySeqno());
// Constructs a range iterator over ll.
//
// The member initializers bind the iterator to the list, construct
// readLockHolder from list.rangeReadLock and start itrRange at (0, 0).
// Under list.rangeLock the constructor then either:
//  - unlocks readLockHolder straight away when list.highSeqno < 1 (no items,
//    itrRange stays at its default), or
//  - positions currIt at the first list element, sets list.readRange to
//    [seqno of first element, highSeqno], and sets itrRange to
//    [seqno of first element, highSeqno + 1].
// NOTE(review): the statement that leaves the constructor after the unlock in
// the empty case is not visible in this excerpt.
407 BasicLinkedList::RangeIteratorLL::RangeIteratorLL(BasicLinkedList& ll)
408 : list(ll), readLockHolder(list.rangeReadLock), itrRange(0, 0) {
409 std::lock_guard<SpinLock> lh(list.rangeLock);
410 if (list.highSeqno < 1) {
411 /* No need of holding a lock for the snapshot as there are no items;
412 Also iterator range is at default (0, 0) */
413 readLockHolder.unlock();
417 /* Iterator to the beginning of linked list */
418 currIt = list.seqList.begin();
420 /* Mark the snapshot range on linked list. The range that can be read by the
421 iterator is inclusive of the start and the end. */
422 list.readRange = SeqRange(currIt->getBySeqno(), list.highSeqno);
424 /* Keep the range in the iterator obj. We store the range end seqno as one
425 higher than the end seqno that can be read by this iterator.
426 This is because, we must identify the end point of the iterator, and
427 the read is inclusive of the end points of list.readRange.
429 Further, since we use the class 'SeqRange' for 'itrRange' we cannot use
430 curr() == end() + 1 to identify the end point because 'SeqRange' does
431 not internally allow curr > end */
432 itrRange = SeqRange(currIt->getBySeqno(), list.highSeqno + 1);
// Destructor. Resets list.readRange while holding list.rangeLock, so the
// list no longer advertises a range as being read by this iterator.
435 BasicLinkedList::RangeIteratorLL::~RangeIteratorLL() {
436 std::lock_guard<SpinLock> lh(list.rangeLock);
437 list.readRange.reset();
438 /* As readLockHolder goes out of scope here, it will automatically release
439 the snapshot read lock on the linked list */
// Dereference operator.
// Throws std::out_of_range, with the end seqno in the message, when curr()
// is at or beyond end().
// NOTE(review): the return statement is not visible in this excerpt;
// presumably it yields the element currIt refers to. Confirm.
442 OrderedStoredValue& BasicLinkedList::RangeIteratorLL::operator*() const {
443 if (curr() >= end()) {
444 /* We can't read beyond the range end */
445 throw std::out_of_range(
446 "BasicLinkedList::RangeIteratorLL::operator*()"
447 ": Trying to read beyond range end seqno " +
448 std::to_string(end()));
// Increment operator (operator++ according to the exception text; the
// declarator line itself is not visible in this excerpt).
//
// Behaviour visible here:
//  - throws std::out_of_range when curr() is already at or beyond end();
//  - when the iterator is on the last readable element
//    (curr() == itrRange.getEnd() - 1): resets list.readRange under
//    list.rangeLock, unlocks readLockHolder and moves the begin of itrRange
//    to end(), which marks the iteration as finished;
//  - otherwise: sets the begin of list.readRange (under list.rangeLock) and
//    the begin of itrRange to the seqno of the element currIt refers to,
//    shrinking the snapshot range as the iterator advances.
// NOTE(review): the statement advancing currIt and the return statements are
// not visible in this excerpt.
453 BasicLinkedList::RangeIteratorLL& BasicLinkedList::RangeIteratorLL::
455 if (curr() >= end()) {
456 throw std::out_of_range(
457 "BasicLinkedList::RangeIteratorLL::operator++()"
458 ": Trying to move the iterator beyond range end"
460 std::to_string(end()));
463 /* Check if the iterator is pointing to the last element. Increment beyond
464 the last element indicates the end of the iteration */
465 if (curr() == itrRange.getEnd() - 1) {
466 std::lock_guard<SpinLock> lh(list.rangeLock);
467 /* Release snapshot read lock on the linked list */
468 list.readRange.reset();
469 readLockHolder.unlock();
471 /* Update the begin to end() so the client can see that the iteration
473 itrRange.setBegin(end());
479 /* As the iterator moves we reduce the snapshot range being read on the
480 linked list. This helps reduce the stale items in the list during
481 heavy update load from the front end */
482 std::lock_guard<SpinLock> lh(list.rangeLock);
483 list.readRange.setBegin(currIt->getBySeqno());
486 /* Also update the current range stored in the iterator obj */
487 itrRange.setBegin(currIt->getBySeqno());