1 /* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
/*
 *     Copyright 2017 Couchbase, Inc
 *
 *   Licensed under the Apache License, Version 2.0 (the "License");
 *   you may not use this file except in compliance with the License.
 *   You may obtain a copy of the License at
 *
 *       http://www.apache.org/licenses/LICENSE-2.0
 *
 *   Unless required by applicable law or agreed to in writing, software
 *   distributed under the License is distributed on an "AS IS" BASIS,
 *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 *   See the License for the specific language governing permissions and
 *   limitations under the License.
 */
18 #include "linked_list.h"
21 BasicLinkedList::BasicLinkedList(uint16_t vbucketId, EPStats& st)
// NOTE(review): the constructor's initializer list is truncated in this view;
// only the zero-initialised sequence counters are visible. Confirm the
// remaining initialisers (vbid, st, highSeqno, stale counters, ...) against
// the full file.
27 highestDedupedSeqno(0),
28 highestPurgedDeletedSeqno(0),
35 BasicLinkedList::~BasicLinkedList() {
// Dispose of the stale items the list owns; non-stale items are owned (and
// freed) by the HashTable, per the comment below.
36 /* Delete stale items here, other items are deleted by the hash
// OSV::stale is guarded by writeLock; the guard is captured by the predicate
// lambda as proof-of-lock.
38 std::lock_guard<std::mutex> writeGuard(writeLock);
39 seqList.remove_and_dispose_if(
40 [&writeGuard](const OrderedStoredValue& v) {
41 return v.isStale(writeGuard);
// Disposer: undo the engine-wide metadata accounting for each removed stale
// item. NOTE(review): this view is truncated - presumably the disposer also
// deletes the OSV itself; confirm against the full file.
43 [this](OrderedStoredValue* v) {
44 this->st.currentSize.fetch_sub(v->metaDataSize());
48 /* Erase all the list elements (does not destroy elements, just removes
49 them from the list) */
53 void BasicLinkedList::appendToList(std::lock_guard<std::mutex>& seqLock,
54 OrderedStoredValue& v) {
// Appends 'v' at the tail of seqList. 'seqLock' evidences that the caller
// already serialises seqno generation; 'writeLock' serialises the actual
// list mutation.
55 /* Allow only one write to the list at a time */
56 std::lock_guard<std::mutex> lckGd(writeLock);
// NOTE(review): the push_back of 'v' onto seqList is not visible in this
// truncated view; confirm in the full file.
61 SequenceList::UpdateStatus BasicLinkedList::updateListElem(
62 std::lock_guard<std::mutex>& seqLock, OrderedStoredValue& v) {
// Moves an updated element to the tail of seqList, unless a concurrent range
// read currently covers its seqno, in which case the caller must append a
// new element instead (signalled via UpdateStatus::Append).
63 /* Allow only one write to the list at a time */
64 std::lock_guard<std::mutex> lckGd(writeLock);
66 /* Lock that needed for consistent read of SeqRange 'readRange' */
67 std::lock_guard<SpinLock> lh(rangeLock);
69 if (readRange.fallsInRange(v.getBySeqno())) {
70 /* Range read is in middle of a point-in-time snapshot, hence we cannot
71 move the element to the end of the list. Return a temp failure */
// NOTE(review): the comment above says "temp failure" but the code returns
// UpdateStatus::Append - the comment appears stale; the actual contract is
// "append a new version instead of relocating this one".
72 return UpdateStatus::Append;
75 /* Since there is no other reads or writes happenning in this range, we can
76 move the item to the end of the list */
77 auto it = seqList.iterator_to(v);
// NOTE(review): truncated view - the erase + push_back performing the move
// is not visible here; confirm in the full file.
81 return UpdateStatus::Success;
84 std::pair<ENGINE_ERROR_CODE, std::vector<UniqueItemPtr>>
85 BasicLinkedList::rangeRead(seqno_t start, seqno_t end) {
// Returns copies of the items whose seqnos fall in [start, end] (end is
// clamped to highSeqno), publishing a 'readRange' so concurrent writers
// append new versions rather than relocate elements while we iterate.
// Errors: ENGINE_ERANGE for an invalid range, ENGINE_ENOMEM if copying an
// item throws bad_alloc.
86 std::vector<UniqueItemPtr> empty;
87 if ((start > end) || (start <= 0)) {
88 LOG(EXTENSION_LOG_WARNING,
89 "BasicLinkedList::rangeRead(): "
90 "(vb:%d) ERANGE: start %" PRIi64 " > end %" PRIi64,
94 return {ENGINE_ERANGE, std::move(empty)
95 /* MSVC not happy with std::vector<UniqueItemPtr>() */};
98 /* Allows only 1 rangeRead for now */
99 std::lock_guard<std::mutex> lckGd(rangeReadLock);
// rangeLock gives a consistent read of highSeqno and guards readRange.
102 std::lock_guard<SpinLock> lh(rangeLock);
103 if (start > highSeqno) {
104 LOG(EXTENSION_LOG_WARNING,
105 "BasicLinkedList::rangeRead(): "
106 "(vb:%d) ERANGE: start %" PRIi64 " > highSeqno %" PRIi64,
109 static_cast<seqno_t>(highSeqno));
110 /* If the request is for an invalid range, return before iterating
112 return {ENGINE_ERANGE, std::move(empty)};
115 /* Mark the initial read range */
116 end = std::min(end, static_cast<seqno_t>(highSeqno));
// Extend 'end' to also cover deduped versions, keeping the snapshot
// self-consistent with respect to deduplication.
117 end = std::max(end, static_cast<seqno_t>(highestDedupedSeqno));
118 readRange = SeqRange(1, end);
121 /* Read items in the range */
122 std::vector<UniqueItemPtr> items;
124 for (const auto& osv : seqList) {
// Past the requested range: iteration is complete.
125 if (osv.getBySeqno() > end) {
// Shrink the protected range from the front as each element is passed, so
// writers regain freedom over the prefix we have already visited.
131 std::lock_guard<SpinLock> lh(rangeLock);
132 readRange.setBegin(osv.getBySeqno()); /* [EPHE TODO]: should we
133 update the min every time ?
// Elements below 'start' are skipped (readRange deliberately begins at 1).
137 if (osv.getBySeqno() < start) {
143 items.push_back(UniqueItemPtr(osv.toItem(false, vbid)));
144 } catch (const std::bad_alloc&) {
145 /* [EPHE TODO]: Do we handle backfill in a better way ?
146 Perhaps do backfilling partially (that is
147 backfill ==> stream; backfill ==> stream ..so on )?
149 LOG(EXTENSION_LOG_WARNING,
150 "BasicLinkedList::rangeRead(): "
151 "(vb %d) ENOMEM while trying to copy "
152 "item with seqno %" PRIi64 "before streaming it",
155 return {ENGINE_ENOMEM, std::move(empty)};
159 /* Done with range read, reset the range */
161 std::lock_guard<SpinLock> lh(rangeLock);
165 /* Return all the range read items */
166 return {ENGINE_SUCCESS, std::move(items)};
169 void BasicLinkedList::updateHighSeqno(const OrderedStoredValue& v) {
// Records v's seqno as the list's high seqno. rangeLock guards highSeqno so
// rangeRead() observes it consistently.
170 std::lock_guard<SpinLock> lh(rangeLock);
171 highSeqno = v.getBySeqno();
177 void BasicLinkedList::markItemStale(StoredValue::UniquePtr ownedSv) {
// Takes ownership of a StoredValue that has been superseded/removed in the
// HashTable and flags its OSV as stale, so a later purgeTombstones() pass
// (or the destructor) can free it.
178 /* Release the StoredValue as BasicLinkedList does not want it to be of
180 StoredValue* v = ownedSv.release();
182 /* Update the stats tracking the memory owned by the list */
183 staleSize.fetch_add(v->size());
184 staleMetaDataSize.fetch_add(v->metaDataSize());
185 st.currentSize.fetch_add(v->metaDataSize());
// OSV::stale is guarded by writeLock; the guard is passed as proof-of-lock.
189 std::lock_guard<std::mutex> writeGuard(writeLock);
190 v->toOrderedStoredValue()->markStale(writeGuard);
194 size_t BasicLinkedList::purgeTombstones() {
// @return the number of elements purged from the list (purgedCount).
195 // Purge items marked as stale from the seqList.
197 // Strategy - we try to ensure that this function does not block
198 // frontend-writes (adding new OrderedStoredValues (OSVs) to the seqList).
199 // To achieve this (safely),
200 // we (try to) acquire the rangeReadLock and setup a 'read' range for the
201 // whole of the seqList. This prevents any other readers from iterating
202 // the list (and accessing stale items) while we purge on it; but permits
203 // front-end operations to continue as they:
204 // a) Only read/modify non-stale items (we only change stale items) and
205 // b) Do not change the list membership of anything within the read-range.
206 // However, we do need to be careful about what members of OSVs we access
207 // here - the only OSVs we can safely access are ones marked stale as they
208 // are no longer in the HashTable (and hence subject to HashTable locks).
209 // To check if an item is stale we need to acquire the writeLock
210 // (OSV::stale is guarded by it) for each list item. While this isn't
211 // ideal (that's the same lock needed by front-end operations), we can
212 // release the lock between each element so front-end operations can
213 // have the opportunity to acquire it.
215 // Attempt to acquire the readRangeLock, to block anyone else concurrently
216 // reading from the list while we remove elements from it.
217 std::unique_lock<std::mutex> rrGuard(rangeReadLock, std::try_to_lock);
219 // If we cannot acquire the lock then another thread is
220 // running a range read. Given these are typically long-running,
221 // return without blocking.
225 // Determine the start and end iterators.
226 OrderedLL::iterator startIt;
227 OrderedLL::iterator endIt;
229 std::lock_guard<std::mutex> writeGuard(writeLock);
230 if (seqList.empty()) {
231 // Nothing in sequence list - nothing to purge.
234 // Determine the {start} and {end} iterator (inclusive). Note
235 // that (at most) one item is added to the seqList before its
236 // sequence number is set (as the seqno comes from Ckpt
237 // manager); if that is the case (such an item is guaranteed
238 // to not be stale), then move end to the previous item
239 // (i.e. we don't consider this "in-flight" item), as long as
240 // there is at least two elements.
241 startIt = seqList.begin();
242 endIt = std::prev(seqList.end());
243 // Need rangeLock for highSeqno & readRange
244 std::lock_guard<SpinLock> rangeGuard(rangeLock);
245 if ((startIt != endIt) && (!endIt->isStale(writeGuard))) {
246 endIt = std::prev(endIt);
// Publish a read range spanning everything we will touch, so concurrent
// range reads are excluded and writers only append beyond it.
248 readRange = SeqRange(startIt->getBySeqno(), endIt->getBySeqno());
251 // Iterate across all but the last item in the seqList, looking
253 // Note the for() loop terminates one element before endIt - we
254 // actually want an inclusive iteration but as we are comparing
255 // essentially random addresses (and we don't want to 'touch' the
256 // element after endIt), we loop to one before endIt, then handle
257 // endIt explicilty at the end.
258 // Note(2): Iterator is manually incremented outside the for() loop as it
259 // is invalidated when we erase items.
260 size_t purgedCount = 0;
262 for (auto it = startIt; it != endIt;) {
// writeLock is taken per-element (and released between elements) so
// front-end writers can interleave; see strategy comment above.
// NOTE(review): 'stale' is declared in a scope not visible in this
// truncated view; confirm against the full file.
264 std::lock_guard<std::mutex> writeGuard(writeLock);
265 stale = it->isStale(writeGuard);
267 // Only stale items are purged.
273 // Checks pass, remove from list and delete.
274 it = purgeListElem(it);
277 // Handle the last element.
279 std::lock_guard<std::mutex> writeGuard(writeLock);
280 stale = endIt->isStale(writeGuard);
283 purgeListElem(endIt);
287 // Complete; reset the readRange.
289 std::lock_guard<SpinLock> lh(rangeLock);
// Returns the number of stale items awaiting purge.
// NOTE(review): read without taking writeLock - presumably numStaleItems is
// atomic; confirm its declaration in the header.
295 uint64_t BasicLinkedList::getNumStaleItems() const {
296 return numStaleItems;
// Returns the bytes of value data owned by stale items. NOTE(review): the
// body is not visible in this truncated view - presumably 'return staleSize;'.
299 size_t BasicLinkedList::getStaleValueBytes() const {
// Returns the bytes of metadata owned by stale items.
303 size_t BasicLinkedList::getStaleMetadataBytes() const {
304 return staleMetaDataSize;
// Returns the count of deleted (tombstone) items; numDeletedItems is guarded
// by writeLock.
307 uint64_t BasicLinkedList::getNumDeletedItems() const {
308 std::lock_guard<std::mutex> lckGd(writeLock);
309 return numDeletedItems;
// Returns the total number of elements currently linked into seqList
// (writeLock guards list membership).
312 uint64_t BasicLinkedList::getNumItems() const {
313 std::lock_guard<std::mutex> lckGd(writeLock);
314 return seqList.size();
// Returns the list's high seqno; rangeLock guards highSeqno.
// NOTE(review): the return statement is not visible in this truncated view.
317 uint64_t BasicLinkedList::getHighSeqno() const {
318 std::lock_guard<SpinLock> lh(rangeLock);
// Returns the highest seqno that has been deduplicated (guarded by
// writeLock, consistent with updateListElem()).
322 uint64_t BasicLinkedList::getHighestDedupedSeqno() const {
323 std::lock_guard<std::mutex> lckGd(writeLock);
324 return highestDedupedSeqno;
// Returns the highest seqno of any purged tombstone (see purgeListElem()).
// NOTE(review): read lock-free - presumably highestPurgedDeletedSeqno is an
// atomic/monotonic type; confirm its declaration.
327 seqno_t BasicLinkedList::getHighestPurgedDeletedSeqno() const {
328 return highestPurgedDeletedSeqno;
// Returns the inclusive start of the currently-published read range
// (readRange is guarded by rangeLock).
331 uint64_t BasicLinkedList::getRangeReadBegin() const {
332 std::lock_guard<SpinLock> lh(rangeLock);
333 return readRange.getBegin();
// Returns the inclusive end of the currently-published read range
// (readRange is guarded by rangeLock).
336 uint64_t BasicLinkedList::getRangeReadEnd() const {
337 std::lock_guard<SpinLock> lh(rangeLock);
338 return readRange.getEnd();
// Debug helper: writes the full list state to stderr via operator<<.
341 void BasicLinkedList::dump() const {
342 std::cerr << *this << std::endl;
// Streams a human-readable summary of the list: counts followed by every
// element. NOTE(review): iterates seqList and reads numDeletedItems without
// taking writeLock - only safe for offline/debug use; confirm intended usage.
345 std::ostream& operator <<(std::ostream& os, const BasicLinkedList& ll) {
346 os << "BasicLinkedList[" << &ll << "] with numItems:" << ll.seqList.size()
347 << " deletedItems:" << ll.numDeletedItems
348 << " staleItems:" << ll.getNumStaleItems()
349 << " highPurgeSeqno:" << ll.getHighestPurgedDeletedSeqno()
350 << " elements:[" << std::endl;
351 for (const auto& val : ll.seqList) {
352 os << "    " << val << std::endl;
358 OrderedLL::iterator BasicLinkedList::purgeListElem(OrderedLL::iterator it) {
// Unlinks the (stale) element at 'it' from seqList, takes ownership of it so
// it is freed on return, and updates memory/item accounting. Returns the
// iterator following the erased element.
359 StoredValue::UniquePtr purged(&*it);
// writeLock is held only around the list mutation itself.
361 std::lock_guard<std::mutex> lckGd(writeLock);
362 it = seqList.erase(it);
365 /* Update the stats tracking the memory owned by the list */
366 staleSize.fetch_sub(purged->size());
367 staleMetaDataSize.fetch_sub(purged->metaDataSize());
368 st.currentSize.fetch_sub(purged->metaDataSize());
370 // Similarly for the item counts:
372 if (purged->isDeleted()) {
// Track the highest seqno of any purged tombstone, which backs
// getHighestPurgedDeletedSeqno().
376 if (purged->isDeleted()) {
377 highestPurgedDeletedSeqno = std::max(seqno_t(highestPurgedDeletedSeqno),
378 purged->getBySeqno());