63b3a807ce4335614c7eb56c523b12ea6ce8d6c8
[ep-engine.git] / src / linked_list.cc
1 /* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2 /*
3  *     Copyright 2017 Couchbase, Inc
4  *
5  *   Licensed under the Apache License, Version 2.0 (the "License");
6  *   you may not use this file except in compliance with the License.
7  *   You may obtain a copy of the License at
8  *
9  *       http://www.apache.org/licenses/LICENSE-2.0
10  *
11  *   Unless required by applicable law or agreed to in writing, software
12  *   distributed under the License is distributed on an "AS IS" BASIS,
13  *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  *   See the License for the specific language governing permissions and
15  *   limitations under the License.
16  */
17
18 #include "linked_list.h"
19 #include <mutex>
20
21 BasicLinkedList::BasicLinkedList(uint16_t vbucketId, EPStats& st)
22     : SequenceList(),
23       readRange(0, 0),
24       staleSize(0),
25       staleMetaDataSize(0),
26       highSeqno(0),
27       highestDedupedSeqno(0),
28       highestPurgedDeletedSeqno(0),
29       numStaleItems(0),
30       numDeletedItems(0),
31       vbid(vbucketId),
32       st(st) {
33 }
34
35 BasicLinkedList::~BasicLinkedList() {
36     /* Delete stale items here, other items are deleted by the hash
37        table */
38     std::lock_guard<std::mutex> writeGuard(writeLock);
39     seqList.remove_and_dispose_if(
40             [&writeGuard](const OrderedStoredValue& v) {
41                 return v.isStale(writeGuard);
42             },
43             [this](OrderedStoredValue* v) {
44                 this->st.currentSize.fetch_sub(v->metaDataSize());
45                 delete v;
46             });
47
48     /* Erase all the list elements (does not destroy elements, just removes
49        them from the list) */
50     seqList.clear();
51 }
52
53 void BasicLinkedList::appendToList(std::lock_guard<std::mutex>& seqLock,
54                                    OrderedStoredValue& v) {
55     /* Allow only one write to the list at a time */
56     std::lock_guard<std::mutex> lckGd(writeLock);
57
58     seqList.push_back(v);
59 }
60
61 SequenceList::UpdateStatus BasicLinkedList::updateListElem(
62         std::lock_guard<std::mutex>& seqLock, OrderedStoredValue& v) {
63     /* Allow only one write to the list at a time */
64     std::lock_guard<std::mutex> lckGd(writeLock);
65
66     /* Lock that needed for consistent read of SeqRange 'readRange' */
67     std::lock_guard<SpinLock> lh(rangeLock);
68
69     if (readRange.fallsInRange(v.getBySeqno())) {
70         /* Range read is in middle of a point-in-time snapshot, hence we cannot
71            move the element to the end of the list. Return a temp failure */
72         return UpdateStatus::Append;
73     }
74
75     /* Since there is no other reads or writes happenning in this range, we can
76        move the item to the end of the list */
77     auto it = seqList.iterator_to(v);
78     seqList.erase(it);
79     seqList.push_back(v);
80
81     return UpdateStatus::Success;
82 }
83
84 std::tuple<ENGINE_ERROR_CODE, std::vector<UniqueItemPtr>, seqno_t>
85 BasicLinkedList::rangeRead(seqno_t start, seqno_t end) {
86     std::vector<UniqueItemPtr> empty;
87     if ((start > end) || (start <= 0)) {
88         LOG(EXTENSION_LOG_WARNING,
89             "BasicLinkedList::rangeRead(): "
90             "(vb:%d) ERANGE: start %" PRIi64 " > end %" PRIi64,
91             vbid,
92             start,
93             end);
94         return std::make_tuple(
95                 ENGINE_ERANGE,
96                 std::move(empty)
97                 /* MSVC not happy with std::vector<UniqueItemPtr>() */,
98                 0);
99     }
100
101     /* Allows only 1 rangeRead for now */
102     std::lock_guard<std::mutex> lckGd(rangeReadLock);
103
104     {
105         std::lock_guard<SpinLock> lh(rangeLock);
106         if (start > highSeqno) {
107             LOG(EXTENSION_LOG_WARNING,
108                 "BasicLinkedList::rangeRead(): "
109                 "(vb:%d) ERANGE: start %" PRIi64 " > highSeqno %" PRIi64,
110                 vbid,
111                 start,
112                 static_cast<seqno_t>(highSeqno));
113             /* If the request is for an invalid range, return before iterating
114                through the list */
115             return std::make_tuple(ENGINE_ERANGE, std::move(empty), 0);
116         }
117
118         /* Mark the initial read range */
119         end = std::min(end, static_cast<seqno_t>(highSeqno));
120         end = std::max(end, static_cast<seqno_t>(highestDedupedSeqno));
121         readRange = SeqRange(1, end);
122     }
123
124     /* Read items in the range */
125     std::vector<UniqueItemPtr> items;
126
127     for (const auto& osv : seqList) {
128         if (osv.getBySeqno() > end) {
129             /* we are done */
130             break;
131         }
132
133         {
134             std::lock_guard<SpinLock> lh(rangeLock);
135             readRange.setBegin(osv.getBySeqno()); /* [EPHE TODO]: should we
136                                                      update the min every time ?
137                                                    */
138         }
139
140         if (osv.getBySeqno() < start) {
141             /* skip this item */
142             continue;
143         }
144
145         try {
146             items.push_back(UniqueItemPtr(osv.toItem(false, vbid)));
147         } catch (const std::bad_alloc&) {
148             /* [EPHE TODO]: Do we handle backfill in a better way ?
149                             Perhaps do backfilling partially (that is
150                             backfill ==> stream; backfill ==> stream ..so on )?
151              */
152             LOG(EXTENSION_LOG_WARNING,
153                 "BasicLinkedList::rangeRead(): "
154                 "(vb %d) ENOMEM while trying to copy "
155                 "item with seqno %" PRIi64 "before streaming it",
156                 vbid,
157                 osv.getBySeqno());
158             return std::make_tuple(ENGINE_ENOMEM, std::move(empty), 0);
159         }
160     }
161
162     /* Done with range read, reset the range */
163     {
164         std::lock_guard<SpinLock> lh(rangeLock);
165         readRange.reset();
166     }
167
168     /* Return all the range read items */
169     return std::make_tuple(ENGINE_SUCCESS, std::move(items), end);
170 }
171
172 void BasicLinkedList::updateHighSeqno(
173         std::lock_guard<std::mutex>& highSeqnoLock,
174         const OrderedStoredValue& v) {
175     highSeqno = v.getBySeqno();
176 }
177 void BasicLinkedList::updateHighestDedupedSeqno(
178         std::lock_guard<std::mutex>& highSeqnoLock,
179         const OrderedStoredValue& v) {
180     highestDedupedSeqno = v.getBySeqno();
181 }
182
183 void BasicLinkedList::markItemStale(StoredValue::UniquePtr ownedSv) {
184     /* Release the StoredValue as BasicLinkedList does not want it to be of
185        owned type */
186     StoredValue* v = ownedSv.release();
187
188     /* Update the stats tracking the memory owned by the list */
189     staleSize.fetch_add(v->size());
190     staleMetaDataSize.fetch_add(v->metaDataSize());
191     st.currentSize.fetch_add(v->metaDataSize());
192
193     ++numStaleItems;
194     {
195         std::lock_guard<std::mutex> writeGuard(writeLock);
196         v->toOrderedStoredValue()->markStale(writeGuard);
197     }
198 }
199
200 size_t BasicLinkedList::purgeTombstones() {
201     // Purge items marked as stale from the seqList.
202     //
203     // Strategy - we try to ensure that this function does not block
204     // frontend-writes (adding new OrderedStoredValues (OSVs) to the seqList).
205     // To achieve this (safely),
206     // we (try to) acquire the rangeReadLock and setup a 'read' range for the
207     // whole of the seqList. This prevents any other readers from iterating
208     // the list (and accessing stale items) while we purge on it; but permits
209     // front-end operations to continue as they:
210     //   a) Only read/modify non-stale items (we only change stale items) and
211     //   b) Do not change the list membership of anything within the read-range.
212     // However, we do need to be careful about what members of OSVs we access
213     // here - the only OSVs we can safely access are ones marked stale as they
214     // are no longer in the HashTable (and hence subject to HashTable locks).
215     // To check if an item is stale we need to acquire the writeLock
216     // (OSV::stale is guarded by it) for each list item. While this isn't
217     // ideal (that's the same lock needed by front-end operations), we can
218     // release the lock between each element so front-end operations can
219     // have the opportunity to acquire it.
220     //
221     // Attempt to acquire the readRangeLock, to block anyone else concurrently
222     // reading from the list while we remove elements from it.
223     std::unique_lock<std::mutex> rrGuard(rangeReadLock, std::try_to_lock);
224     if (!rrGuard) {
225         // If we cannot acquire the lock then another thread is
226         // running a range read. Given these are typically long-running,
227         // return without blocking.
228         return 0;
229     }
230
231     // Determine the start and end iterators.
232     OrderedLL::iterator startIt;
233     OrderedLL::iterator endIt;
234     {
235         std::lock_guard<std::mutex> writeGuard(writeLock);
236         if (seqList.empty()) {
237             // Nothing in sequence list - nothing to purge.
238             return 0;
239         }
240         // Determine the {start} and {end} iterator (inclusive). Note
241         // that (at most) one item is added to the seqList before its
242         // sequence number is set (as the seqno comes from Ckpt
243         // manager); if that is the case (such an item is guaranteed
244         // to not be stale), then move end to the previous item
245         // (i.e. we don't consider this "in-flight" item), as long as
246         // there is at least two elements.
247         startIt = seqList.begin();
248         endIt = std::prev(seqList.end());
249         // Need rangeLock for highSeqno & readRange
250         std::lock_guard<SpinLock> rangeGuard(rangeLock);
251         if ((startIt != endIt) && (!endIt->isStale(writeGuard))) {
252             endIt = std::prev(endIt);
253         }
254         readRange = SeqRange(startIt->getBySeqno(), endIt->getBySeqno());
255     }
256
257     // Iterate across all but the last item in the seqList, looking
258     // for stale items.
259     // Note the for() loop terminates one element before endIt - we
260     // actually want an inclusive iteration but as we are comparing
261     // essentially random addresses (and we don't want to 'touch' the
262     // element after endIt), we loop to one before endIt, then handle
263     // endIt explicilty at the end.
264     // Note(2): Iterator is manually incremented outside the for() loop as it
265     // is invalidated when we erase items.
266     size_t purgedCount = 0;
267     bool stale;
268     for (auto it = startIt; it != endIt;) {
269         {
270             std::lock_guard<std::mutex> writeGuard(writeLock);
271             stale = it->isStale(writeGuard);
272         }
273         // Only stale items are purged.
274         if (!stale) {
275             ++it;
276             continue;
277         }
278
279         // Checks pass, remove from list and delete.
280         it = purgeListElem(it);
281         ++purgedCount;
282     }
283     // Handle the last element.
284     {
285         std::lock_guard<std::mutex> writeGuard(writeLock);
286         stale = endIt->isStale(writeGuard);
287     }
288     if (stale) {
289         purgeListElem(endIt);
290         ++purgedCount;
291     }
292
293     // Complete; reset the readRange.
294     {
295         std::lock_guard<SpinLock> lh(rangeLock);
296         readRange.reset();
297     }
298     return purgedCount;
299 }
300
301 void BasicLinkedList::updateNumDeletedItems(bool oldDeleted, bool newDeleted) {
302     if (oldDeleted && !newDeleted) {
303         --numDeletedItems;
304     } else if (!oldDeleted && newDeleted) {
305         ++numDeletedItems;
306     }
307 }
308
309 uint64_t BasicLinkedList::getNumStaleItems() const {
310     return numStaleItems;
311 }
312
313 size_t BasicLinkedList::getStaleValueBytes() const {
314     return staleSize;
315 }
316
317 size_t BasicLinkedList::getStaleMetadataBytes() const {
318     return staleMetaDataSize;
319 }
320
321 uint64_t BasicLinkedList::getNumDeletedItems() const {
322     std::lock_guard<std::mutex> lckGd(writeLock);
323     return numDeletedItems;
324 }
325
326 uint64_t BasicLinkedList::getNumItems() const {
327     std::lock_guard<std::mutex> lckGd(writeLock);
328     return seqList.size();
329 }
330
331 uint64_t BasicLinkedList::getHighSeqno() const {
332     std::lock_guard<std::mutex> lckGd(highSeqnosLock);
333     return highSeqno;
334 }
335
336 uint64_t BasicLinkedList::getHighestDedupedSeqno() const {
337     std::lock_guard<std::mutex> lckGd(highSeqnosLock);
338     return highestDedupedSeqno;
339 }
340
341 seqno_t BasicLinkedList::getHighestPurgedDeletedSeqno() const {
342     return highestPurgedDeletedSeqno;
343 }
344
345 uint64_t BasicLinkedList::getRangeReadBegin() const {
346     std::lock_guard<SpinLock> lh(rangeLock);
347     return readRange.getBegin();
348 }
349
350 uint64_t BasicLinkedList::getRangeReadEnd() const {
351     std::lock_guard<SpinLock> lh(rangeLock);
352     return readRange.getEnd();
353 }
354 std::mutex& BasicLinkedList::getHighSeqnosLock() const {
355     return highSeqnosLock;
356 }
357
358 void BasicLinkedList::dump() const {
359     std::cerr << *this << std::endl;
360 }
361
362 std::ostream& operator <<(std::ostream& os, const BasicLinkedList& ll) {
363     os << "BasicLinkedList[" << &ll << "] with numItems:" << ll.seqList.size()
364        << " deletedItems:" << ll.numDeletedItems
365        << " staleItems:" << ll.getNumStaleItems()
366        << " highPurgeSeqno:" << ll.getHighestPurgedDeletedSeqno()
367        << " elements:[" << std::endl;
368     size_t count = 0;
369     for (const auto& val : ll.seqList) {
370         os << "    " << val << std::endl;
371         ++count;
372     }
373     os << "] (count:" << count << ")";
374     return os;
375 }
376
377 OrderedLL::iterator BasicLinkedList::purgeListElem(OrderedLL::iterator it) {
378     StoredValue::UniquePtr purged(&*it);
379     {
380         std::lock_guard<std::mutex> lckGd(writeLock);
381         it = seqList.erase(it);
382     }
383
384     /* Update the stats tracking the memory owned by the list */
385     staleSize.fetch_sub(purged->size());
386     staleMetaDataSize.fetch_sub(purged->metaDataSize());
387     st.currentSize.fetch_sub(purged->metaDataSize());
388
389     // Similary for the item counts:
390     --numStaleItems;
391     if (purged->isDeleted()) {
392         --numDeletedItems;
393     }
394
395     if (purged->isDeleted()) {
396         highestPurgedDeletedSeqno = std::max(seqno_t(highestPurgedDeletedSeqno),
397                                              purged->getBySeqno());
398     }
399     return it;
400 }