BasicLinkedList::operator<<: Count elements in seqlist
[ep-engine.git] / src / linked_list.cc
1 /* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2 /*
3  *     Copyright 2017 Couchbase, Inc
4  *
5  *   Licensed under the Apache License, Version 2.0 (the "License");
6  *   you may not use this file except in compliance with the License.
7  *   You may obtain a copy of the License at
8  *
9  *       http://www.apache.org/licenses/LICENSE-2.0
10  *
11  *   Unless required by applicable law or agreed to in writing, software
12  *   distributed under the License is distributed on an "AS IS" BASIS,
13  *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  *   See the License for the specific language governing permissions and
15  *   limitations under the License.
16  */
17
18 #include "linked_list.h"
19 #include <mutex>
20
21 BasicLinkedList::BasicLinkedList(uint16_t vbucketId, EPStats& st)
22     : SequenceList(),
23       readRange(0, 0),
24       staleSize(0),
25       staleMetaDataSize(0),
26       highSeqno(0),
27       highestDedupedSeqno(0),
28       highestPurgedDeletedSeqno(0),
29       numStaleItems(0),
30       numDeletedItems(0),
31       vbid(vbucketId),
32       st(st) {
33 }
34
35 BasicLinkedList::~BasicLinkedList() {
36     /* Delete stale items here, other items are deleted by the hash
37        table */
38     std::lock_guard<std::mutex> writeGuard(writeLock);
39     seqList.remove_and_dispose_if(
40             [&writeGuard](const OrderedStoredValue& v) {
41                 return v.isStale(writeGuard);
42             },
43             [this](OrderedStoredValue* v) {
44                 this->st.currentSize.fetch_sub(v->metaDataSize());
45                 delete v;
46             });
47
48     /* Erase all the list elements (does not destroy elements, just removes
49        them from the list) */
50     seqList.clear();
51 }
52
53 void BasicLinkedList::appendToList(std::lock_guard<std::mutex>& seqLock,
54                                    OrderedStoredValue& v) {
55     /* Allow only one write to the list at a time */
56     std::lock_guard<std::mutex> lckGd(writeLock);
57
58     seqList.push_back(v);
59 }
60
61 SequenceList::UpdateStatus BasicLinkedList::updateListElem(
62         std::lock_guard<std::mutex>& seqLock, OrderedStoredValue& v) {
63     /* Allow only one write to the list at a time */
64     std::lock_guard<std::mutex> lckGd(writeLock);
65
66     /* Lock that needed for consistent read of SeqRange 'readRange' */
67     std::lock_guard<SpinLock> lh(rangeLock);
68
69     if (readRange.fallsInRange(v.getBySeqno())) {
70         /* Range read is in middle of a point-in-time snapshot, hence we cannot
71            move the element to the end of the list. Return a temp failure */
72         return UpdateStatus::Append;
73     }
74
75     /* Since there is no other reads or writes happenning in this range, we can
76        move the item to the end of the list */
77     auto it = seqList.iterator_to(v);
78     seqList.erase(it);
79     seqList.push_back(v);
80
81     return UpdateStatus::Success;
82 }
83
84 std::pair<ENGINE_ERROR_CODE, std::vector<UniqueItemPtr>>
85 BasicLinkedList::rangeRead(seqno_t start, seqno_t end) {
86     std::vector<UniqueItemPtr> empty;
87     if ((start > end) || (start <= 0)) {
88         LOG(EXTENSION_LOG_WARNING,
89             "BasicLinkedList::rangeRead(): "
90             "(vb:%d) ERANGE: start %" PRIi64 " > end %" PRIi64,
91             vbid,
92             start,
93             end);
94         return {ENGINE_ERANGE, std::move(empty)
95                 /* MSVC not happy with std::vector<UniqueItemPtr>() */};
96     }
97
98     /* Allows only 1 rangeRead for now */
99     std::lock_guard<std::mutex> lckGd(rangeReadLock);
100
101     {
102         std::lock_guard<SpinLock> lh(rangeLock);
103         if (start > highSeqno) {
104             LOG(EXTENSION_LOG_WARNING,
105                 "BasicLinkedList::rangeRead(): "
106                 "(vb:%d) ERANGE: start %" PRIi64 " > highSeqno %" PRIi64,
107                 vbid,
108                 start,
109                 static_cast<seqno_t>(highSeqno));
110             /* If the request is for an invalid range, return before iterating
111                through the list */
112             return {ENGINE_ERANGE, std::move(empty)};
113         }
114
115         /* Mark the initial read range */
116         end = std::min(end, static_cast<seqno_t>(highSeqno));
117         end = std::max(end, static_cast<seqno_t>(highestDedupedSeqno));
118         readRange = SeqRange(1, end);
119     }
120
121     /* Read items in the range */
122     std::vector<UniqueItemPtr> items;
123
124     for (const auto& osv : seqList) {
125         if (osv.getBySeqno() > end) {
126             /* we are done */
127             break;
128         }
129
130         {
131             std::lock_guard<SpinLock> lh(rangeLock);
132             readRange.setBegin(osv.getBySeqno()); /* [EPHE TODO]: should we
133                                                      update the min every time ?
134                                                    */
135         }
136
137         if (osv.getBySeqno() < start) {
138             /* skip this item */
139             continue;
140         }
141
142         try {
143             items.push_back(UniqueItemPtr(osv.toItem(false, vbid)));
144         } catch (const std::bad_alloc&) {
145             /* [EPHE TODO]: Do we handle backfill in a better way ?
146                             Perhaps do backfilling partially (that is
147                             backfill ==> stream; backfill ==> stream ..so on )?
148              */
149             LOG(EXTENSION_LOG_WARNING,
150                 "BasicLinkedList::rangeRead(): "
151                 "(vb %d) ENOMEM while trying to copy "
152                 "item with seqno %" PRIi64 "before streaming it",
153                 vbid,
154                 osv.getBySeqno());
155             return {ENGINE_ENOMEM, std::move(empty)};
156         }
157     }
158
159     /* Done with range read, reset the range */
160     {
161         std::lock_guard<SpinLock> lh(rangeLock);
162         readRange.reset();
163     }
164
165     /* Return all the range read items */
166     return {ENGINE_SUCCESS, std::move(items)};
167 }
168
169 void BasicLinkedList::updateHighSeqno(const OrderedStoredValue& v) {
170     std::lock_guard<SpinLock> lh(rangeLock);
171     highSeqno = v.getBySeqno();
172 }
173
174 void BasicLinkedList::markItemStale(StoredValue::UniquePtr ownedSv) {
175     /* Release the StoredValue as BasicLinkedList does not want it to be of
176        owned type */
177     StoredValue* v = ownedSv.release();
178
179     /* Update the stats tracking the memory owned by the list */
180     staleSize.fetch_add(v->size());
181     staleMetaDataSize.fetch_add(v->metaDataSize());
182     st.currentSize.fetch_add(v->metaDataSize());
183
184     ++numStaleItems;
185     {
186         std::lock_guard<std::mutex> writeGuard(writeLock);
187         v->toOrderedStoredValue()->markStale(writeGuard);
188     }
189 }
190
191 size_t BasicLinkedList::purgeTombstones() {
192     // Purge items marked as stale from the seqList.
193     //
194     // Strategy - we try to ensure that this function does not block
195     // frontend-writes (adding new OrderedStoredValues (OSVs) to the seqList).
196     // To achieve this (safely),
197     // we (try to) acquire the rangeReadLock and setup a 'read' range for the
198     // whole of the seqList. This prevents any other readers from iterating
199     // the list (and accessing stale items) while we purge on it; but permits
200     // front-end operations to continue as they:
201     //   a) Only read/modify non-stale items (we only change stale items) and
202     //   b) Do not change the list membership of anything within the read-range.
203     // However, we do need to be careful about what members of OSVs we access
204     // here - the only OSVs we can safely access are ones marked stale as they
205     // are no longer in the HashTable (and hence subject to HashTable locks).
206     // To check if an item is stale we need to acquire the writeLock
207     // (OSV::stale is guarded by it) for each list item. While this isn't
208     // ideal (that's the same lock needed by front-end operations), we can
209     // release the lock between each element so front-end operations can
210     // have the opportunity to acquire it.
211     //
212     // Attempt to acquire the readRangeLock, to block anyone else concurrently
213     // reading from the list while we remove elements from it.
214     std::unique_lock<std::mutex> rrGuard(rangeReadLock, std::try_to_lock);
215     if (!rrGuard) {
216         // If we cannot acquire the lock then another thread is
217         // running a range read. Given these are typically long-running,
218         // return without blocking.
219         return 0;
220     }
221
222     // Determine the start and end iterators.
223     OrderedLL::iterator startIt;
224     OrderedLL::iterator endIt;
225     {
226         std::lock_guard<std::mutex> writeGuard(writeLock);
227         if (seqList.empty()) {
228             // Nothing in sequence list - nothing to purge.
229             return 0;
230         }
231         // Determine the {start} and {end} iterator (inclusive). Note
232         // that (at most) one item is added to the seqList before its
233         // sequence number is set (as the seqno comes from Ckpt
234         // manager); if that is the case (such an item is guaranteed
235         // to not be stale), then move end to the previous item
236         // (i.e. we don't consider this "in-flight" item), as long as
237         // there is at least two elements.
238         startIt = seqList.begin();
239         endIt = std::prev(seqList.end());
240         // Need rangeLock for highSeqno & readRange
241         std::lock_guard<SpinLock> rangeGuard(rangeLock);
242         if ((startIt != endIt) && (!endIt->isStale(writeGuard))) {
243             endIt = std::prev(endIt);
244         }
245         readRange = SeqRange(startIt->getBySeqno(), endIt->getBySeqno());
246     }
247
248     // Iterate across all but the last item in the seqList, looking
249     // for stale items.
250     // Note the for() loop terminates one element before endIt - we
251     // actually want an inclusive iteration but as we are comparing
252     // essentially random addresses (and we don't want to 'touch' the
253     // element after endIt), we loop to one before endIt, then handle
254     // endIt explicilty at the end.
255     // Note(2): Iterator is manually incremented outside the for() loop as it
256     // is invalidated when we erase items.
257     size_t purgedCount = 0;
258     bool stale;
259     for (auto it = startIt; it != endIt;) {
260         {
261             std::lock_guard<std::mutex> writeGuard(writeLock);
262             stale = it->isStale(writeGuard);
263         }
264         // Only stale items are purged.
265         if (!stale) {
266             ++it;
267             continue;
268         }
269
270         // Checks pass, remove from list and delete.
271         it = purgeListElem(it);
272         ++purgedCount;
273     }
274     // Handle the last element.
275     {
276         std::lock_guard<std::mutex> writeGuard(writeLock);
277         stale = endIt->isStale(writeGuard);
278     }
279     if (stale) {
280         purgeListElem(endIt);
281         ++purgedCount;
282     }
283
284     // Complete; reset the readRange.
285     {
286         std::lock_guard<SpinLock> lh(rangeLock);
287         readRange.reset();
288     }
289     return purgedCount;
290 }
291
292 void BasicLinkedList::updateNumDeletedItems(bool oldDeleted, bool newDeleted) {
293     if (oldDeleted && !newDeleted) {
294         --numDeletedItems;
295     } else if (!oldDeleted && newDeleted) {
296         ++numDeletedItems;
297     }
298 }
299
300 uint64_t BasicLinkedList::getNumStaleItems() const {
301     return numStaleItems;
302 }
303
304 size_t BasicLinkedList::getStaleValueBytes() const {
305     return staleSize;
306 }
307
308 size_t BasicLinkedList::getStaleMetadataBytes() const {
309     return staleMetaDataSize;
310 }
311
312 uint64_t BasicLinkedList::getNumDeletedItems() const {
313     std::lock_guard<std::mutex> lckGd(writeLock);
314     return numDeletedItems;
315 }
316
317 uint64_t BasicLinkedList::getNumItems() const {
318     std::lock_guard<std::mutex> lckGd(writeLock);
319     return seqList.size();
320 }
321
322 uint64_t BasicLinkedList::getHighSeqno() const {
323     std::lock_guard<SpinLock> lh(rangeLock);
324     return highSeqno;
325 }
326
327 uint64_t BasicLinkedList::getHighestDedupedSeqno() const {
328     std::lock_guard<std::mutex> lckGd(writeLock);
329     return highestDedupedSeqno;
330 }
331
332 seqno_t BasicLinkedList::getHighestPurgedDeletedSeqno() const {
333     return highestPurgedDeletedSeqno;
334 }
335
336 uint64_t BasicLinkedList::getRangeReadBegin() const {
337     std::lock_guard<SpinLock> lh(rangeLock);
338     return readRange.getBegin();
339 }
340
341 uint64_t BasicLinkedList::getRangeReadEnd() const {
342     std::lock_guard<SpinLock> lh(rangeLock);
343     return readRange.getEnd();
344 }
345
346 void BasicLinkedList::dump() const {
347     std::cerr << *this << std::endl;
348 }
349
350 std::ostream& operator <<(std::ostream& os, const BasicLinkedList& ll) {
351     os << "BasicLinkedList[" << &ll << "] with numItems:" << ll.seqList.size()
352        << " deletedItems:" << ll.numDeletedItems
353        << " staleItems:" << ll.getNumStaleItems()
354        << " highPurgeSeqno:" << ll.getHighestPurgedDeletedSeqno()
355        << " elements:[" << std::endl;
356     size_t count = 0;
357     for (const auto& val : ll.seqList) {
358         os << "    " << val << std::endl;
359         ++count;
360     }
361     os << "] (count:" << count << ")";
362     return os;
363 }
364
365 OrderedLL::iterator BasicLinkedList::purgeListElem(OrderedLL::iterator it) {
366     StoredValue::UniquePtr purged(&*it);
367     {
368         std::lock_guard<std::mutex> lckGd(writeLock);
369         it = seqList.erase(it);
370     }
371
372     /* Update the stats tracking the memory owned by the list */
373     staleSize.fetch_sub(purged->size());
374     staleMetaDataSize.fetch_sub(purged->metaDataSize());
375     st.currentSize.fetch_sub(purged->metaDataSize());
376
377     // Similary for the item counts:
378     --numStaleItems;
379     if (purged->isDeleted()) {
380         --numDeletedItems;
381     }
382
383     if (purged->isDeleted()) {
384         highestPurgedDeletedSeqno = std::max(seqno_t(highestPurgedDeletedSeqno),
385                                              purged->getBySeqno());
386     }
387     return it;
388 }