MB-24151: Implement general range read iterator in seqlist
[ep-engine.git] / src / linked_list.cc
1 /* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2 /*
3  *     Copyright 2017 Couchbase, Inc
4  *
5  *   Licensed under the Apache License, Version 2.0 (the "License");
6  *   you may not use this file except in compliance with the License.
7  *   You may obtain a copy of the License at
8  *
9  *       http://www.apache.org/licenses/LICENSE-2.0
10  *
11  *   Unless required by applicable law or agreed to in writing, software
12  *   distributed under the License is distributed on an "AS IS" BASIS,
13  *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  *   See the License for the specific language governing permissions and
15  *   limitations under the License.
16  */
17
18 #include "linked_list.h"
19 #include <mutex>
20
21 BasicLinkedList::BasicLinkedList(uint16_t vbucketId, EPStats& st)
22     : SequenceList(),
23       readRange(0, 0),
24       staleSize(0),
25       staleMetaDataSize(0),
26       highSeqno(0),
27       highestDedupedSeqno(0),
28       highestPurgedDeletedSeqno(0),
29       numStaleItems(0),
30       numDeletedItems(0),
31       vbid(vbucketId),
32       st(st) {
33 }
34
35 BasicLinkedList::~BasicLinkedList() {
36     /* Delete stale items here, other items are deleted by the hash
37        table */
38     std::lock_guard<std::mutex> writeGuard(writeLock);
39     seqList.remove_and_dispose_if(
40             [&writeGuard](const OrderedStoredValue& v) {
41                 return v.isStale(writeGuard);
42             },
43             [this](OrderedStoredValue* v) {
44                 this->st.currentSize.fetch_sub(v->metaDataSize());
45                 delete v;
46             });
47
48     /* Erase all the list elements (does not destroy elements, just removes
49        them from the list) */
50     seqList.clear();
51 }
52
53 void BasicLinkedList::appendToList(std::lock_guard<std::mutex>& seqLock,
54                                    OrderedStoredValue& v) {
55     /* Allow only one write to the list at a time */
56     std::lock_guard<std::mutex> lckGd(writeLock);
57
58     seqList.push_back(v);
59 }
60
61 SequenceList::UpdateStatus BasicLinkedList::updateListElem(
62         std::lock_guard<std::mutex>& seqLock, OrderedStoredValue& v) {
63     /* Allow only one write to the list at a time */
64     std::lock_guard<std::mutex> lckGd(writeLock);
65
66     /* Lock that needed for consistent read of SeqRange 'readRange' */
67     std::lock_guard<SpinLock> lh(rangeLock);
68
69     if (readRange.fallsInRange(v.getBySeqno())) {
70         /* Range read is in middle of a point-in-time snapshot, hence we cannot
71            move the element to the end of the list. Return a temp failure */
72         return UpdateStatus::Append;
73     }
74
75     /* Since there is no other reads or writes happenning in this range, we can
76        move the item to the end of the list */
77     auto it = seqList.iterator_to(v);
78     seqList.erase(it);
79     seqList.push_back(v);
80
81     return UpdateStatus::Success;
82 }
83
84 std::tuple<ENGINE_ERROR_CODE, std::vector<UniqueItemPtr>, seqno_t>
85 BasicLinkedList::rangeRead(seqno_t start, seqno_t end) {
86     std::vector<UniqueItemPtr> empty;
87     if ((start > end) || (start <= 0)) {
88         LOG(EXTENSION_LOG_WARNING,
89             "BasicLinkedList::rangeRead(): "
90             "(vb:%d) ERANGE: start %" PRIi64 " > end %" PRIi64,
91             vbid,
92             start,
93             end);
94         return std::make_tuple(
95                 ENGINE_ERANGE,
96                 std::move(empty)
97                 /* MSVC not happy with std::vector<UniqueItemPtr>() */,
98                 0);
99     }
100
101     /* Allows only 1 rangeRead for now */
102     std::lock_guard<std::mutex> lckGd(rangeReadLock);
103
104     {
105         std::lock_guard<SpinLock> lh(rangeLock);
106         if (start > highSeqno) {
107             LOG(EXTENSION_LOG_WARNING,
108                 "BasicLinkedList::rangeRead(): "
109                 "(vb:%d) ERANGE: start %" PRIi64 " > highSeqno %" PRIi64,
110                 vbid,
111                 start,
112                 static_cast<seqno_t>(highSeqno));
113             /* If the request is for an invalid range, return before iterating
114                through the list */
115             return std::make_tuple(ENGINE_ERANGE, std::move(empty), 0);
116         }
117
118         /* Mark the initial read range */
119         end = std::min(end, static_cast<seqno_t>(highSeqno));
120         end = std::max(end, static_cast<seqno_t>(highestDedupedSeqno));
121         readRange = SeqRange(1, end);
122     }
123
124     /* Read items in the range */
125     std::vector<UniqueItemPtr> items;
126
127     for (const auto& osv : seqList) {
128         if (osv.getBySeqno() > end) {
129             /* we are done */
130             break;
131         }
132
133         {
134             std::lock_guard<SpinLock> lh(rangeLock);
135             readRange.setBegin(osv.getBySeqno()); /* [EPHE TODO]: should we
136                                                      update the min every time ?
137                                                    */
138         }
139
140         if (osv.getBySeqno() < start) {
141             /* skip this item */
142             continue;
143         }
144
145         try {
146             items.push_back(UniqueItemPtr(osv.toItem(false, vbid)));
147         } catch (const std::bad_alloc&) {
148             /* [EPHE TODO]: Do we handle backfill in a better way ?
149                             Perhaps do backfilling partially (that is
150                             backfill ==> stream; backfill ==> stream ..so on )?
151              */
152             LOG(EXTENSION_LOG_WARNING,
153                 "BasicLinkedList::rangeRead(): "
154                 "(vb %d) ENOMEM while trying to copy "
155                 "item with seqno %" PRIi64 "before streaming it",
156                 vbid,
157                 osv.getBySeqno());
158             return std::make_tuple(ENGINE_ENOMEM, std::move(empty), 0);
159         }
160     }
161
162     /* Done with range read, reset the range */
163     {
164         std::lock_guard<SpinLock> lh(rangeLock);
165         readRange.reset();
166     }
167
168     /* Return all the range read items */
169     return std::make_tuple(ENGINE_SUCCESS, std::move(items), end);
170 }
171
172 void BasicLinkedList::updateHighSeqno(
173         std::lock_guard<std::mutex>& highSeqnoLock,
174         const OrderedStoredValue& v) {
175     highSeqno = v.getBySeqno();
176 }
177 void BasicLinkedList::updateHighestDedupedSeqno(
178         std::lock_guard<std::mutex>& highSeqnoLock,
179         const OrderedStoredValue& v) {
180     highestDedupedSeqno = v.getBySeqno();
181 }
182
183 void BasicLinkedList::markItemStale(StoredValue::UniquePtr ownedSv) {
184     /* Release the StoredValue as BasicLinkedList does not want it to be of
185        owned type */
186     StoredValue* v = ownedSv.release();
187
188     /* Update the stats tracking the memory owned by the list */
189     staleSize.fetch_add(v->size());
190     staleMetaDataSize.fetch_add(v->metaDataSize());
191     st.currentSize.fetch_add(v->metaDataSize());
192
193     ++numStaleItems;
194     {
195         std::lock_guard<std::mutex> writeGuard(writeLock);
196         v->toOrderedStoredValue()->markStale(writeGuard);
197     }
198 }
199
200 size_t BasicLinkedList::purgeTombstones() {
201     // Purge items marked as stale from the seqList.
202     //
203     // Strategy - we try to ensure that this function does not block
204     // frontend-writes (adding new OrderedStoredValues (OSVs) to the seqList).
205     // To achieve this (safely),
206     // we (try to) acquire the rangeReadLock and setup a 'read' range for the
207     // whole of the seqList. This prevents any other readers from iterating
208     // the list (and accessing stale items) while we purge on it; but permits
209     // front-end operations to continue as they:
210     //   a) Only read/modify non-stale items (we only change stale items) and
211     //   b) Do not change the list membership of anything within the read-range.
212     // However, we do need to be careful about what members of OSVs we access
213     // here - the only OSVs we can safely access are ones marked stale as they
214     // are no longer in the HashTable (and hence subject to HashTable locks).
215     // To check if an item is stale we need to acquire the writeLock
216     // (OSV::stale is guarded by it) for each list item. While this isn't
217     // ideal (that's the same lock needed by front-end operations), we can
218     // release the lock between each element so front-end operations can
219     // have the opportunity to acquire it.
220     //
221     // Attempt to acquire the readRangeLock, to block anyone else concurrently
222     // reading from the list while we remove elements from it.
223     std::unique_lock<std::mutex> rrGuard(rangeReadLock, std::try_to_lock);
224     if (!rrGuard) {
225         // If we cannot acquire the lock then another thread is
226         // running a range read. Given these are typically long-running,
227         // return without blocking.
228         return 0;
229     }
230
231     // Determine the start and end iterators.
232     OrderedLL::iterator startIt;
233     OrderedLL::iterator endIt;
234     {
235         std::lock_guard<std::mutex> writeGuard(writeLock);
236         if (seqList.empty()) {
237             // Nothing in sequence list - nothing to purge.
238             return 0;
239         }
240         // Determine the {start} and {end} iterator (inclusive). Note
241         // that (at most) one item is added to the seqList before its
242         // sequence number is set (as the seqno comes from Ckpt
243         // manager); if that is the case (such an item is guaranteed
244         // to not be stale), then move end to the previous item
245         // (i.e. we don't consider this "in-flight" item), as long as
246         // there is at least two elements.
247         startIt = seqList.begin();
248         endIt = std::prev(seqList.end());
249         // Need rangeLock for highSeqno & readRange
250         std::lock_guard<SpinLock> rangeGuard(rangeLock);
251         if ((startIt != endIt) && (!endIt->isStale(writeGuard))) {
252             endIt = std::prev(endIt);
253         }
254         readRange = SeqRange(startIt->getBySeqno(), endIt->getBySeqno());
255     }
256
257     // Iterate across all but the last item in the seqList, looking
258     // for stale items.
259     // Note the for() loop terminates one element before endIt - we
260     // actually want an inclusive iteration but as we are comparing
261     // essentially random addresses (and we don't want to 'touch' the
262     // element after endIt), we loop to one before endIt, then handle
263     // endIt explicilty at the end.
264     // Note(2): Iterator is manually incremented outside the for() loop as it
265     // is invalidated when we erase items.
266     size_t purgedCount = 0;
267     bool stale;
268     for (auto it = startIt; it != endIt;) {
269         {
270             std::lock_guard<std::mutex> writeGuard(writeLock);
271             stale = it->isStale(writeGuard);
272         }
273         // Only stale items are purged.
274         if (!stale) {
275             ++it;
276             continue;
277         }
278
279         // Checks pass, remove from list and delete.
280         it = purgeListElem(it);
281         ++purgedCount;
282     }
283     // Handle the last element.
284     {
285         std::lock_guard<std::mutex> writeGuard(writeLock);
286         stale = endIt->isStale(writeGuard);
287     }
288     if (stale) {
289         purgeListElem(endIt);
290         ++purgedCount;
291     }
292
293     // Complete; reset the readRange.
294     {
295         std::lock_guard<SpinLock> lh(rangeLock);
296         readRange.reset();
297     }
298     return purgedCount;
299 }
300
301 void BasicLinkedList::updateNumDeletedItems(bool oldDeleted, bool newDeleted) {
302     if (oldDeleted && !newDeleted) {
303         --numDeletedItems;
304     } else if (!oldDeleted && newDeleted) {
305         ++numDeletedItems;
306     }
307 }
308
309 uint64_t BasicLinkedList::getNumStaleItems() const {
310     return numStaleItems;
311 }
312
313 size_t BasicLinkedList::getStaleValueBytes() const {
314     return staleSize;
315 }
316
317 size_t BasicLinkedList::getStaleMetadataBytes() const {
318     return staleMetaDataSize;
319 }
320
321 uint64_t BasicLinkedList::getNumDeletedItems() const {
322     std::lock_guard<std::mutex> lckGd(writeLock);
323     return numDeletedItems;
324 }
325
326 uint64_t BasicLinkedList::getNumItems() const {
327     std::lock_guard<std::mutex> lckGd(writeLock);
328     return seqList.size();
329 }
330
331 uint64_t BasicLinkedList::getHighSeqno() const {
332     std::lock_guard<std::mutex> lckGd(highSeqnosLock);
333     return highSeqno;
334 }
335
336 uint64_t BasicLinkedList::getHighestDedupedSeqno() const {
337     std::lock_guard<std::mutex> lckGd(highSeqnosLock);
338     return highestDedupedSeqno;
339 }
340
341 seqno_t BasicLinkedList::getHighestPurgedDeletedSeqno() const {
342     return highestPurgedDeletedSeqno;
343 }
344
345 uint64_t BasicLinkedList::getRangeReadBegin() const {
346     std::lock_guard<SpinLock> lh(rangeLock);
347     return readRange.getBegin();
348 }
349
350 uint64_t BasicLinkedList::getRangeReadEnd() const {
351     std::lock_guard<SpinLock> lh(rangeLock);
352     return readRange.getEnd();
353 }
354 std::mutex& BasicLinkedList::getHighSeqnosLock() const {
355     return highSeqnosLock;
356 }
357
358 SequenceList::RangeIterator BasicLinkedList::makeRangeIterator() {
359     return SequenceList::RangeIterator(
360             std::make_unique<RangeIteratorLL>(*this));
361 }
362
363 void BasicLinkedList::dump() const {
364     std::cerr << *this << std::endl;
365 }
366
367 std::ostream& operator <<(std::ostream& os, const BasicLinkedList& ll) {
368     os << "BasicLinkedList[" << &ll << "] with numItems:" << ll.seqList.size()
369        << " deletedItems:" << ll.numDeletedItems
370        << " staleItems:" << ll.getNumStaleItems()
371        << " highPurgeSeqno:" << ll.getHighestPurgedDeletedSeqno()
372        << " elements:[" << std::endl;
373     size_t count = 0;
374     for (const auto& val : ll.seqList) {
375         os << "    " << val << std::endl;
376         ++count;
377     }
378     os << "] (count:" << count << ")";
379     return os;
380 }
381
382 OrderedLL::iterator BasicLinkedList::purgeListElem(OrderedLL::iterator it) {
383     StoredValue::UniquePtr purged(&*it);
384     {
385         std::lock_guard<std::mutex> lckGd(writeLock);
386         it = seqList.erase(it);
387     }
388
389     /* Update the stats tracking the memory owned by the list */
390     staleSize.fetch_sub(purged->size());
391     staleMetaDataSize.fetch_sub(purged->metaDataSize());
392     st.currentSize.fetch_sub(purged->metaDataSize());
393
394     // Similary for the item counts:
395     --numStaleItems;
396     if (purged->isDeleted()) {
397         --numDeletedItems;
398     }
399
400     if (purged->isDeleted()) {
401         highestPurgedDeletedSeqno = std::max(seqno_t(highestPurgedDeletedSeqno),
402                                              purged->getBySeqno());
403     }
404     return it;
405 }
406
407 BasicLinkedList::RangeIteratorLL::RangeIteratorLL(BasicLinkedList& ll)
408     : list(ll), readLockHolder(list.rangeReadLock), itrRange(0, 0) {
409     std::lock_guard<SpinLock> lh(list.rangeLock);
410     if (list.highSeqno < 1) {
411         /* No need of holding a lock for the snapshot as there are no items;
412            Also iterator range is at default (0, 0) */
413         readLockHolder.unlock();
414         return;
415     }
416
417     /* Iterator to the beginning of linked list */
418     currIt = list.seqList.begin();
419
420     /* Mark the snapshot range on linked list. The range that can be read by the
421        iterator is inclusive of the start and the end. */
422     list.readRange = SeqRange(currIt->getBySeqno(), list.highSeqno);
423
424     /* Keep the range in the iterator obj. We store the range end seqno as one
425        higher than the end seqno that can be read by this iterator.
426        This is because, we must identify the end point of the iterator, and
427        we the read is inclusive of the end points of list.readRange.
428
429        Further, since use the class 'SeqRange' for 'itrRange' we cannot use
430        curr() == end() + 1 to identify the end point because 'SeqRange' does
431        not internally allow curr > end */
432     itrRange = SeqRange(currIt->getBySeqno(), list.highSeqno + 1);
433 }
434
435 BasicLinkedList::RangeIteratorLL::~RangeIteratorLL() {
436     std::lock_guard<SpinLock> lh(list.rangeLock);
437     list.readRange.reset();
438     /* As readLockHolder goes out of scope here, it will automatically release
439        the snapshot read lock on the linked list */
440 }
441
442 OrderedStoredValue& BasicLinkedList::RangeIteratorLL::operator*() const {
443     if (curr() >= end()) {
444         /* We can't read beyond the range end */
445         throw std::out_of_range(
446                 "BasicLinkedList::RangeIteratorLL::operator*()"
447                 ": Trying to read beyond range end seqno " +
448                 std::to_string(end()));
449     }
450     return *currIt;
451 }
452
453 BasicLinkedList::RangeIteratorLL& BasicLinkedList::RangeIteratorLL::
454 operator++() {
455     if (curr() >= end()) {
456         throw std::out_of_range(
457                 "BasicLinkedList::RangeIteratorLL::operator++()"
458                 ": Trying to move the iterator beyond range end"
459                 " seqno " +
460                 std::to_string(end()));
461     }
462
463     /* Check if the iterator is pointing to the last element. Increment beyond
464        the last element indicates the end of the iteration */
465     if (curr() == itrRange.getEnd() - 1) {
466         std::lock_guard<SpinLock> lh(list.rangeLock);
467         /* Release snapshot read lock on the linked list */
468         list.readRange.reset();
469         readLockHolder.unlock();
470
471         /* Update the begin to end() so the client can see that the iteration
472            has ended */
473         itrRange.setBegin(end());
474         return *this;
475     }
476
477     ++currIt;
478     {
479         /* As the iterator moves we reduce the snapshot range being read on the
480            linked list. This helps reduce the stale items in the list during
481            heavy update load from the front end */
482         std::lock_guard<SpinLock> lh(list.rangeLock);
483         list.readRange.setBegin(currIt->getBySeqno());
484     }
485
486     /* Also update the current range stored in the iterator obj */
487     itrRange.setBegin(currIt->getBySeqno());
488
489     return *this;
490 }