MB-24094: Add Item to SequenceList on TTL update
[ep-engine.git] / src / ephemeral_vb.cc
1 /* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2 /*
3  *     Copyright 2017 Couchbase, Inc
4  *
5  *   Licensed under the Apache License, Version 2.0 (the "License");
6  *   you may not use this file except in compliance with the License.
7  *   You may obtain a copy of the License at
8  *
9  *       http://www.apache.org/licenses/LICENSE-2.0
10  *
11  *   Unless required by applicable law or agreed to in writing, software
12  *   distributed under the License is distributed on an "AS IS" BASIS,
13  *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  *   See the License for the specific language governing permissions and
15  *   limitations under the License.
16  */
17
18 #include "ephemeral_vb.h"
19
20 #include "dcp/backfill_memory.h"
21 #include "ephemeral_tombstone_purger.h"
22 #include "failover-table.h"
23 #include "linked_list.h"
24 #include "stored_value_factories.h"
25 #include "vbucketdeletiontask.h"
26
27 EphemeralVBucket::EphemeralVBucket(id_type i,
28                                    vbucket_state_t newState,
29                                    EPStats& st,
30                                    CheckpointConfig& chkConfig,
31                                    KVShard* kvshard,
32                                    int64_t lastSeqno,
33                                    uint64_t lastSnapStart,
34                                    uint64_t lastSnapEnd,
35                                    std::unique_ptr<FailoverTable> table,
36                                    NewSeqnoCallback newSeqnoCb,
37                                    Configuration& config,
38                                    item_eviction_policy_t evictionPolicy,
39                                    vbucket_state_t initState,
40                                    uint64_t purgeSeqno,
41                                    uint64_t maxCas,
42                                    const std::string& collectionsManifest)
43     : VBucket(i,
44               newState,
45               st,
46               chkConfig,
47               lastSeqno,
48               lastSnapStart,
49               lastSnapEnd,
50               std::move(table),
51               /*flusherCb*/ nullptr,
52               std::make_unique<OrderedStoredValueFactory>(st),
53               std::move(newSeqnoCb),
54               config,
55               evictionPolicy,
56               initState,
57               purgeSeqno,
58               maxCas,
59               collectionsManifest),
60       seqList(std::make_unique<BasicLinkedList>(i, st)) {
61 }
62
63 size_t EphemeralVBucket::getNumItems() const {
64     return ht.getNumInMemoryItems() - ht.getNumDeletedItems();
65 }
66
67 void EphemeralVBucket::completeStatsVKey(
68         const DocKey& key, const RememberingCallback<GetValue>& gcb) {
69     throw std::logic_error(
70             "EphemeralVBucket::completeStatsVKey() is not valid call. "
71             "Called on vb " +
72             std::to_string(getId()) + "for key: " +
73             std::string(reinterpret_cast<const char*>(key.data()), key.size()));
74 }
75
76 bool EphemeralVBucket::pageOut(const HashTable::HashBucketLock& lh,
77                                StoredValue*& v) {
78     // We only delete from active vBuckets to ensure that replicas stay in
79     // sync with the active (the delete from active is sent via DCP to the
80     // the replicas as an explicit delete).
81     if (getState() != vbucket_state_active) {
82         return false;
83     }
84     if (v->isDeleted() && !v->getValue()) {
85         // If the item has already been deleted (and doesn't have a value
86         // associated with it) then there's no further deletion possible,
87         // until the deletion marker (tombstone) is later purged at the
88         // metadata purge internal.
89         return false;
90     }
91     VBQueueItemCtx queueCtx(GenerateBySeqno::Yes,
92                             GenerateCas::Yes,
93                             TrackCasDrift::No,
94                             /*isBackfill*/ false,
95                             nullptr);
96     v->setRevSeqno(v->getRevSeqno() + 1);
97     StoredValue* newSv;
98     VBNotifyCtx notifyCtx;
99     std::tie(newSv, notifyCtx) = softDeleteStoredValue(
100             lh, *v, /*onlyMarkDeleted*/ false, queueCtx, 0);
101     ht.updateMaxDeletedRevSeqno(newSv->getRevSeqno());
102     notifyNewSeqno(notifyCtx);
103
104     autoDeleteCount++;
105
106     return true;
107 }
108
109 void EphemeralVBucket::addStats(bool details,
110                                 ADD_STAT add_stat,
111                                 const void* c) {
112     // Include base class statistics:
113     _addStats(details, add_stat, c);
114
115     if (details) {
116         // Ephemeral-specific details
117         addStat("auto_delete_count", autoDeleteCount.load(), add_stat, c);
118         addStat("seqlist_count", seqList->getNumItems(), add_stat, c);
119         addStat("seqlist_deleted_count",
120                 seqList->getNumDeletedItems(),
121                 add_stat,
122                 c);
123         addStat("seqlist_high_seqno", seqList->getHighSeqno(), add_stat, c);
124         addStat("seqlist_highest_deduped_seqno",
125                 seqList->getHighestDedupedSeqno(),
126                 add_stat,
127                 c);
128         const auto rr_begin = seqList->getRangeReadBegin();
129         const auto rr_end = seqList->getRangeReadEnd();
130         addStat("seqlist_range_read_begin", rr_begin, add_stat, c);
131         addStat("seqlist_range_read_end", rr_end, add_stat, c);
132         addStat("seqlist_range_read_count", rr_end - rr_begin, add_stat, c);
133         addStat("seqlist_stale_count",
134                 seqList->getNumStaleItems(),
135                 add_stat,
136                 c);
137         addStat("seqlist_stale_value_bytes",
138                 seqList->getStaleValueBytes(),
139                 add_stat,
140                 c);
141         addStat("seqlist_stale_metadata_bytes",
142                 seqList->getStaleValueBytes(),
143                 add_stat,
144                 c);
145     }
146 }
147
148 void EphemeralVBucket::dump() const {
149     std::cerr << "EphemeralVBucket[" << this
150               << "] with state:" << toString(getState())
151               << " numItems:" << getNumItems()
152               << std::endl;
153     seqList->dump();
154     std::cerr << ht << std::endl;
155 }
156
157 ENGINE_ERROR_CODE EphemeralVBucket::completeBGFetchForSingleItem(
158         const DocKey& key,
159         const VBucketBGFetchItem& fetched_item,
160         const ProcessClock::time_point startTime) {
161     /* [EPHE TODO]: Just return error code and make all the callers handle it */
162     throw std::logic_error(
163             "EphemeralVBucket::completeBGFetchForSingleItem() "
164             "is not valid. Called on vb " +
165             std::to_string(getId()) + "for key: " +
166             std::string(reinterpret_cast<const char*>(key.data()), key.size()));
167 }
168
169 void EphemeralVBucket::resetStats() {
170     autoDeleteCount.reset();
171 }
172
173 vb_bgfetch_queue_t EphemeralVBucket::getBGFetchItems() {
174     throw std::logic_error(
175             "EphemeralVBucket::getBGFetchItems() is not valid. "
176             "Called on vb " +
177             std::to_string(getId()));
178 }
179
180 bool EphemeralVBucket::hasPendingBGFetchItems() {
181     throw std::logic_error(
182             "EphemeralVBucket::hasPendingBGFetchItems() is not valid. "
183             "Called on vb " +
184             std::to_string(getId()));
185 }
186
187 HighPriorityVBReqStatus EphemeralVBucket::checkAddHighPriorityVBEntry(
188         uint64_t seqnoOrChkId,
189         const void* cookie,
190         HighPriorityVBNotify reqType) {
191     if (reqType == HighPriorityVBNotify::ChkPersistence) {
192         return HighPriorityVBReqStatus::NotSupported;
193     }
194
195     {
196         /* Serialize the request with sequence lock */
197         std::lock_guard<std::mutex> lh(sequenceLock);
198
199         if (seqnoOrChkId <= getPersistenceSeqno()) {
200             /* Need not notify asynchronously as the vb already has the
201                requested seqno */
202             return HighPriorityVBReqStatus::RequestNotScheduled;
203         }
204
205         addHighPriorityVBEntry(seqnoOrChkId, cookie, reqType);
206     }
207
208     return HighPriorityVBReqStatus::RequestScheduled;
209 }
210
211 void EphemeralVBucket::notifyHighPriorityRequests(
212         EventuallyPersistentEngine& engine,
213         uint64_t idNum,
214         HighPriorityVBNotify notifyType) {
215     throw std::logic_error(
216             "EphemeralVBucket::notifyHighPriorityRequests() is not valid. "
217             "Called on vb " +
218             std::to_string(getId()));
219 }
220
221 void EphemeralVBucket::notifyAllPendingConnsFailed(
222         EventuallyPersistentEngine& e) {
223     auto toNotify = tmpFailAndGetAllHpNotifies(e);
224
225     for (auto& notify : toNotify) {
226         e.notifyIOComplete(notify.first, notify.second);
227     }
228
229     fireAllOps(e);
230 }
231
232 std::unique_ptr<DCPBackfill> EphemeralVBucket::createDCPBackfill(
233         EventuallyPersistentEngine& e,
234         const active_stream_t& stream,
235         uint64_t startSeqno,
236         uint64_t endSeqno) {
237     /* create a memory backfill object */
238     EphemeralVBucketPtr evb =
239             std::static_pointer_cast<EphemeralVBucket>(shared_from_this());
240     return std::make_unique<DCPBackfillMemory>(
241             evb, stream, startSeqno, endSeqno);
242 }
243
244 std::tuple<ENGINE_ERROR_CODE, std::vector<UniqueItemPtr>, seqno_t>
245 EphemeralVBucket::inMemoryBackfill(uint64_t start, uint64_t end) {
246     return seqList->rangeRead(start, end);
247 }
248
249 /* Vb level backfill queue is for items in a huge snapshot (disk backfill
250    snapshots from DCP are typically huge) that could not be fit on a
251    checkpoint. They update all stats, checkpoint seqno, but are not put
252    on checkpoint and are directly persisted from the queue.
253
254    In ephemeral buckets we must not add backfill items from DCP (on
255    replica vbuckets), to the vb backfill queue because we have put them on
256    linkedlist already. Also we do not have the flusher task to drain the
257    items from that queue.
258    (Unlike checkpoints, the items in this queue is not cleaned up
259     in a background cleanup task).
260
261    But we must be careful to update certain stats and checkpoint seqno
262    like in a regular couchbase bucket. */
263 void EphemeralVBucket::queueBackfillItem(
264         queued_item& qi, const GenerateBySeqno generateBySeqno) {
265     if (GenerateBySeqno::Yes == generateBySeqno) {
266         qi->setBySeqno(checkpointManager.nextBySeqno());
267     } else {
268         checkpointManager.setBySeqno(qi->getBySeqno());
269     }
270     ++stats.totalEnqueued;
271     stats.memOverhead->fetch_add(sizeof(queued_item));
272 }
273
274 size_t EphemeralVBucket::purgeTombstones(rel_time_t purgeAge) {
275     // First mark all deleted items in the HashTable which can be purged as
276     // Stale - this removes them from the HashTable, transferring ownership to
277     // SequenceList.
278     HTTombstonePurger purger(*this, purgeAge);
279     ht.visit(purger);
280
281     // Secondly iterate over the sequence list and delete any stale items
282     auto seqListPurged = seqList->purgeTombstones();
283
284     // Update stats and return.
285     htDeletedPurgeCount += purger.getNumPurged();
286     seqListPurgeCount += seqListPurged;
287     setPurgeSeqno(seqList->getHighestPurgedDeletedSeqno());
288
289     return seqListPurged;
290 }
291
292 std::tuple<StoredValue*, MutationStatus, VBNotifyCtx>
293 EphemeralVBucket::updateStoredValue(const HashTable::HashBucketLock& hbl,
294                                     StoredValue& v,
295                                     const Item& itm,
296                                     const VBQueueItemCtx* queueItmCtx,
297                                     bool justTouch) {
298     std::lock_guard<std::mutex> lh(sequenceLock);
299
300     const bool oldValueDeleted = v.isDeleted();
301     const bool recreatingDeletedItem = v.isDeleted() && !itm.isDeleted();
302
303     SequenceList::UpdateStatus res;
304     VBNotifyCtx notifyCtx;
305     StoredValue* newSv = &v;
306     StoredValue::UniquePtr ownedSv;
307     MutationStatus status(MutationStatus::WasClean);
308
309     {
310         // Once we update the seqList, there is a short period where the
311         // highSeqno and highestDedupedSeqno are both incorrect. We have to hold
312         // this lock to prevent a new rangeRead starting, and covering an
313         // inconsistent range.
314         std::lock_guard<std::mutex> listWriteLg(seqList->getListWriteLock());
315
316         /* Update the OrderedStoredValue in hash table + Ordered data structure
317            (list) */
318         res = seqList->updateListElem(
319                 lh, listWriteLg, *(v.toOrderedStoredValue()));
320
321         switch (res) {
322         case SequenceList::UpdateStatus::Success:
323             /* OrderedStoredValue moved to end of the list, just update its
324                value */
325             status = ht.unlocked_updateStoredValue(hbl.getHTLock(), v, itm);
326             break;
327
328         case SequenceList::UpdateStatus::Append: {
329             /* OrderedStoredValue cannot be moved to end of the list,
330                due to a range read. Hence, release the storedvalue from the
331                hash table, indicate the list to mark the OrderedStoredValue
332                stale (old duplicate) and add a new StoredValue for the itm.
333
334                Note: It is important to remove item from hash table before
335                      marking stale because once marked stale list assumes the
336                      ownership of the item and may delete it anytime. */
337             /* Release current storedValue from hash table */
338             /* [EPHE TODO]: Write a HT func to release the StoredValue directly
339                             than taking key as a param and deleting
340                             (MB-23184) */
341             ownedSv = ht.unlocked_release(hbl, v.getKey());
342
343             /* Add a new storedvalue for the item */
344             newSv = ht.unlocked_addNewStoredValue(hbl, itm);
345
346             seqList->appendToList(
347                     lh, listWriteLg, *(newSv->toOrderedStoredValue()));
348         } break;
349         }
350
351         if (queueItmCtx) {
352             /* Put on checkpoint mgr */
353             notifyCtx = queueDirty(*newSv, *queueItmCtx);
354         }
355
356         /* Update the high seqno in the sequential storage */
357         auto& osv = *(newSv->toOrderedStoredValue());
358         seqList->updateHighSeqno(listWriteLg, osv);
359         seqList->updateHighestDedupedSeqno(listWriteLg, osv);
360
361         if (res == SequenceList::UpdateStatus::Append) {
362             /* Mark the un-updated storedValue as stale. This must be done after
363                the new storedvalue for the item is visible for range read in the
364                list. This is because we do not want the seqlist to delete the
365                stale item before its latest copy is added to the list.
366                (item becomes visible for range read only after updating the list
367                 with the seqno of the item) */
368             seqList->markItemStale(listWriteLg, std::move(ownedSv), newSv);
369         }
370     }
371
372     if (recreatingDeletedItem) {
373         ++opsCreate;
374     } else {
375         ++opsUpdate;
376     }
377
378     seqList->updateNumDeletedItems(oldValueDeleted, itm.isDeleted());
379
380     return std::make_tuple(newSv, status, notifyCtx);
381 }
382
383 std::pair<StoredValue*, VBNotifyCtx> EphemeralVBucket::addNewStoredValue(
384         const HashTable::HashBucketLock& hbl,
385         const Item& itm,
386         const VBQueueItemCtx* queueItmCtx) {
387     StoredValue* v = ht.unlocked_addNewStoredValue(hbl, itm);
388
389     std::lock_guard<std::mutex> lh(sequenceLock);
390
391     OrderedStoredValue* osv;
392     try {
393         osv = v->toOrderedStoredValue();
394     } catch (const std::bad_cast& e) {
395         throw std::logic_error(
396                 "EphemeralVBucket::addNewStoredValue(): Error " +
397                 std::string(e.what()) + " for vbucket: " +
398                 std::to_string(getId()) + " for key: " +
399                 std::string(reinterpret_cast<const char*>(v->getKey().data()),
400                             v->getKey().size()));
401     }
402
403     VBNotifyCtx notifyCtx;
404     {
405         std::lock_guard<std::mutex> listWriteLg(seqList->getListWriteLock());
406
407         /* Add to the sequential storage */
408         seqList->appendToList(lh, listWriteLg, *osv);
409
410         if (queueItmCtx) {
411             /* Put on checkpoint mgr */
412             notifyCtx = queueDirty(*v, *queueItmCtx);
413         }
414
415         /* Update the high seqno in the sequential storage */
416         seqList->updateHighSeqno(listWriteLg, *osv);
417     }
418     ++opsCreate;
419
420     seqList->updateNumDeletedItems(false, itm.isDeleted());
421
422     return {v, notifyCtx};
423 }
424
425 std::tuple<StoredValue*, VBNotifyCtx> EphemeralVBucket::softDeleteStoredValue(
426         const HashTable::HashBucketLock& hbl,
427         StoredValue& v,
428         bool onlyMarkDeleted,
429         const VBQueueItemCtx& queueItmCtx,
430         uint64_t bySeqno) {
431     std::lock_guard<std::mutex> lh(sequenceLock);
432
433     StoredValue* newSv = &v;
434     StoredValue::UniquePtr ownedSv;
435
436     const bool oldValueDeleted = v.isDeleted();
437
438     SequenceList::UpdateStatus res;
439     VBNotifyCtx notifyCtx;
440     {
441         // Once we update the seqList, there is a short period where the
442         // highSeqno and highestDedupedSeqno are both incorrect. We have to hold
443         // this lock to prevent a new rangeRead starting, and covering an
444         // inconsistent range.
445         std::lock_guard<std::mutex> listWriteLg(seqList->getListWriteLock());
446
447         /* Update the OrderedStoredValue in hash table + Ordered data structure
448            (list) */
449         res = seqList->updateListElem(
450                 lh, listWriteLg, *(v.toOrderedStoredValue()));
451
452         switch (res) {
453         case SequenceList::UpdateStatus::Success:
454             /* OrderedStoredValue is moved to end of the list, do nothing */
455             break;
456
457         case SequenceList::UpdateStatus::Append: {
458             /* OrderedStoredValue cannot be moved to end of the list,
459                due to a range read. Hence, replace the storedvalue in the
460                hash table with its copy and indicate the list to mark the
461                OrderedStoredValue stale (old duplicate).
462
463                Note: It is important to remove item from hash table before
464                      marking stale because once marked stale list assumes the
465                      ownership of the item and may delete it anytime. */
466
467             /* Release current storedValue from hash table */
468             /* [EPHE TODO]: Write a HT func to replace the StoredValue directly
469                             than taking key as a param and deleting (MB-23184)
470                */
471             std::tie(newSv, ownedSv) = ht.unlocked_replaceByCopy(hbl, v);
472
473             seqList->appendToList(
474                     lh, listWriteLg, *(newSv->toOrderedStoredValue()));
475         } break;
476         }
477
478         /* Delete the storedvalue */
479         ht.unlocked_softDelete(hbl.getHTLock(), *newSv, onlyMarkDeleted);
480
481         if (queueItmCtx.genBySeqno == GenerateBySeqno::No) {
482             newSv->setBySeqno(bySeqno);
483         }
484
485         notifyCtx = queueDirty(*newSv, queueItmCtx);
486
487         /* Update the high seqno in the sequential storage */
488         auto& osv = *(newSv->toOrderedStoredValue());
489         seqList->updateHighSeqno(listWriteLg, osv);
490         seqList->updateHighestDedupedSeqno(listWriteLg, osv);
491
492         if (res == SequenceList::UpdateStatus::Append) {
493             /* Mark the un-updated storedValue as stale. This must be done after
494                the new storedvalue for the item is visible for range read in the
495                list. This is because we do not want the seqlist to delete the
496                stale item before its latest copy is added to the list.
497                (item becomes visible for range read only after updating the list
498                with the seqno of the item) */
499             seqList->markItemStale(listWriteLg, std::move(ownedSv), newSv);
500         }
501     }
502
503     ++opsDelete;
504
505     seqList->updateNumDeletedItems(oldValueDeleted, true);
506
507     return std::make_tuple(newSv, notifyCtx);
508 }
509
510 void EphemeralVBucket::bgFetch(const DocKey& key,
511                                const void* cookie,
512                                EventuallyPersistentEngine& engine,
513                                const int bgFetchDelay,
514                                const bool isMeta) {
515     throw std::logic_error(
516             "EphemeralVBucket::bgFetch() is not valid. Called on vb " +
517             std::to_string(getId()) + "for key: " +
518             std::string(reinterpret_cast<const char*>(key.data()), key.size()));
519 }
520
521 ENGINE_ERROR_CODE
522 EphemeralVBucket::addTempItemAndBGFetch(HashTable::HashBucketLock& hbl,
523                                         const DocKey& key,
524                                         const void* cookie,
525                                         EventuallyPersistentEngine& engine,
526                                         int bgFetchDelay,
527                                         bool metadataOnly,
528                                         bool isReplication) {
529     /* [EPHE TODO]: Just return error code and make all the callers handle it */
530     throw std::logic_error(
531             "EphemeralVBucket::addTempItemAndBGFetch() is not valid. "
532             "Called on vb " +
533             std::to_string(getId()) + "for key: " +
534             std::string(reinterpret_cast<const char*>(key.data()), key.size()));
535 }
536
537 GetValue EphemeralVBucket::getInternalNonResident(
538         const DocKey& key,
539         const void* cookie,
540         EventuallyPersistentEngine& engine,
541         int bgFetchDelay,
542         get_options_t options,
543         const StoredValue& v) {
544     /* We reach here only if the v is deleted and does not have any value */
545     return GetValue();
546 }
547
548 void EphemeralVBucket::setupDeferredDeletion(const void* cookie) {
549     setDeferredDeletionCookie(cookie);
550     setDeferredDeletion(true);
551 }
552
553 void EphemeralVBucket::scheduleDeferredDeletion(
554         EventuallyPersistentEngine& engine) {
555     ExTask task = new VBucketMemoryDeletionTask(engine, this);
556     ExecutorPool::get()->schedule(task);
557 }