b7c8a9b159f63001b61dcbb3fc30de21b1f0d51c
[ep-engine.git] / src / linked_list.cc
1 /* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2 /*
3  *     Copyright 2017 Couchbase, Inc
4  *
5  *   Licensed under the Apache License, Version 2.0 (the "License");
6  *   you may not use this file except in compliance with the License.
7  *   You may obtain a copy of the License at
8  *
9  *       http://www.apache.org/licenses/LICENSE-2.0
10  *
11  *   Unless required by applicable law or agreed to in writing, software
12  *   distributed under the License is distributed on an "AS IS" BASIS,
13  *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  *   See the License for the specific language governing permissions and
15  *   limitations under the License.
16  */
17
18 #include "linked_list.h"
19 #include <mutex>
20
21 BasicLinkedList::BasicLinkedList(uint16_t vbucketId, EPStats& st)
22     : SequenceList(),
23       readRange(0, 0),
24       staleSize(0),
25       staleMetaDataSize(0),
26       highSeqno(0),
27       highestDedupedSeqno(0),
28       highestPurgedDeletedSeqno(0),
29       numStaleItems(0),
30       numDeletedItems(0),
31       vbid(vbucketId),
32       st(st) {
33 }
34
35 BasicLinkedList::~BasicLinkedList() {
36     /* Delete stale items here, other items are deleted by the hash
37        table */
38     std::lock_guard<std::mutex> writeGuard(getListWriteLock());
39     seqList.remove_and_dispose_if(
40             [&writeGuard](const OrderedStoredValue& v) {
41                 return v.isStale(writeGuard);
42             },
43             [this](OrderedStoredValue* v) {
44                 this->st.currentSize.fetch_sub(v->metaDataSize());
45                 delete v;
46             });
47
48     /* Erase all the list elements (does not destroy elements, just removes
49        them from the list) */
50     seqList.clear();
51 }
52
53 void BasicLinkedList::appendToList(std::lock_guard<std::mutex>& seqLock,
54                                    std::lock_guard<std::mutex>& writeLock,
55                                    OrderedStoredValue& v) {
56     seqList.push_back(v);
57 }
58
59 SequenceList::UpdateStatus BasicLinkedList::updateListElem(
60         std::lock_guard<std::mutex>& seqLock,
61         std::lock_guard<std::mutex>& writeLock,
62         OrderedStoredValue& v) {
63     /* Lock that needed for consistent read of SeqRange 'readRange' */
64     std::lock_guard<SpinLock> lh(rangeLock);
65
66     if (readRange.fallsInRange(v.getBySeqno())) {
67         /* Range read is in middle of a point-in-time snapshot, hence we cannot
68            move the element to the end of the list. Return a temp failure */
69         return UpdateStatus::Append;
70     }
71
72     /* Since there is no other reads or writes happenning in this range, we can
73        move the item to the end of the list */
74     auto it = seqList.iterator_to(v);
75     seqList.erase(it);
76     seqList.push_back(v);
77
78     return UpdateStatus::Success;
79 }
80
81 std::tuple<ENGINE_ERROR_CODE, std::vector<UniqueItemPtr>, seqno_t>
82 BasicLinkedList::rangeRead(seqno_t start, seqno_t end) {
83     std::vector<UniqueItemPtr> empty;
84     if ((start > end) || (start <= 0)) {
85         LOG(EXTENSION_LOG_WARNING,
86             "BasicLinkedList::rangeRead(): "
87             "(vb:%d) ERANGE: start %" PRIi64 " > end %" PRIi64,
88             vbid,
89             start,
90             end);
91         return std::make_tuple(
92                 ENGINE_ERANGE,
93                 std::move(empty)
94                 /* MSVC not happy with std::vector<UniqueItemPtr>() */,
95                 0);
96     }
97
98     /* Allows only 1 rangeRead for now */
99     std::lock_guard<std::mutex> lckGd(rangeReadLock);
100
101     {
102         std::lock_guard<std::mutex> listWriteLg(getListWriteLock());
103         std::lock_guard<SpinLock> lh(rangeLock);
104         if (start > highSeqno) {
105             LOG(EXTENSION_LOG_WARNING,
106                 "BasicLinkedList::rangeRead(): "
107                 "(vb:%d) ERANGE: start %" PRIi64 " > highSeqno %" PRIi64,
108                 vbid,
109                 start,
110                 static_cast<seqno_t>(highSeqno));
111             /* If the request is for an invalid range, return before iterating
112                through the list */
113             return std::make_tuple(ENGINE_ERANGE, std::move(empty), 0);
114         }
115
116         /* Mark the initial read range */
117         end = std::min(end, static_cast<seqno_t>(highSeqno));
118         end = std::max(end, static_cast<seqno_t>(highestDedupedSeqno));
119         readRange = SeqRange(1, end);
120     }
121
122     /* Read items in the range */
123     std::vector<UniqueItemPtr> items;
124
125     for (const auto& osv : seqList) {
126         if (osv.getBySeqno() > end) {
127             /* we are done */
128             break;
129         }
130
131         {
132             std::lock_guard<SpinLock> lh(rangeLock);
133             readRange.setBegin(osv.getBySeqno()); /* [EPHE TODO]: should we
134                                                      update the min every time ?
135                                                    */
136         }
137
138         if (osv.getBySeqno() < start) {
139             /* skip this item */
140             continue;
141         }
142
143         /* Check if this OSV has been made stale and has been superseded by a
144          * newer version. If it has, and the replacement is /also/ in the range
145          * we are reading, we should skip this item to avoid duplicates */
146         StoredValue* replacement;
147         {
148             std::lock_guard<std::mutex> writeGuard(getListWriteLock());
149             replacement = osv.getReplacementIfStale(writeGuard);
150         }
151
152         if (replacement &&
153             replacement->toOrderedStoredValue()->getBySeqno() <= end) {
154             continue;
155         }
156
157         try {
158             items.push_back(UniqueItemPtr(osv.toItem(false, vbid)));
159         } catch (const std::bad_alloc&) {
160             /* [EPHE TODO]: Do we handle backfill in a better way ?
161                             Perhaps do backfilling partially (that is
162                             backfill ==> stream; backfill ==> stream ..so on )?
163              */
164             LOG(EXTENSION_LOG_WARNING,
165                 "BasicLinkedList::rangeRead(): "
166                 "(vb %d) ENOMEM while trying to copy "
167                 "item with seqno %" PRIi64 "before streaming it",
168                 vbid,
169                 osv.getBySeqno());
170             return std::make_tuple(ENGINE_ENOMEM, std::move(empty), 0);
171         }
172     }
173
174     /* Done with range read, reset the range */
175     {
176         std::lock_guard<SpinLock> lh(rangeLock);
177         readRange.reset();
178     }
179
180     /* Return all the range read items */
181     return std::make_tuple(ENGINE_SUCCESS, std::move(items), end);
182 }
183
184 void BasicLinkedList::updateHighSeqno(std::lock_guard<std::mutex>& listWriteLg,
185                                       const OrderedStoredValue& v) {
186     highSeqno = v.getBySeqno();
187 }
188 void BasicLinkedList::updateHighestDedupedSeqno(
189         std::lock_guard<std::mutex>& listWriteLg, const OrderedStoredValue& v) {
190     highestDedupedSeqno = v.getBySeqno();
191 }
192
193 void BasicLinkedList::markItemStale(std::lock_guard<std::mutex>& listWriteLg,
194                                     StoredValue::UniquePtr ownedSv,
195                                     StoredValue* newSv) {
196     /* Release the StoredValue as BasicLinkedList does not want it to be of
197        owned type */
198     StoredValue* v = ownedSv.release();
199
200     /* Update the stats tracking the memory owned by the list */
201     staleSize.fetch_add(v->size());
202     staleMetaDataSize.fetch_add(v->metaDataSize());
203     st.currentSize.fetch_add(v->metaDataSize());
204
205     ++numStaleItems;
206     v->toOrderedStoredValue()->markStale(listWriteLg, newSv);
207 }
208
209 size_t BasicLinkedList::purgeTombstones() {
210     // Purge items marked as stale from the seqList.
211     //
212     // Strategy - we try to ensure that this function does not block
213     // frontend-writes (adding new OrderedStoredValues (OSVs) to the seqList).
214     // To achieve this (safely),
215     // we (try to) acquire the rangeReadLock and setup a 'read' range for the
216     // whole of the seqList. This prevents any other readers from iterating
217     // the list (and accessing stale items) while we purge on it; but permits
218     // front-end operations to continue as they:
219     //   a) Only read/modify non-stale items (we only change stale items) and
220     //   b) Do not change the list membership of anything within the read-range.
221     // However, we do need to be careful about what members of OSVs we access
222     // here - the only OSVs we can safely access are ones marked stale as they
223     // are no longer in the HashTable (and hence subject to HashTable locks).
224     // To check if an item is stale we need to acquire the writeLock
225     // (OSV::stale is guarded by it) for each list item. While this isn't
226     // ideal (that's the same lock needed by front-end operations), we can
227     // release the lock between each element so front-end operations can
228     // have the opportunity to acquire it.
229     //
230     // Attempt to acquire the readRangeLock, to block anyone else concurrently
231     // reading from the list while we remove elements from it.
232     std::unique_lock<std::mutex> rrGuard(rangeReadLock, std::try_to_lock);
233     if (!rrGuard) {
234         // If we cannot acquire the lock then another thread is
235         // running a range read. Given these are typically long-running,
236         // return without blocking.
237         return 0;
238     }
239
240     // Determine the start and end iterators.
241     OrderedLL::iterator startIt;
242     OrderedLL::iterator endIt;
243     {
244         std::lock_guard<std::mutex> writeGuard(getListWriteLock());
245         if (seqList.empty()) {
246             // Nothing in sequence list - nothing to purge.
247             return 0;
248         }
249         // Determine the {start} and {end} iterator (inclusive). Note
250         // that (at most) one item is added to the seqList before its
251         // sequence number is set (as the seqno comes from Ckpt
252         // manager); if that is the case (such an item is guaranteed
253         // to not be stale), then move end to the previous item
254         // (i.e. we don't consider this "in-flight" item), as long as
255         // there is at least two elements.
256         startIt = seqList.begin();
257         endIt = std::prev(seqList.end());
258         // Need rangeLock for highSeqno & readRange
259         std::lock_guard<SpinLock> rangeGuard(rangeLock);
260         if ((startIt != endIt) && (!endIt->isStale(writeGuard))) {
261             endIt = std::prev(endIt);
262         }
263         readRange = SeqRange(startIt->getBySeqno(), endIt->getBySeqno());
264     }
265
266     // Iterate across all but the last item in the seqList, looking
267     // for stale items.
268     // Note the for() loop terminates one element before endIt - we
269     // actually want an inclusive iteration but as we are comparing
270     // essentially random addresses (and we don't want to 'touch' the
271     // element after endIt), we loop to one before endIt, then handle
272     // endIt explicilty at the end.
273     // Note(2): Iterator is manually incremented outside the for() loop as it
274     // is invalidated when we erase items.
275     size_t purgedCount = 0;
276     bool stale;
277     for (auto it = startIt; it != endIt;) {
278         {
279             std::lock_guard<std::mutex> writeGuard(getListWriteLock());
280             stale = it->isStale(writeGuard);
281         }
282         // Only stale items are purged.
283         if (!stale) {
284             ++it;
285             continue;
286         }
287
288         // Checks pass, remove from list and delete.
289         it = purgeListElem(it);
290         ++purgedCount;
291     }
292     // Handle the last element.
293     {
294         std::lock_guard<std::mutex> writeGuard(getListWriteLock());
295         stale = endIt->isStale(writeGuard);
296     }
297     if (stale) {
298         purgeListElem(endIt);
299         ++purgedCount;
300     }
301
302     // Complete; reset the readRange.
303     {
304         std::lock_guard<SpinLock> lh(rangeLock);
305         readRange.reset();
306     }
307     return purgedCount;
308 }
309
310 void BasicLinkedList::updateNumDeletedItems(bool oldDeleted, bool newDeleted) {
311     if (oldDeleted && !newDeleted) {
312         --numDeletedItems;
313     } else if (!oldDeleted && newDeleted) {
314         ++numDeletedItems;
315     }
316 }
317
318 uint64_t BasicLinkedList::getNumStaleItems() const {
319     return numStaleItems;
320 }
321
322 size_t BasicLinkedList::getStaleValueBytes() const {
323     return staleSize;
324 }
325
326 size_t BasicLinkedList::getStaleMetadataBytes() const {
327     return staleMetaDataSize;
328 }
329
330 uint64_t BasicLinkedList::getNumDeletedItems() const {
331     std::lock_guard<std::mutex> lckGd(getListWriteLock());
332     return numDeletedItems;
333 }
334
335 uint64_t BasicLinkedList::getNumItems() const {
336     std::lock_guard<std::mutex> lckGd(getListWriteLock());
337     return seqList.size();
338 }
339
340 uint64_t BasicLinkedList::getHighSeqno() const {
341     std::lock_guard<std::mutex> lckGd(getListWriteLock());
342     return highSeqno;
343 }
344
345 uint64_t BasicLinkedList::getHighestDedupedSeqno() const {
346     std::lock_guard<std::mutex> lckGd(getListWriteLock());
347     return highestDedupedSeqno;
348 }
349
350 seqno_t BasicLinkedList::getHighestPurgedDeletedSeqno() const {
351     return highestPurgedDeletedSeqno;
352 }
353
354 uint64_t BasicLinkedList::getRangeReadBegin() const {
355     std::lock_guard<SpinLock> lh(rangeLock);
356     return readRange.getBegin();
357 }
358
359 uint64_t BasicLinkedList::getRangeReadEnd() const {
360     std::lock_guard<SpinLock> lh(rangeLock);
361     return readRange.getEnd();
362 }
363 std::mutex& BasicLinkedList::getListWriteLock() const {
364     return writeLock;
365 }
366
367 SequenceList::RangeIterator BasicLinkedList::makeRangeIterator() {
368     return SequenceList::RangeIterator(
369             std::make_unique<RangeIteratorLL>(*this));
370 }
371
372 void BasicLinkedList::dump() const {
373     std::cerr << *this << std::endl;
374 }
375
376 std::ostream& operator <<(std::ostream& os, const BasicLinkedList& ll) {
377     os << "BasicLinkedList[" << &ll << "] with numItems:" << ll.seqList.size()
378        << " deletedItems:" << ll.numDeletedItems
379        << " staleItems:" << ll.getNumStaleItems()
380        << " highPurgeSeqno:" << ll.getHighestPurgedDeletedSeqno()
381        << " elements:[" << std::endl;
382     size_t count = 0;
383     for (const auto& val : ll.seqList) {
384         os << "    " << val << std::endl;
385         ++count;
386     }
387     os << "] (count:" << count << ")";
388     return os;
389 }
390
391 OrderedLL::iterator BasicLinkedList::purgeListElem(OrderedLL::iterator it) {
392     StoredValue::UniquePtr purged(&*it);
393     {
394         std::lock_guard<std::mutex> lckGd(getListWriteLock());
395         it = seqList.erase(it);
396     }
397
398     /* Update the stats tracking the memory owned by the list */
399     staleSize.fetch_sub(purged->size());
400     staleMetaDataSize.fetch_sub(purged->metaDataSize());
401     st.currentSize.fetch_sub(purged->metaDataSize());
402
403     // Similary for the item counts:
404     --numStaleItems;
405     if (purged->isDeleted()) {
406         --numDeletedItems;
407     }
408
409     if (purged->isDeleted()) {
410         highestPurgedDeletedSeqno = std::max(seqno_t(highestPurgedDeletedSeqno),
411                                              purged->getBySeqno());
412     }
413     return it;
414 }
415
416 BasicLinkedList::RangeIteratorLL::RangeIteratorLL(BasicLinkedList& ll)
417     : list(ll),
418       readLockHolder(list.rangeReadLock),
419       itrRange(0, 0),
420       numRemaining(0) {
421     std::lock_guard<std::mutex> listWriteLg(list.getListWriteLock());
422     std::lock_guard<SpinLock> lh(list.rangeLock);
423     if (list.highSeqno < 1) {
424         /* No need of holding a lock for the snapshot as there are no items;
425            Also iterator range is at default (0, 0) */
426         readLockHolder.unlock();
427         return;
428     }
429
430     /* Iterator to the beginning of linked list */
431     currIt = list.seqList.begin();
432
433     /* Number of items that can be iterated over */
434     numRemaining = list.seqList.size();
435
436     /* The minimum seqno in the iterator that must be read to get a consistent
437        read snapshot */
438     earlySnapShotEndSeqno = list.highestDedupedSeqno;
439
440     /* Mark the snapshot range on linked list. The range that can be read by the
441        iterator is inclusive of the start and the end. */
442     list.readRange =
443             SeqRange(currIt->getBySeqno(), list.seqList.back().getBySeqno());
444
445     /* Keep the range in the iterator obj. We store the range end seqno as one
446        higher than the end seqno that can be read by this iterator.
447        This is because, we must identify the end point of the iterator, and
448        we the read is inclusive of the end points of list.readRange.
449
450        Further, since use the class 'SeqRange' for 'itrRange' we cannot use
451        curr() == end() + 1 to identify the end point because 'SeqRange' does
452        not internally allow curr > end */
453     itrRange = SeqRange(currIt->getBySeqno(),
454                         list.seqList.back().getBySeqno() + 1);
455 }
456
457 BasicLinkedList::RangeIteratorLL::~RangeIteratorLL() {
458     std::lock_guard<SpinLock> lh(list.rangeLock);
459     list.readRange.reset();
460     /* As readLockHolder goes out of scope here, it will automatically release
461        the snapshot read lock on the linked list */
462 }
463
464 OrderedStoredValue& BasicLinkedList::RangeIteratorLL::operator*() const {
465     if (curr() >= end()) {
466         /* We can't read beyond the range end */
467         throw std::out_of_range(
468                 "BasicLinkedList::RangeIteratorLL::operator*()"
469                 ": Trying to read beyond range end seqno " +
470                 std::to_string(end()));
471     }
472     return *currIt;
473 }
474
475 BasicLinkedList::RangeIteratorLL& BasicLinkedList::RangeIteratorLL::
476 operator++() {
477     if (curr() >= end()) {
478         throw std::out_of_range(
479                 "BasicLinkedList::RangeIteratorLL::operator++()"
480                 ": Trying to move the iterator beyond range end"
481                 " seqno " +
482                 std::to_string(end()));
483     }
484
485     --numRemaining;
486
487     /* Check if the iterator is pointing to the last element. Increment beyond
488        the last element indicates the end of the iteration */
489     if (curr() == itrRange.getEnd() - 1) {
490         std::lock_guard<SpinLock> lh(list.rangeLock);
491         /* Release snapshot read lock on the linked list */
492         list.readRange.reset();
493         readLockHolder.unlock();
494
495         /* Update the begin to end() so the client can see that the iteration
496            has ended */
497         itrRange.setBegin(end());
498         return *this;
499     }
500
501     ++currIt;
502     {
503         /* As the iterator moves we reduce the snapshot range being read on the
504            linked list. This helps reduce the stale items in the list during
505            heavy update load from the front end */
506         std::lock_guard<SpinLock> lh(list.rangeLock);
507         list.readRange.setBegin(currIt->getBySeqno());
508     }
509
510     /* Also update the current range stored in the iterator obj */
511     itrRange.setBegin(currIt->getBySeqno());
512
513     return *this;
514 }