MB-24376: Stop rangeRead on invalid Seqno
[ep-engine.git] / src / linked_list.cc
1 /* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2 /*
3  *     Copyright 2017 Couchbase, Inc
4  *
5  *   Licensed under the Apache License, Version 2.0 (the "License");
6  *   you may not use this file except in compliance with the License.
7  *   You may obtain a copy of the License at
8  *
9  *       http://www.apache.org/licenses/LICENSE-2.0
10  *
11  *   Unless required by applicable law or agreed to in writing, software
12  *   distributed under the License is distributed on an "AS IS" BASIS,
13  *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  *   See the License for the specific language governing permissions and
15  *   limitations under the License.
16  */
17
18 #include "linked_list.h"
19 #include <mutex>
20
21 BasicLinkedList::BasicLinkedList(uint16_t vbucketId, EPStats& st)
22     : SequenceList(),
23       readRange(0, 0),
24       staleSize(0),
25       staleMetaDataSize(0),
26       highSeqno(0),
27       highestDedupedSeqno(0),
28       highestPurgedDeletedSeqno(0),
29       numStaleItems(0),
30       numDeletedItems(0),
31       vbid(vbucketId),
32       st(st) {
33 }
34
35 BasicLinkedList::~BasicLinkedList() {
36     /* Delete stale items here, other items are deleted by the hash
37        table */
38     std::lock_guard<std::mutex> writeGuard(getListWriteLock());
39     seqList.remove_and_dispose_if(
40             [&writeGuard](const OrderedStoredValue& v) {
41                 return v.isStale(writeGuard);
42             },
43             [this](OrderedStoredValue* v) {
44                 this->st.currentSize.fetch_sub(v->metaDataSize());
45                 delete v;
46             });
47
48     /* Erase all the list elements (does not destroy elements, just removes
49        them from the list) */
50     seqList.clear();
51 }
52
53 void BasicLinkedList::appendToList(std::lock_guard<std::mutex>& seqLock,
54                                    std::lock_guard<std::mutex>& writeLock,
55                                    OrderedStoredValue& v) {
56     seqList.push_back(v);
57 }
58
59 SequenceList::UpdateStatus BasicLinkedList::updateListElem(
60         std::lock_guard<std::mutex>& seqLock,
61         std::lock_guard<std::mutex>& writeLock,
62         OrderedStoredValue& v) {
63     /* Lock that needed for consistent read of SeqRange 'readRange' */
64     std::lock_guard<SpinLock> lh(rangeLock);
65
66     if (readRange.fallsInRange(v.getBySeqno())) {
67         /* Range read is in middle of a point-in-time snapshot, hence we cannot
68            move the element to the end of the list. Return a temp failure */
69         return UpdateStatus::Append;
70     }
71
72     /* Since there is no other reads or writes happenning in this range, we can
73        move the item to the end of the list */
74     auto it = seqList.iterator_to(v);
75     seqList.erase(it);
76     seqList.push_back(v);
77
78     return UpdateStatus::Success;
79 }
80
81 std::tuple<ENGINE_ERROR_CODE, std::vector<UniqueItemPtr>, seqno_t>
82 BasicLinkedList::rangeRead(seqno_t start, seqno_t end) {
83     std::vector<UniqueItemPtr> empty;
84     if ((start > end) || (start <= 0)) {
85         LOG(EXTENSION_LOG_WARNING,
86             "BasicLinkedList::rangeRead(): "
87             "(vb:%d) ERANGE: start %" PRIi64 " > end %" PRIi64,
88             vbid,
89             start,
90             end);
91         return std::make_tuple(
92                 ENGINE_ERANGE,
93                 std::move(empty)
94                 /* MSVC not happy with std::vector<UniqueItemPtr>() */,
95                 0);
96     }
97
98     /* Allows only 1 rangeRead for now */
99     std::lock_guard<std::mutex> lckGd(rangeReadLock);
100
101     {
102         std::lock_guard<std::mutex> listWriteLg(getListWriteLock());
103         std::lock_guard<SpinLock> lh(rangeLock);
104         if (start > highSeqno) {
105             LOG(EXTENSION_LOG_WARNING,
106                 "BasicLinkedList::rangeRead(): "
107                 "(vb:%d) ERANGE: start %" PRIi64 " > highSeqno %" PRIi64,
108                 vbid,
109                 start,
110                 static_cast<seqno_t>(highSeqno));
111             /* If the request is for an invalid range, return before iterating
112                through the list */
113             return std::make_tuple(ENGINE_ERANGE, std::move(empty), 0);
114         }
115
116         /* Mark the initial read range */
117         end = std::min(end, static_cast<seqno_t>(highSeqno));
118         end = std::max(end, static_cast<seqno_t>(highestDedupedSeqno));
119         readRange = SeqRange(1, end);
120     }
121
122     /* Read items in the range */
123     std::vector<UniqueItemPtr> items;
124
125     for (const auto& osv : seqList) {
126         int64_t currSeqno(osv.getBySeqno());
127
128         if (currSeqno > end || currSeqno < 0) {
129             /* We have read all the items in the requested range, or the osv
130              * does not yet have a valid seqno; either way we are done */
131             break;
132         }
133
134         {
135             std::lock_guard<SpinLock> lh(rangeLock);
136             readRange.setBegin(currSeqno); /* [EPHE TODO]: should we
137                                                      update the min every time ?
138                                                    */
139         }
140
141         if (currSeqno < start) {
142             /* skip this item */
143             continue;
144         }
145
146         /* Check if this OSV has been made stale and has been superseded by a
147          * newer version. If it has, and the replacement is /also/ in the range
148          * we are reading, we should skip this item to avoid duplicates */
149         StoredValue* replacement;
150         {
151             std::lock_guard<std::mutex> writeGuard(getListWriteLock());
152             replacement = osv.getReplacementIfStale(writeGuard);
153         }
154
155         if (replacement &&
156             replacement->toOrderedStoredValue()->getBySeqno() <= end) {
157             continue;
158         }
159
160         try {
161             items.push_back(UniqueItemPtr(osv.toItem(false, vbid)));
162         } catch (const std::bad_alloc&) {
163             /* [EPHE TODO]: Do we handle backfill in a better way ?
164                             Perhaps do backfilling partially (that is
165                             backfill ==> stream; backfill ==> stream ..so on )?
166              */
167             LOG(EXTENSION_LOG_WARNING,
168                 "BasicLinkedList::rangeRead(): "
169                 "(vb %d) ENOMEM while trying to copy "
170                 "item with seqno %" PRIi64 "before streaming it",
171                 vbid,
172                 currSeqno);
173             return std::make_tuple(ENGINE_ENOMEM, std::move(empty), 0);
174         }
175     }
176
177     /* Done with range read, reset the range */
178     {
179         std::lock_guard<SpinLock> lh(rangeLock);
180         readRange.reset();
181     }
182
183     /* Return all the range read items */
184     return std::make_tuple(ENGINE_SUCCESS, std::move(items), end);
185 }
186
187 void BasicLinkedList::updateHighSeqno(std::lock_guard<std::mutex>& listWriteLg,
188                                       const OrderedStoredValue& v) {
189     highSeqno = v.getBySeqno();
190 }
191 void BasicLinkedList::updateHighestDedupedSeqno(
192         std::lock_guard<std::mutex>& listWriteLg, const OrderedStoredValue& v) {
193     highestDedupedSeqno = v.getBySeqno();
194 }
195
196 void BasicLinkedList::markItemStale(std::lock_guard<std::mutex>& listWriteLg,
197                                     StoredValue::UniquePtr ownedSv,
198                                     StoredValue* newSv) {
199     /* Release the StoredValue as BasicLinkedList does not want it to be of
200        owned type */
201     StoredValue* v = ownedSv.release();
202
203     /* Update the stats tracking the memory owned by the list */
204     staleSize.fetch_add(v->size());
205     staleMetaDataSize.fetch_add(v->metaDataSize());
206     st.currentSize.fetch_add(v->metaDataSize());
207
208     ++numStaleItems;
209     v->toOrderedStoredValue()->markStale(listWriteLg, newSv);
210 }
211
212 size_t BasicLinkedList::purgeTombstones() {
213     // Purge items marked as stale from the seqList.
214     //
215     // Strategy - we try to ensure that this function does not block
216     // frontend-writes (adding new OrderedStoredValues (OSVs) to the seqList).
217     // To achieve this (safely),
218     // we (try to) acquire the rangeReadLock and setup a 'read' range for the
219     // whole of the seqList. This prevents any other readers from iterating
220     // the list (and accessing stale items) while we purge on it; but permits
221     // front-end operations to continue as they:
222     //   a) Only read/modify non-stale items (we only change stale items) and
223     //   b) Do not change the list membership of anything within the read-range.
224     // However, we do need to be careful about what members of OSVs we access
225     // here - the only OSVs we can safely access are ones marked stale as they
226     // are no longer in the HashTable (and hence subject to HashTable locks).
227     // To check if an item is stale we need to acquire the writeLock
228     // (OSV::stale is guarded by it) for each list item. While this isn't
229     // ideal (that's the same lock needed by front-end operations), we can
230     // release the lock between each element so front-end operations can
231     // have the opportunity to acquire it.
232     //
233     // Attempt to acquire the readRangeLock, to block anyone else concurrently
234     // reading from the list while we remove elements from it.
235     std::unique_lock<std::mutex> rrGuard(rangeReadLock, std::try_to_lock);
236     if (!rrGuard) {
237         // If we cannot acquire the lock then another thread is
238         // running a range read. Given these are typically long-running,
239         // return without blocking.
240         return 0;
241     }
242
243     // Determine the start and end iterators.
244     OrderedLL::iterator startIt;
245     OrderedLL::iterator endIt;
246     {
247         std::lock_guard<std::mutex> writeGuard(getListWriteLock());
248         if (seqList.empty()) {
249             // Nothing in sequence list - nothing to purge.
250             return 0;
251         }
252         // Determine the {start} and {end} iterator (inclusive). Note
253         // that (at most) one item is added to the seqList before its
254         // sequence number is set (as the seqno comes from Ckpt
255         // manager); if that is the case (such an item is guaranteed
256         // to not be stale), then move end to the previous item
257         // (i.e. we don't consider this "in-flight" item), as long as
258         // there is at least two elements.
259         startIt = seqList.begin();
260         endIt = std::prev(seqList.end());
261         // Need rangeLock for highSeqno & readRange
262         std::lock_guard<SpinLock> rangeGuard(rangeLock);
263         if ((startIt != endIt) && (!endIt->isStale(writeGuard))) {
264             endIt = std::prev(endIt);
265         }
266         readRange = SeqRange(startIt->getBySeqno(), endIt->getBySeqno());
267     }
268
269     // Iterate across all but the last item in the seqList, looking
270     // for stale items.
271     // Note the for() loop terminates one element before endIt - we
272     // actually want an inclusive iteration but as we are comparing
273     // essentially random addresses (and we don't want to 'touch' the
274     // element after endIt), we loop to one before endIt, then handle
275     // endIt explicilty at the end.
276     // Note(2): Iterator is manually incremented outside the for() loop as it
277     // is invalidated when we erase items.
278     size_t purgedCount = 0;
279     bool stale;
280     for (auto it = startIt; it != endIt;) {
281         {
282             std::lock_guard<std::mutex> writeGuard(getListWriteLock());
283             stale = it->isStale(writeGuard);
284         }
285         // Only stale items are purged.
286         if (!stale) {
287             ++it;
288             continue;
289         }
290
291         // Checks pass, remove from list and delete.
292         it = purgeListElem(it);
293         ++purgedCount;
294     }
295     // Handle the last element.
296     {
297         std::lock_guard<std::mutex> writeGuard(getListWriteLock());
298         stale = endIt->isStale(writeGuard);
299     }
300     if (stale) {
301         purgeListElem(endIt);
302         ++purgedCount;
303     }
304
305     // Complete; reset the readRange.
306     {
307         std::lock_guard<SpinLock> lh(rangeLock);
308         readRange.reset();
309     }
310     return purgedCount;
311 }
312
313 void BasicLinkedList::updateNumDeletedItems(bool oldDeleted, bool newDeleted) {
314     if (oldDeleted && !newDeleted) {
315         --numDeletedItems;
316     } else if (!oldDeleted && newDeleted) {
317         ++numDeletedItems;
318     }
319 }
320
321 uint64_t BasicLinkedList::getNumStaleItems() const {
322     return numStaleItems;
323 }
324
325 size_t BasicLinkedList::getStaleValueBytes() const {
326     return staleSize;
327 }
328
329 size_t BasicLinkedList::getStaleMetadataBytes() const {
330     return staleMetaDataSize;
331 }
332
333 uint64_t BasicLinkedList::getNumDeletedItems() const {
334     std::lock_guard<std::mutex> lckGd(getListWriteLock());
335     return numDeletedItems;
336 }
337
338 uint64_t BasicLinkedList::getNumItems() const {
339     std::lock_guard<std::mutex> lckGd(getListWriteLock());
340     return seqList.size();
341 }
342
343 uint64_t BasicLinkedList::getHighSeqno() const {
344     std::lock_guard<std::mutex> lckGd(getListWriteLock());
345     return highSeqno;
346 }
347
348 uint64_t BasicLinkedList::getHighestDedupedSeqno() const {
349     std::lock_guard<std::mutex> lckGd(getListWriteLock());
350     return highestDedupedSeqno;
351 }
352
353 seqno_t BasicLinkedList::getHighestPurgedDeletedSeqno() const {
354     return highestPurgedDeletedSeqno;
355 }
356
357 uint64_t BasicLinkedList::getRangeReadBegin() const {
358     std::lock_guard<SpinLock> lh(rangeLock);
359     return readRange.getBegin();
360 }
361
362 uint64_t BasicLinkedList::getRangeReadEnd() const {
363     std::lock_guard<SpinLock> lh(rangeLock);
364     return readRange.getEnd();
365 }
366 std::mutex& BasicLinkedList::getListWriteLock() const {
367     return writeLock;
368 }
369
370 SequenceList::RangeIterator BasicLinkedList::makeRangeIterator() {
371     return SequenceList::RangeIterator(
372             std::make_unique<RangeIteratorLL>(*this));
373 }
374
375 void BasicLinkedList::dump() const {
376     std::cerr << *this << std::endl;
377 }
378
379 std::ostream& operator <<(std::ostream& os, const BasicLinkedList& ll) {
380     os << "BasicLinkedList[" << &ll << "] with numItems:" << ll.seqList.size()
381        << " deletedItems:" << ll.numDeletedItems
382        << " staleItems:" << ll.getNumStaleItems()
383        << " highPurgeSeqno:" << ll.getHighestPurgedDeletedSeqno()
384        << " elements:[" << std::endl;
385     size_t count = 0;
386     for (const auto& val : ll.seqList) {
387         os << "    " << val << std::endl;
388         ++count;
389     }
390     os << "] (count:" << count << ")";
391     return os;
392 }
393
394 OrderedLL::iterator BasicLinkedList::purgeListElem(OrderedLL::iterator it) {
395     StoredValue::UniquePtr purged(&*it);
396     {
397         std::lock_guard<std::mutex> lckGd(getListWriteLock());
398         it = seqList.erase(it);
399     }
400
401     /* Update the stats tracking the memory owned by the list */
402     staleSize.fetch_sub(purged->size());
403     staleMetaDataSize.fetch_sub(purged->metaDataSize());
404     st.currentSize.fetch_sub(purged->metaDataSize());
405
406     // Similary for the item counts:
407     --numStaleItems;
408     if (purged->isDeleted()) {
409         --numDeletedItems;
410     }
411
412     if (purged->isDeleted()) {
413         highestPurgedDeletedSeqno = std::max(seqno_t(highestPurgedDeletedSeqno),
414                                              purged->getBySeqno());
415     }
416     return it;
417 }
418
419 BasicLinkedList::RangeIteratorLL::RangeIteratorLL(BasicLinkedList& ll)
420     : list(ll),
421       readLockHolder(list.rangeReadLock),
422       itrRange(0, 0),
423       numRemaining(0) {
424     std::lock_guard<std::mutex> listWriteLg(list.getListWriteLock());
425     std::lock_guard<SpinLock> lh(list.rangeLock);
426     if (list.highSeqno < 1) {
427         /* No need of holding a lock for the snapshot as there are no items;
428            Also iterator range is at default (0, 0) */
429         readLockHolder.unlock();
430         return;
431     }
432
433     /* Iterator to the beginning of linked list */
434     currIt = list.seqList.begin();
435
436     /* Number of items that can be iterated over */
437     numRemaining = list.seqList.size();
438
439     /* The minimum seqno in the iterator that must be read to get a consistent
440        read snapshot */
441     earlySnapShotEndSeqno = list.highestDedupedSeqno;
442
443     /* Mark the snapshot range on linked list. The range that can be read by the
444        iterator is inclusive of the start and the end. */
445     list.readRange =
446             SeqRange(currIt->getBySeqno(), list.seqList.back().getBySeqno());
447
448     /* Keep the range in the iterator obj. We store the range end seqno as one
449        higher than the end seqno that can be read by this iterator.
450        This is because, we must identify the end point of the iterator, and
451        we the read is inclusive of the end points of list.readRange.
452
453        Further, since use the class 'SeqRange' for 'itrRange' we cannot use
454        curr() == end() + 1 to identify the end point because 'SeqRange' does
455        not internally allow curr > end */
456     itrRange = SeqRange(currIt->getBySeqno(),
457                         list.seqList.back().getBySeqno() + 1);
458 }
459
460 BasicLinkedList::RangeIteratorLL::~RangeIteratorLL() {
461     std::lock_guard<SpinLock> lh(list.rangeLock);
462     list.readRange.reset();
463     /* As readLockHolder goes out of scope here, it will automatically release
464        the snapshot read lock on the linked list */
465 }
466
467 OrderedStoredValue& BasicLinkedList::RangeIteratorLL::operator*() const {
468     if (curr() >= end()) {
469         /* We can't read beyond the range end */
470         throw std::out_of_range(
471                 "BasicLinkedList::RangeIteratorLL::operator*()"
472                 ": Trying to read beyond range end seqno " +
473                 std::to_string(end()));
474     }
475     return *currIt;
476 }
477
478 BasicLinkedList::RangeIteratorLL& BasicLinkedList::RangeIteratorLL::
479 operator++() {
480     if (curr() >= end()) {
481         throw std::out_of_range(
482                 "BasicLinkedList::RangeIteratorLL::operator++()"
483                 ": Trying to move the iterator beyond range end"
484                 " seqno " +
485                 std::to_string(end()));
486     }
487
488     --numRemaining;
489
490     /* Check if the iterator is pointing to the last element. Increment beyond
491        the last element indicates the end of the iteration */
492     if (curr() == itrRange.getEnd() - 1) {
493         std::lock_guard<SpinLock> lh(list.rangeLock);
494         /* Release snapshot read lock on the linked list */
495         list.readRange.reset();
496         readLockHolder.unlock();
497
498         /* Update the begin to end() so the client can see that the iteration
499            has ended */
500         itrRange.setBegin(end());
501         return *this;
502     }
503
504     ++currIt;
505     {
506         /* As the iterator moves we reduce the snapshot range being read on the
507            linked list. This helps reduce the stale items in the list during
508            heavy update load from the front end */
509         std::lock_guard<SpinLock> lh(list.rangeLock);
510         list.readRange.setBegin(currIt->getBySeqno());
511     }
512
513     /* Also update the current range stored in the iterator obj */
514     itrRange.setBegin(currIt->getBySeqno());
515
516     return *this;
517 }