a1b8fc355ec05e78fb41d0c3aa7bff5cfbc7a852
[ep-engine.git] / src / ephemeral_vb.cc
1 /* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2 /*
3  *     Copyright 2017 Couchbase, Inc
4  *
5  *   Licensed under the Apache License, Version 2.0 (the "License");
6  *   you may not use this file except in compliance with the License.
7  *   You may obtain a copy of the License at
8  *
9  *       http://www.apache.org/licenses/LICENSE-2.0
10  *
11  *   Unless required by applicable law or agreed to in writing, software
12  *   distributed under the License is distributed on an "AS IS" BASIS,
13  *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  *   See the License for the specific language governing permissions and
15  *   limitations under the License.
16  */
17
18 #include "ephemeral_vb.h"
19
20 #include "dcp/backfill_memory.h"
21 #include "ephemeral_tombstone_purger.h"
22 #include "failover-table.h"
23 #include "linked_list.h"
24 #include "stored_value_factories.h"
25 #include "vbucketdeletiontask.h"
26
27 EphemeralVBucket::EphemeralVBucket(id_type i,
28                                    vbucket_state_t newState,
29                                    EPStats& st,
30                                    CheckpointConfig& chkConfig,
31                                    KVShard* kvshard,
32                                    int64_t lastSeqno,
33                                    uint64_t lastSnapStart,
34                                    uint64_t lastSnapEnd,
35                                    std::unique_ptr<FailoverTable> table,
36                                    NewSeqnoCallback newSeqnoCb,
37                                    Configuration& config,
38                                    item_eviction_policy_t evictionPolicy,
39                                    vbucket_state_t initState,
40                                    uint64_t purgeSeqno,
41                                    uint64_t maxCas,
42                                    const std::string& collectionsManifest)
43     : VBucket(i,
44               newState,
45               st,
46               chkConfig,
47               lastSeqno,
48               lastSnapStart,
49               lastSnapEnd,
50               std::move(table),
51               /*flusherCb*/ nullptr,
52               std::make_unique<OrderedStoredValueFactory>(st),
53               std::move(newSeqnoCb),
54               config,
55               evictionPolicy,
56               initState,
57               purgeSeqno,
58               maxCas,
59               collectionsManifest),
60       seqList(std::make_unique<BasicLinkedList>(i, st)) {
61 }
62
63 size_t EphemeralVBucket::getNumItems() const {
64     return ht.getNumInMemoryItems() - ht.getNumDeletedItems();
65 }
66
67 void EphemeralVBucket::completeStatsVKey(
68         const DocKey& key, const RememberingCallback<GetValue>& gcb) {
69     throw std::logic_error(
70             "EphemeralVBucket::completeStatsVKey() is not valid call. "
71             "Called on vb " +
72             std::to_string(getId()) + "for key: " +
73             std::string(reinterpret_cast<const char*>(key.data()), key.size()));
74 }
75
76 bool EphemeralVBucket::pageOut(const HashTable::HashBucketLock& lh,
77                                StoredValue*& v) {
78     // We only delete from active vBuckets to ensure that replicas stay in
79     // sync with the active (the delete from active is sent via DCP to the
80     // the replicas as an explicit delete).
81     if (getState() != vbucket_state_active) {
82         return false;
83     }
84     if (v->isDeleted() && !v->getValue()) {
85         // If the item has already been deleted (and doesn't have a value
86         // associated with it) then there's no further deletion possible,
87         // until the deletion marker (tombstone) is later purged at the
88         // metadata purge internal.
89         return false;
90     }
91     VBQueueItemCtx queueCtx(GenerateBySeqno::Yes,
92                             GenerateCas::Yes,
93                             TrackCasDrift::No,
94                             /*isBackfill*/ false,
95                             nullptr);
96     v->setRevSeqno(v->getRevSeqno() + 1);
97     StoredValue* newSv;
98     VBNotifyCtx notifyCtx;
99     std::tie(newSv, notifyCtx) = softDeleteStoredValue(
100             lh, *v, /*onlyMarkDeleted*/ false, queueCtx, 0);
101     ht.updateMaxDeletedRevSeqno(newSv->getRevSeqno());
102     notifyNewSeqno(notifyCtx);
103
104     autoDeleteCount++;
105
106     return true;
107 }
108
109 void EphemeralVBucket::addStats(bool details,
110                                 ADD_STAT add_stat,
111                                 const void* c) {
112     // Include base class statistics:
113     _addStats(details, add_stat, c);
114
115     if (details) {
116         // Ephemeral-specific details
117         addStat("auto_delete_count", autoDeleteCount.load(), add_stat, c);
118         addStat("seqlist_count", seqList->getNumItems(), add_stat, c);
119         addStat("seqlist_deleted_count",
120                 seqList->getNumDeletedItems(),
121                 add_stat,
122                 c);
123         addStat("seqlist_high_seqno", seqList->getHighSeqno(), add_stat, c);
124         addStat("seqlist_highest_deduped_seqno",
125                 seqList->getHighestDedupedSeqno(),
126                 add_stat,
127                 c);
128         const auto rr_begin = seqList->getRangeReadBegin();
129         const auto rr_end = seqList->getRangeReadEnd();
130         addStat("seqlist_range_read_begin", rr_begin, add_stat, c);
131         addStat("seqlist_range_read_end", rr_end, add_stat, c);
132         addStat("seqlist_range_read_count", rr_end - rr_begin, add_stat, c);
133         addStat("seqlist_stale_count",
134                 seqList->getNumStaleItems(),
135                 add_stat,
136                 c);
137         addStat("seqlist_stale_value_bytes",
138                 seqList->getStaleValueBytes(),
139                 add_stat,
140                 c);
141         addStat("seqlist_stale_metadata_bytes",
142                 seqList->getStaleValueBytes(),
143                 add_stat,
144                 c);
145     }
146 }
147
148 void EphemeralVBucket::dump() const {
149     std::cerr << "EphemeralVBucket[" << this
150               << "] with state:" << toString(getState())
151               << " numItems:" << getNumItems()
152               << std::endl;
153     seqList->dump();
154     std::cerr << ht << std::endl;
155 }
156
157 ENGINE_ERROR_CODE EphemeralVBucket::completeBGFetchForSingleItem(
158         const DocKey& key,
159         const VBucketBGFetchItem& fetched_item,
160         const ProcessClock::time_point startTime) {
161     /* [EPHE TODO]: Just return error code and make all the callers handle it */
162     throw std::logic_error(
163             "EphemeralVBucket::completeBGFetchForSingleItem() "
164             "is not valid. Called on vb " +
165             std::to_string(getId()) + "for key: " +
166             std::string(reinterpret_cast<const char*>(key.data()), key.size()));
167 }
168
169 void EphemeralVBucket::resetStats() {
170     autoDeleteCount.reset();
171 }
172
173 vb_bgfetch_queue_t EphemeralVBucket::getBGFetchItems() {
174     throw std::logic_error(
175             "EphemeralVBucket::getBGFetchItems() is not valid. "
176             "Called on vb " +
177             std::to_string(getId()));
178 }
179
180 bool EphemeralVBucket::hasPendingBGFetchItems() {
181     throw std::logic_error(
182             "EphemeralVBucket::hasPendingBGFetchItems() is not valid. "
183             "Called on vb " +
184             std::to_string(getId()));
185 }
186
187 HighPriorityVBReqStatus EphemeralVBucket::checkAddHighPriorityVBEntry(
188         uint64_t seqnoOrChkId,
189         const void* cookie,
190         HighPriorityVBNotify reqType) {
191     if (reqType == HighPriorityVBNotify::ChkPersistence) {
192         return HighPriorityVBReqStatus::NotSupported;
193     }
194
195     {
196         /* Serialize the request with sequence lock */
197         std::lock_guard<std::mutex> lh(sequenceLock);
198
199         if (seqnoOrChkId <= getPersistenceSeqno()) {
200             /* Need not notify asynchronously as the vb already has the
201                requested seqno */
202             return HighPriorityVBReqStatus::RequestNotScheduled;
203         }
204
205         addHighPriorityVBEntry(seqnoOrChkId, cookie, reqType);
206     }
207
208     return HighPriorityVBReqStatus::RequestScheduled;
209 }
210
211 void EphemeralVBucket::notifyHighPriorityRequests(
212         EventuallyPersistentEngine& engine,
213         uint64_t idNum,
214         HighPriorityVBNotify notifyType) {
215     throw std::logic_error(
216             "EphemeralVBucket::notifyHighPriorityRequests() is not valid. "
217             "Called on vb " +
218             std::to_string(getId()));
219 }
220
221 void EphemeralVBucket::notifyAllPendingConnsFailed(
222         EventuallyPersistentEngine& e) {
223     auto toNotify = tmpFailAndGetAllHpNotifies(e);
224
225     for (auto& notify : toNotify) {
226         e.notifyIOComplete(notify.first, notify.second);
227     }
228
229     fireAllOps(e);
230 }
231
232 std::unique_ptr<DCPBackfill> EphemeralVBucket::createDCPBackfill(
233         EventuallyPersistentEngine& e,
234         const active_stream_t& stream,
235         uint64_t startSeqno,
236         uint64_t endSeqno) {
237     /* create a memory backfill object */
238     EphemeralVBucketPtr evb =
239             std::static_pointer_cast<EphemeralVBucket>(shared_from_this());
240     return std::make_unique<DCPBackfillMemory>(
241             evb, stream, startSeqno, endSeqno);
242 }
243
244 std::tuple<ENGINE_ERROR_CODE, std::vector<UniqueItemPtr>, seqno_t>
245 EphemeralVBucket::inMemoryBackfill(uint64_t start, uint64_t end) {
246     return seqList->rangeRead(start, end);
247 }
248
249 /* Vb level backfill queue is for items in a huge snapshot (disk backfill
250    snapshots from DCP are typically huge) that could not be fit on a
251    checkpoint. They update all stats, checkpoint seqno, but are not put
252    on checkpoint and are directly persisted from the queue.
253
254    In ephemeral buckets we must not add backfill items from DCP (on
255    replica vbuckets), to the vb backfill queue because we have put them on
256    linkedlist already. Also we do not have the flusher task to drain the
257    items from that queue.
258    (Unlike checkpoints, the items in this queue is not cleaned up
259     in a background cleanup task).
260
261    But we must be careful to update certain stats and checkpoint seqno
262    like in a regular couchbase bucket. */
263 void EphemeralVBucket::queueBackfillItem(
264         queued_item& qi, const GenerateBySeqno generateBySeqno) {
265     if (GenerateBySeqno::Yes == generateBySeqno) {
266         qi->setBySeqno(checkpointManager.nextBySeqno());
267     } else {
268         checkpointManager.setBySeqno(qi->getBySeqno());
269     }
270     ++stats.totalEnqueued;
271     stats.memOverhead->fetch_add(sizeof(queued_item));
272 }
273
274 size_t EphemeralVBucket::purgeTombstones(rel_time_t purgeAge) {
275     // First mark all deleted items in the HashTable which can be purged as
276     // Stale - this removes them from the HashTable, transferring ownership to
277     // SequenceList.
278     HTTombstonePurger purger(*this, purgeAge);
279     ht.visit(purger);
280
281     // Secondly iterate over the sequence list and delete any stale items
282     auto seqListPurged = seqList->purgeTombstones();
283
284     // Update stats and return.
285     htDeletedPurgeCount += purger.getNumPurged();
286     seqListPurgeCount += seqListPurged;
287     setPurgeSeqno(seqList->getHighestPurgedDeletedSeqno());
288
289     return seqListPurged;
290 }
291
292 std::tuple<StoredValue*, MutationStatus, VBNotifyCtx>
293 EphemeralVBucket::updateStoredValue(const HashTable::HashBucketLock& hbl,
294                                     StoredValue& v,
295                                     const Item& itm,
296                                     const VBQueueItemCtx* queueItmCtx) {
297     std::lock_guard<std::mutex> lh(sequenceLock);
298
299     const bool oldValueDeleted = v.isDeleted();
300     const bool recreatingDeletedItem = v.isDeleted() && !itm.isDeleted();
301
302     SequenceList::UpdateStatus res;
303     VBNotifyCtx notifyCtx;
304     StoredValue* newSv = &v;
305     StoredValue::UniquePtr ownedSv;
306     MutationStatus status(MutationStatus::WasClean);
307
308     {
309         // Once we update the seqList, there is a short period where the
310         // highSeqno and highestDedupedSeqno are both incorrect. We have to hold
311         // this lock to prevent a new rangeRead starting, and covering an
312         // inconsistent range.
313         std::lock_guard<std::mutex> listWriteLg(seqList->getListWriteLock());
314
315         /* Update the OrderedStoredValue in hash table + Ordered data structure
316            (list) */
317         res = seqList->updateListElem(
318                 lh, listWriteLg, *(v.toOrderedStoredValue()));
319
320         switch (res) {
321         case SequenceList::UpdateStatus::Success:
322             /* OrderedStoredValue moved to end of the list, just update its
323                value */
324             status = ht.unlocked_updateStoredValue(hbl.getHTLock(), v, itm);
325             break;
326
327         case SequenceList::UpdateStatus::Append: {
328             /* OrderedStoredValue cannot be moved to end of the list,
329                due to a range read. Hence, release the storedvalue from the
330                hash table, indicate the list to mark the OrderedStoredValue
331                stale (old duplicate) and add a new StoredValue for the itm.
332
333                Note: It is important to remove item from hash table before
334                      marking stale because once marked stale list assumes the
335                      ownership of the item and may delete it anytime. */
336             /* Release current storedValue from hash table */
337             /* [EPHE TODO]: Write a HT func to release the StoredValue directly
338                             than taking key as a param and deleting
339                             (MB-23184) */
340             ownedSv = ht.unlocked_release(hbl, v.getKey());
341
342             /* Add a new storedvalue for the item */
343             newSv = ht.unlocked_addNewStoredValue(hbl, itm);
344
345             seqList->appendToList(
346                     lh, listWriteLg, *(newSv->toOrderedStoredValue()));
347         } break;
348         }
349
350         if (queueItmCtx) {
351             /* Put on checkpoint mgr */
352             notifyCtx = queueDirty(*newSv, *queueItmCtx);
353         }
354
355         /* Update the high seqno in the sequential storage */
356         auto& osv = *(newSv->toOrderedStoredValue());
357         seqList->updateHighSeqno(listWriteLg, osv);
358         seqList->updateHighestDedupedSeqno(listWriteLg, osv);
359
360         if (res == SequenceList::UpdateStatus::Append) {
361             /* Mark the un-updated storedValue as stale. This must be done after
362                the new storedvalue for the item is visible for range read in the
363                list. This is because we do not want the seqlist to delete the
364                stale item before its latest copy is added to the list.
365                (item becomes visible for range read only after updating the list
366                 with the seqno of the item) */
367             seqList->markItemStale(listWriteLg, std::move(ownedSv), newSv);
368         }
369     }
370
371     if (recreatingDeletedItem) {
372         ++opsCreate;
373     } else {
374         ++opsUpdate;
375     }
376
377     seqList->updateNumDeletedItems(oldValueDeleted, itm.isDeleted());
378
379     return std::make_tuple(newSv, status, notifyCtx);
380 }
381
382 std::pair<StoredValue*, VBNotifyCtx> EphemeralVBucket::addNewStoredValue(
383         const HashTable::HashBucketLock& hbl,
384         const Item& itm,
385         const VBQueueItemCtx* queueItmCtx) {
386     StoredValue* v = ht.unlocked_addNewStoredValue(hbl, itm);
387
388     std::lock_guard<std::mutex> lh(sequenceLock);
389
390     OrderedStoredValue* osv;
391     try {
392         osv = v->toOrderedStoredValue();
393     } catch (const std::bad_cast& e) {
394         throw std::logic_error(
395                 "EphemeralVBucket::addNewStoredValue(): Error " +
396                 std::string(e.what()) + " for vbucket: " +
397                 std::to_string(getId()) + " for key: " +
398                 std::string(reinterpret_cast<const char*>(v->getKey().data()),
399                             v->getKey().size()));
400     }
401
402     VBNotifyCtx notifyCtx;
403     {
404         std::lock_guard<std::mutex> listWriteLg(seqList->getListWriteLock());
405
406         /* Add to the sequential storage */
407         seqList->appendToList(lh, listWriteLg, *osv);
408
409         if (queueItmCtx) {
410             /* Put on checkpoint mgr */
411             notifyCtx = queueDirty(*v, *queueItmCtx);
412         }
413
414         /* Update the high seqno in the sequential storage */
415         seqList->updateHighSeqno(listWriteLg, *osv);
416     }
417     ++opsCreate;
418
419     seqList->updateNumDeletedItems(false, itm.isDeleted());
420
421     return {v, notifyCtx};
422 }
423
424 std::tuple<StoredValue*, VBNotifyCtx> EphemeralVBucket::softDeleteStoredValue(
425         const HashTable::HashBucketLock& hbl,
426         StoredValue& v,
427         bool onlyMarkDeleted,
428         const VBQueueItemCtx& queueItmCtx,
429         uint64_t bySeqno) {
430     std::lock_guard<std::mutex> lh(sequenceLock);
431
432     StoredValue* newSv = &v;
433     StoredValue::UniquePtr ownedSv;
434
435     const bool oldValueDeleted = v.isDeleted();
436
437     SequenceList::UpdateStatus res;
438     VBNotifyCtx notifyCtx;
439     {
440         // Once we update the seqList, there is a short period where the
441         // highSeqno and highestDedupedSeqno are both incorrect. We have to hold
442         // this lock to prevent a new rangeRead starting, and covering an
443         // inconsistent range.
444         std::lock_guard<std::mutex> listWriteLg(seqList->getListWriteLock());
445
446         /* Update the OrderedStoredValue in hash table + Ordered data structure
447            (list) */
448         res = seqList->updateListElem(
449                 lh, listWriteLg, *(v.toOrderedStoredValue()));
450
451         switch (res) {
452         case SequenceList::UpdateStatus::Success:
453             /* OrderedStoredValue is moved to end of the list, do nothing */
454             break;
455
456         case SequenceList::UpdateStatus::Append: {
457             /* OrderedStoredValue cannot be moved to end of the list,
458                due to a range read. Hence, replace the storedvalue in the
459                hash table with its copy and indicate the list to mark the
460                OrderedStoredValue stale (old duplicate).
461
462                Note: It is important to remove item from hash table before
463                      marking stale because once marked stale list assumes the
464                      ownership of the item and may delete it anytime. */
465
466             /* Release current storedValue from hash table */
467             /* [EPHE TODO]: Write a HT func to replace the StoredValue directly
468                             than taking key as a param and deleting (MB-23184)
469                */
470             std::tie(newSv, ownedSv) = ht.unlocked_replaceByCopy(hbl, v);
471
472             seqList->appendToList(
473                     lh, listWriteLg, *(newSv->toOrderedStoredValue()));
474         } break;
475         }
476
477         /* Delete the storedvalue */
478         ht.unlocked_softDelete(hbl.getHTLock(), *newSv, onlyMarkDeleted);
479
480         if (queueItmCtx.genBySeqno == GenerateBySeqno::No) {
481             newSv->setBySeqno(bySeqno);
482         }
483
484         notifyCtx = queueDirty(*newSv, queueItmCtx);
485
486         /* Update the high seqno in the sequential storage */
487         auto& osv = *(newSv->toOrderedStoredValue());
488         seqList->updateHighSeqno(listWriteLg, osv);
489         seqList->updateHighestDedupedSeqno(listWriteLg, osv);
490
491         if (res == SequenceList::UpdateStatus::Append) {
492             /* Mark the un-updated storedValue as stale. This must be done after
493                the new storedvalue for the item is visible for range read in the
494                list. This is because we do not want the seqlist to delete the
495                stale item before its latest copy is added to the list.
496                (item becomes visible for range read only after updating the list
497                with the seqno of the item) */
498             seqList->markItemStale(listWriteLg, std::move(ownedSv), newSv);
499         }
500     }
501
502     ++opsDelete;
503
504     seqList->updateNumDeletedItems(oldValueDeleted, true);
505
506     return std::make_tuple(newSv, notifyCtx);
507 }
508
509 void EphemeralVBucket::bgFetch(const DocKey& key,
510                                const void* cookie,
511                                EventuallyPersistentEngine& engine,
512                                const int bgFetchDelay,
513                                const bool isMeta) {
514     throw std::logic_error(
515             "EphemeralVBucket::bgFetch() is not valid. Called on vb " +
516             std::to_string(getId()) + "for key: " +
517             std::string(reinterpret_cast<const char*>(key.data()), key.size()));
518 }
519
520 ENGINE_ERROR_CODE
521 EphemeralVBucket::addTempItemAndBGFetch(HashTable::HashBucketLock& hbl,
522                                         const DocKey& key,
523                                         const void* cookie,
524                                         EventuallyPersistentEngine& engine,
525                                         int bgFetchDelay,
526                                         bool metadataOnly,
527                                         bool isReplication) {
528     /* [EPHE TODO]: Just return error code and make all the callers handle it */
529     throw std::logic_error(
530             "EphemeralVBucket::addTempItemAndBGFetch() is not valid. "
531             "Called on vb " +
532             std::to_string(getId()) + "for key: " +
533             std::string(reinterpret_cast<const char*>(key.data()), key.size()));
534 }
535
536 GetValue EphemeralVBucket::getInternalNonResident(
537         const DocKey& key,
538         const void* cookie,
539         EventuallyPersistentEngine& engine,
540         int bgFetchDelay,
541         get_options_t options,
542         const StoredValue& v) {
543     /* We reach here only if the v is deleted and does not have any value */
544     return GetValue();
545 }
546
547 void EphemeralVBucket::setupDeferredDeletion(const void* cookie) {
548     setDeferredDeletionCookie(cookie);
549     setDeferredDeletion(true);
550 }
551
552 void EphemeralVBucket::scheduleDeferredDeletion(
553         EventuallyPersistentEngine& engine) {
554     ExTask task = new VBucketMemoryDeletionTask(engine, this);
555     ExecutorPool::get()->schedule(task);
556 }