1 /* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
/*
 *     Copyright 2017 Couchbase, Inc
 *
 *   Licensed under the Apache License, Version 2.0 (the "License");
 *   you may not use this file except in compliance with the License.
 *   You may obtain a copy of the License at
 *
 *       http://www.apache.org/licenses/LICENSE-2.0
 *
 *   Unless required by applicable law or agreed to in writing, software
 *   distributed under the License is distributed on an "AS IS" BASIS,
 *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 *   See the License for the specific language governing permissions and
 *   limitations under the License.
 */
18 #include "linked_list.h"
21 BasicLinkedList::BasicLinkedList(uint16_t vbucketId, EPStats& st)
27 highestDedupedSeqno(0),
28 highestPurgedDeletedSeqno(0),
35 BasicLinkedList::~BasicLinkedList() {
36 /* Delete stale items here, other items are deleted by the hash
38 std::lock_guard<std::mutex> writeGuard(writeLock);
39 seqList.remove_and_dispose_if(
40 [&writeGuard](const OrderedStoredValue& v) {
41 return v.isStale(writeGuard);
43 [this](OrderedStoredValue* v) {
44 this->st.currentSize.fetch_sub(v->metaDataSize());
48 /* Erase all the list elements (does not destroy elements, just removes
49 them from the list) */
53 void BasicLinkedList::appendToList(std::lock_guard<std::mutex>& seqLock,
54 OrderedStoredValue& v) {
55 /* Allow only one write to the list at a time */
56 std::lock_guard<std::mutex> lckGd(writeLock);
61 SequenceList::UpdateStatus BasicLinkedList::updateListElem(
62 std::lock_guard<std::mutex>& seqLock, OrderedStoredValue& v) {
63 /* Allow only one write to the list at a time */
64 std::lock_guard<std::mutex> lckGd(writeLock);
66 /* Lock that needed for consistent read of SeqRange 'readRange' */
67 std::lock_guard<SpinLock> lh(rangeLock);
69 if (readRange.fallsInRange(v.getBySeqno())) {
70 /* Range read is in middle of a point-in-time snapshot, hence we cannot
71 move the element to the end of the list. Return a temp failure */
72 return UpdateStatus::Append;
75 /* Since there is no other reads or writes happenning in this range, we can
76 move the item to the end of the list */
77 auto it = seqList.iterator_to(v);
81 return UpdateStatus::Success;
84 std::pair<ENGINE_ERROR_CODE, std::vector<UniqueItemPtr>>
85 BasicLinkedList::rangeRead(seqno_t start, seqno_t end) {
86 std::vector<UniqueItemPtr> empty;
87 if ((start > end) || (start <= 0)) {
88 LOG(EXTENSION_LOG_WARNING,
89 "BasicLinkedList::rangeRead(): "
90 "(vb:%d) ERANGE: start %" PRIi64 " > end %" PRIi64,
94 return {ENGINE_ERANGE, std::move(empty)
95 /* MSVC not happy with std::vector<UniqueItemPtr>() */};
98 /* Allows only 1 rangeRead for now */
99 std::lock_guard<std::mutex> lckGd(rangeReadLock);
102 std::lock_guard<SpinLock> lh(rangeLock);
103 if (start > highSeqno) {
104 LOG(EXTENSION_LOG_WARNING,
105 "BasicLinkedList::rangeRead(): "
106 "(vb:%d) ERANGE: start %" PRIi64 " > highSeqno %" PRIi64,
109 static_cast<seqno_t>(highSeqno));
110 /* If the request is for an invalid range, return before iterating
112 return {ENGINE_ERANGE, std::move(empty)};
115 /* Mark the initial read range */
116 end = std::min(end, static_cast<seqno_t>(highSeqno));
117 end = std::max(end, static_cast<seqno_t>(highestDedupedSeqno));
118 readRange = SeqRange(1, end);
121 /* Read items in the range */
122 std::vector<UniqueItemPtr> items;
124 for (const auto& osv : seqList) {
125 if (osv.getBySeqno() > end) {
131 std::lock_guard<SpinLock> lh(rangeLock);
132 readRange.setBegin(osv.getBySeqno()); /* [EPHE TODO]: should we
133 update the min every time ?
137 if (osv.getBySeqno() < start) {
143 items.push_back(UniqueItemPtr(osv.toItem(false, vbid)));
144 } catch (const std::bad_alloc&) {
145 /* [EPHE TODO]: Do we handle backfill in a better way ?
146 Perhaps do backfilling partially (that is
147 backfill ==> stream; backfill ==> stream ..so on )?
149 LOG(EXTENSION_LOG_WARNING,
150 "BasicLinkedList::rangeRead(): "
151 "(vb %d) ENOMEM while trying to copy "
152 "item with seqno %" PRIi64 "before streaming it",
155 return {ENGINE_ENOMEM, std::move(empty)};
159 /* Done with range read, reset the range */
161 std::lock_guard<SpinLock> lh(rangeLock);
165 /* Return all the range read items */
166 return {ENGINE_SUCCESS, std::move(items)};
169 void BasicLinkedList::updateHighSeqno(const OrderedStoredValue& v) {
170 std::lock_guard<SpinLock> lh(rangeLock);
171 highSeqno = v.getBySeqno();
174 void BasicLinkedList::markItemStale(StoredValue::UniquePtr ownedSv) {
175 /* Release the StoredValue as BasicLinkedList does not want it to be of
177 StoredValue* v = ownedSv.release();
179 /* Update the stats tracking the memory owned by the list */
180 staleSize.fetch_add(v->size());
181 staleMetaDataSize.fetch_add(v->metaDataSize());
182 st.currentSize.fetch_add(v->metaDataSize());
186 std::lock_guard<std::mutex> writeGuard(writeLock);
187 v->toOrderedStoredValue()->markStale(writeGuard);
191 size_t BasicLinkedList::purgeTombstones() {
192 // Purge items marked as stale from the seqList.
194 // Strategy - we try to ensure that this function does not block
195 // frontend-writes (adding new OrderedStoredValues (OSVs) to the seqList).
196 // To achieve this (safely),
197 // we (try to) acquire the rangeReadLock and setup a 'read' range for the
198 // whole of the seqList. This prevents any other readers from iterating
199 // the list (and accessing stale items) while we purge on it; but permits
200 // front-end operations to continue as they:
201 // a) Only read/modify non-stale items (we only change stale items) and
202 // b) Do not change the list membership of anything within the read-range.
203 // However, we do need to be careful about what members of OSVs we access
204 // here - the only OSVs we can safely access are ones marked stale as they
205 // are no longer in the HashTable (and hence subject to HashTable locks).
206 // To check if an item is stale we need to acquire the writeLock
207 // (OSV::stale is guarded by it) for each list item. While this isn't
208 // ideal (that's the same lock needed by front-end operations), we can
209 // release the lock between each element so front-end operations can
210 // have the opportunity to acquire it.
212 // Attempt to acquire the readRangeLock, to block anyone else concurrently
213 // reading from the list while we remove elements from it.
214 std::unique_lock<std::mutex> rrGuard(rangeReadLock, std::try_to_lock);
216 // If we cannot acquire the lock then another thread is
217 // running a range read. Given these are typically long-running,
218 // return without blocking.
222 // Determine the start and end iterators.
223 OrderedLL::iterator startIt;
224 OrderedLL::iterator endIt;
226 std::lock_guard<std::mutex> writeGuard(writeLock);
227 if (seqList.empty()) {
228 // Nothing in sequence list - nothing to purge.
231 // Determine the {start} and {end} iterator (inclusive). Note
232 // that (at most) one item is added to the seqList before its
233 // sequence number is set (as the seqno comes from Ckpt
234 // manager); if that is the case (such an item is guaranteed
235 // to not be stale), then move end to the previous item
236 // (i.e. we don't consider this "in-flight" item), as long as
237 // there is at least two elements.
238 startIt = seqList.begin();
239 endIt = std::prev(seqList.end());
240 // Need rangeLock for highSeqno & readRange
241 std::lock_guard<SpinLock> rangeGuard(rangeLock);
242 if ((startIt != endIt) && (!endIt->isStale(writeGuard))) {
243 endIt = std::prev(endIt);
245 readRange = SeqRange(startIt->getBySeqno(), endIt->getBySeqno());
248 // Iterate across all but the last item in the seqList, looking
250 // Note the for() loop terminates one element before endIt - we
251 // actually want an inclusive iteration but as we are comparing
252 // essentially random addresses (and we don't want to 'touch' the
253 // element after endIt), we loop to one before endIt, then handle
254 // endIt explicilty at the end.
255 // Note(2): Iterator is manually incremented outside the for() loop as it
256 // is invalidated when we erase items.
257 size_t purgedCount = 0;
259 for (auto it = startIt; it != endIt;) {
261 std::lock_guard<std::mutex> writeGuard(writeLock);
262 stale = it->isStale(writeGuard);
264 // Only stale items are purged.
270 // Checks pass, remove from list and delete.
271 it = purgeListElem(it);
274 // Handle the last element.
276 std::lock_guard<std::mutex> writeGuard(writeLock);
277 stale = endIt->isStale(writeGuard);
280 purgeListElem(endIt);
284 // Complete; reset the readRange.
286 std::lock_guard<SpinLock> lh(rangeLock);
292 void BasicLinkedList::updateNumDeletedItems(bool oldDeleted, bool newDeleted) {
293 if (oldDeleted && !newDeleted) {
295 } else if (!oldDeleted && newDeleted) {
300 uint64_t BasicLinkedList::getNumStaleItems() const {
301 return numStaleItems;
304 size_t BasicLinkedList::getStaleValueBytes() const {
308 size_t BasicLinkedList::getStaleMetadataBytes() const {
309 return staleMetaDataSize;
312 uint64_t BasicLinkedList::getNumDeletedItems() const {
313 std::lock_guard<std::mutex> lckGd(writeLock);
314 return numDeletedItems;
317 uint64_t BasicLinkedList::getNumItems() const {
318 std::lock_guard<std::mutex> lckGd(writeLock);
319 return seqList.size();
322 uint64_t BasicLinkedList::getHighSeqno() const {
323 std::lock_guard<SpinLock> lh(rangeLock);
327 uint64_t BasicLinkedList::getHighestDedupedSeqno() const {
328 std::lock_guard<std::mutex> lckGd(writeLock);
329 return highestDedupedSeqno;
332 seqno_t BasicLinkedList::getHighestPurgedDeletedSeqno() const {
333 return highestPurgedDeletedSeqno;
336 uint64_t BasicLinkedList::getRangeReadBegin() const {
337 std::lock_guard<SpinLock> lh(rangeLock);
338 return readRange.getBegin();
341 uint64_t BasicLinkedList::getRangeReadEnd() const {
342 std::lock_guard<SpinLock> lh(rangeLock);
343 return readRange.getEnd();
346 void BasicLinkedList::dump() const {
347 std::cerr << *this << std::endl;
350 std::ostream& operator <<(std::ostream& os, const BasicLinkedList& ll) {
351 os << "BasicLinkedList[" << &ll << "] with numItems:" << ll.seqList.size()
352 << " deletedItems:" << ll.numDeletedItems
353 << " staleItems:" << ll.getNumStaleItems()
354 << " highPurgeSeqno:" << ll.getHighestPurgedDeletedSeqno()
355 << " elements:[" << std::endl;
357 for (const auto& val : ll.seqList) {
358 os << " " << val << std::endl;
361 os << "] (count:" << count << ")";
365 OrderedLL::iterator BasicLinkedList::purgeListElem(OrderedLL::iterator it) {
366 StoredValue::UniquePtr purged(&*it);
368 std::lock_guard<std::mutex> lckGd(writeLock);
369 it = seqList.erase(it);
372 /* Update the stats tracking the memory owned by the list */
373 staleSize.fetch_sub(purged->size());
374 staleMetaDataSize.fetch_sub(purged->metaDataSize());
375 st.currentSize.fetch_sub(purged->metaDataSize());
377 // Similary for the item counts:
379 if (purged->isDeleted()) {
383 if (purged->isDeleted()) {
384 highestPurgedDeletedSeqno = std::max(seqno_t(highestPurgedDeletedSeqno),
385 purged->getBySeqno());