28b1c454cf873b75cd078dfbeb5684f695d86510
[ep-engine.git] / src / ephemeral_vb.cc
1 /* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2 /*
3  *     Copyright 2017 Couchbase, Inc
4  *
5  *   Licensed under the Apache License, Version 2.0 (the "License");
6  *   you may not use this file except in compliance with the License.
7  *   You may obtain a copy of the License at
8  *
9  *       http://www.apache.org/licenses/LICENSE-2.0
10  *
11  *   Unless required by applicable law or agreed to in writing, software
12  *   distributed under the License is distributed on an "AS IS" BASIS,
13  *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  *   See the License for the specific language governing permissions and
15  *   limitations under the License.
16  */
17
18 #include "ephemeral_vb.h"
19
20 #include "dcp/backfill_memory.h"
21 #include "ephemeral_tombstone_purger.h"
22 #include "failover-table.h"
23 #include "linked_list.h"
24 #include "stored_value_factories.h"
25
26 EphemeralVBucket::EphemeralVBucket(id_type i,
27                                    vbucket_state_t newState,
28                                    EPStats& st,
29                                    CheckpointConfig& chkConfig,
30                                    KVShard* kvshard,
31                                    int64_t lastSeqno,
32                                    uint64_t lastSnapStart,
33                                    uint64_t lastSnapEnd,
34                                    std::unique_ptr<FailoverTable> table,
35                                    NewSeqnoCallback newSeqnoCb,
36                                    Configuration& config,
37                                    item_eviction_policy_t evictionPolicy,
38                                    vbucket_state_t initState,
39                                    uint64_t purgeSeqno,
40                                    uint64_t maxCas,
41                                    const std::string& collectionsManifest)
42     : VBucket(i,
43               newState,
44               st,
45               chkConfig,
46               lastSeqno,
47               lastSnapStart,
48               lastSnapEnd,
49               std::move(table),
50               /*flusherCb*/ nullptr,
51               std::make_unique<OrderedStoredValueFactory>(st),
52               std::move(newSeqnoCb),
53               config,
54               evictionPolicy,
55               initState,
56               purgeSeqno,
57               maxCas,
58               collectionsManifest),
59       seqList(std::make_unique<BasicLinkedList>(i, st)) {
60 }
61
62 size_t EphemeralVBucket::getNumItems() const {
63     return ht.getNumInMemoryItems() - ht.getNumDeletedItems();
64 }
65
66 void EphemeralVBucket::completeStatsVKey(
67         const DocKey& key, const RememberingCallback<GetValue>& gcb) {
68     throw std::logic_error(
69             "EphemeralVBucket::completeStatsVKey() is not valid call. "
70             "Called on vb " +
71             std::to_string(getId()) + "for key: " +
72             std::string(reinterpret_cast<const char*>(key.data()), key.size()));
73 }
74
75 bool EphemeralVBucket::pageOut(const HashTable::HashBucketLock& lh,
76                                StoredValue*& v) {
77     // We only delete from active vBuckets to ensure that replicas stay in
78     // sync with the active (the delete from active is sent via DCP to the
79     // the replicas as an explicit delete).
80     if (getState() != vbucket_state_active) {
81         return false;
82     }
83     if (v->isDeleted() && !v->getValue()) {
84         // If the item has already been deleted (and doesn't have a value
85         // associated with it) then there's no further deletion possible,
86         // until the deletion marker (tombstone) is later purged at the
87         // metadata purge internal.
88         return false;
89     }
90     VBQueueItemCtx queueCtx(GenerateBySeqno::Yes,
91                             GenerateCas::Yes,
92                             TrackCasDrift::No,
93                             /*isBackfill*/ false,
94                             nullptr);
95     v->setRevSeqno(v->getRevSeqno() + 1);
96     StoredValue* newSv;
97     VBNotifyCtx notifyCtx;
98     std::tie(newSv, notifyCtx) = softDeleteStoredValue(
99             lh, *v, /*onlyMarkDeleted*/ false, queueCtx, 0);
100     ht.updateMaxDeletedRevSeqno(newSv->getRevSeqno());
101     notifyNewSeqno(notifyCtx);
102
103     autoDeleteCount++;
104
105     return true;
106 }
107
108 void EphemeralVBucket::addStats(bool details,
109                                 ADD_STAT add_stat,
110                                 const void* c) {
111     // Include base class statistics:
112     _addStats(details, add_stat, c);
113
114     if (details) {
115         // Ephemeral-specific details
116         addStat("auto_delete_count", autoDeleteCount.load(), add_stat, c);
117         addStat("seqlist_count", seqList->getNumItems(), add_stat, c);
118         addStat("seqlist_deleted_count",
119                 seqList->getNumDeletedItems(),
120                 add_stat,
121                 c);
122         addStat("seqlist_high_seqno", seqList->getHighSeqno(), add_stat, c);
123         addStat("seqlist_highest_deduped_seqno",
124                 seqList->getHighestDedupedSeqno(),
125                 add_stat,
126                 c);
127         const auto rr_begin = seqList->getRangeReadBegin();
128         const auto rr_end = seqList->getRangeReadEnd();
129         addStat("seqlist_range_read_begin", rr_begin, add_stat, c);
130         addStat("seqlist_range_read_end", rr_end, add_stat, c);
131         addStat("seqlist_range_read_count", rr_end - rr_begin, add_stat, c);
132         addStat("seqlist_stale_count",
133                 seqList->getNumStaleItems(),
134                 add_stat,
135                 c);
136         addStat("seqlist_stale_value_bytes",
137                 seqList->getStaleValueBytes(),
138                 add_stat,
139                 c);
140         addStat("seqlist_stale_metadata_bytes",
141                 seqList->getStaleValueBytes(),
142                 add_stat,
143                 c);
144     }
145 }
146
147 void EphemeralVBucket::dump() const {
148     std::cerr << "EphemeralVBucket[" << this
149               << "] with state: " << toString(getState())
150               << " numItems:" << getNumItems()
151               << std::endl;
152     seqList->dump();
153 }
154
155 ENGINE_ERROR_CODE EphemeralVBucket::completeBGFetchForSingleItem(
156         const DocKey& key,
157         const VBucketBGFetchItem& fetched_item,
158         const ProcessClock::time_point startTime) {
159     /* [EPHE TODO]: Just return error code and make all the callers handle it */
160     throw std::logic_error(
161             "EphemeralVBucket::completeBGFetchForSingleItem() "
162             "is not valid. Called on vb " +
163             std::to_string(getId()) + "for key: " +
164             std::string(reinterpret_cast<const char*>(key.data()), key.size()));
165 }
166
167 void EphemeralVBucket::resetStats() {
168     autoDeleteCount.reset();
169 }
170
171 vb_bgfetch_queue_t EphemeralVBucket::getBGFetchItems() {
172     throw std::logic_error(
173             "EphemeralVBucket::getBGFetchItems() is not valid. "
174             "Called on vb " +
175             std::to_string(getId()));
176 }
177
178 bool EphemeralVBucket::hasPendingBGFetchItems() {
179     throw std::logic_error(
180             "EphemeralVBucket::hasPendingBGFetchItems() is not valid. "
181             "Called on vb " +
182             std::to_string(getId()));
183 }
184
185 HighPriorityVBReqStatus EphemeralVBucket::checkAddHighPriorityVBEntry(
186         uint64_t seqnoOrChkId,
187         const void* cookie,
188         HighPriorityVBNotify reqType) {
189     if (reqType == HighPriorityVBNotify::ChkPersistence) {
190         return HighPriorityVBReqStatus::NotSupported;
191     }
192
193     {
194         /* Serialize the request with sequence lock */
195         std::lock_guard<std::mutex> lh(sequenceLock);
196
197         if (seqnoOrChkId <= getPersistenceSeqno()) {
198             /* Need not notify asynchronously as the vb already has the
199                requested seqno */
200             return HighPriorityVBReqStatus::RequestNotScheduled;
201         }
202
203         addHighPriorityVBEntry(seqnoOrChkId, cookie, reqType);
204     }
205
206     return HighPriorityVBReqStatus::RequestScheduled;
207 }
208
209 void EphemeralVBucket::notifyHighPriorityRequests(
210         EventuallyPersistentEngine& engine,
211         uint64_t idNum,
212         HighPriorityVBNotify notifyType) {
213     throw std::logic_error(
214             "EphemeralVBucket::notifyHighPriorityRequests() is not valid. "
215             "Called on vb " +
216             std::to_string(getId()));
217 }
218
219 void EphemeralVBucket::notifyAllPendingConnsFailed(
220         EventuallyPersistentEngine& e) {
221     auto toNotify = tmpFailAndGetAllHpNotifies(e);
222
223     for (auto& notify : toNotify) {
224         e.notifyIOComplete(notify.first, notify.second);
225     }
226
227     fireAllOps(e);
228 }
229
230 std::unique_ptr<DCPBackfill> EphemeralVBucket::createDCPBackfill(
231         EventuallyPersistentEngine& e,
232         const active_stream_t& stream,
233         uint64_t startSeqno,
234         uint64_t endSeqno) {
235     /* create a memory backfill object */
236     EphemeralVBucketPtr evb =
237             std::static_pointer_cast<EphemeralVBucket>(shared_from_this());
238     return std::make_unique<DCPBackfillMemory>(
239             evb, stream, startSeqno, endSeqno);
240 }
241
242 std::pair<ENGINE_ERROR_CODE, std::vector<UniqueItemPtr>>
243 EphemeralVBucket::inMemoryBackfill(uint64_t start, uint64_t end) {
244     return seqList->rangeRead(start, end);
245 }
246
247 /* Vb level backfill queue is for items in a huge snapshot (disk backfill
248    snapshots from DCP are typically huge) that could not be fit on a
249    checkpoint. They update all stats, checkpoint seqno, but are not put
250    on checkpoint and are directly persisted from the queue.
251
252    In ephemeral buckets we must not add backfill items from DCP (on
253    replica vbuckets), to the vb backfill queue because we have put them on
254    linkedlist already. Also we do not have the flusher task to drain the
255    items from that queue.
256    (Unlike checkpoints, the items in this queue is not cleaned up
257     in a background cleanup task).
258
259    But we must be careful to update certain stats and checkpoint seqno
260    like in a regular couchbase bucket. */
261 void EphemeralVBucket::queueBackfillItem(
262         queued_item& qi, const GenerateBySeqno generateBySeqno) {
263     if (GenerateBySeqno::Yes == generateBySeqno) {
264         qi->setBySeqno(checkpointManager.nextBySeqno());
265     } else {
266         checkpointManager.setBySeqno(qi->getBySeqno());
267     }
268     ++stats.totalEnqueued;
269     stats.memOverhead->fetch_add(sizeof(queued_item));
270 }
271
272 size_t EphemeralVBucket::purgeTombstones(rel_time_t purgeAge) {
273     // First mark all deleted items in the HashTable which can be purged as
274     // Stale - this removes them from the HashTable, transferring ownership to
275     // SequenceList.
276     HTTombstonePurger purger(*this, purgeAge);
277     ht.visit(purger);
278
279     // Secondly iterate over the sequence list and delete any stale items
280     auto seqListPurged = seqList->purgeTombstones();
281
282     // Update stats and return.
283     htDeletedPurgeCount += purger.getNumPurged();
284     seqListPurgeCount += seqListPurged;
285     setPurgeSeqno(seqList->getHighestPurgedDeletedSeqno());
286
287     return seqListPurged;
288 }
289
290 std::tuple<StoredValue*, MutationStatus, VBNotifyCtx>
291 EphemeralVBucket::updateStoredValue(const HashTable::HashBucketLock& hbl,
292                                     StoredValue& v,
293                                     const Item& itm,
294                                     const VBQueueItemCtx* queueItmCtx) {
295     std::lock_guard<std::mutex> lh(sequenceLock);
296
297     const bool recreatingDeletedItem = v.isDeleted();
298
299     /* Update the OrderedStoredValue in hash table + Ordered data structure
300        (list) */
301     auto res = seqList->updateListElem(lh, *(v.toOrderedStoredValue()));
302
303     StoredValue* newSv = &v;
304     StoredValue::UniquePtr ownedSv;
305     MutationStatus status(MutationStatus::WasClean);
306
307     switch (res) {
308     case SequenceList::UpdateStatus::Success:
309         /* OrderedStoredValue moved to end of the list, just update its
310            value */
311         status = ht.unlocked_updateStoredValue(hbl.getHTLock(), v, itm);
312         break;
313
314     case SequenceList::UpdateStatus::Append: {
315         /* OrderedStoredValue cannot be moved to end of the list,
316            due to a range read. Hence, release the storedvalue from the
317            hash table, indicate the list to mark the OrderedStoredValue
318            stale (old duplicate) and add a new StoredValue for the itm.
319
320            Note: It is important to remove item from hash table before
321                  marking stale because once marked stale list assumes the
322                  ownership of the item and may delete it anytime. */
323         /* Release current storedValue from hash table */
324         /* [EPHE TODO]: Write a HT func to release the StoredValue directly
325                         than taking key as a param and deleting
326                         (MB-23184) */
327         ownedSv = ht.unlocked_release(hbl, v.getKey());
328
329         /* Add a new storedvalue for the item */
330         newSv = ht.unlocked_addNewStoredValue(hbl, itm);
331
332         seqList->appendToList(lh, *(newSv->toOrderedStoredValue()));
333     } break;
334     }
335
336     VBNotifyCtx notifyCtx;
337     if (queueItmCtx) {
338         /* Put on checkpoint mgr */
339         notifyCtx = queueDirty(*newSv, *queueItmCtx);
340     }
341
342     /* Update the high seqno in the sequential storage */
343     seqList->updateHighSeqno(*(newSv->toOrderedStoredValue()));
344     if (recreatingDeletedItem) {
345         ++opsCreate;
346     } else {
347         ++opsUpdate;
348     }
349
350     if (res == SequenceList::UpdateStatus::Append) {
351         /* Mark the un-updated storedValue as stale. This must be done after
352            the new storedvalue for the item is visible for range read in the
353            list. This is because we do not want the seqlist to delete the stale
354            item before its latest copy is added to the list.
355            (item becomes visible for range read only after updating the list
356             with the seqno of the item) */
357         seqList->markItemStale(std::move(ownedSv));
358     }
359     return std::make_tuple(newSv, status, notifyCtx);
360 }
361
362 std::pair<StoredValue*, VBNotifyCtx> EphemeralVBucket::addNewStoredValue(
363         const HashTable::HashBucketLock& hbl,
364         const Item& itm,
365         const VBQueueItemCtx* queueItmCtx) {
366     StoredValue* v = ht.unlocked_addNewStoredValue(hbl, itm);
367
368     std::lock_guard<std::mutex> lh(sequenceLock);
369
370     /* Add to the sequential storage */
371     try {
372         seqList->appendToList(lh, *(v->toOrderedStoredValue()));
373     } catch (const std::bad_cast& e) {
374         throw std::logic_error(
375                 "EphemeralVBucket::addNewStoredValue(): Error " +
376                 std::string(e.what()) + " for vbucket: " +
377                 std::to_string(getId()) + " for key: " +
378                 std::string(reinterpret_cast<const char*>(v->getKey().data()),
379                             v->getKey().size()));
380     }
381
382     VBNotifyCtx notifyCtx;
383     if (queueItmCtx) {
384         /* Put on checkpoint mgr */
385         notifyCtx = queueDirty(*v, *queueItmCtx);
386     }
387
388     /* Update the high seqno in the sequential storage */
389     seqList->updateHighSeqno(*(v->toOrderedStoredValue()));
390     ++opsCreate;
391
392     return {v, notifyCtx};
393 }
394
395 std::tuple<StoredValue*, VBNotifyCtx> EphemeralVBucket::softDeleteStoredValue(
396         const HashTable::HashBucketLock& hbl,
397         StoredValue& v,
398         bool onlyMarkDeleted,
399         const VBQueueItemCtx& queueItmCtx,
400         uint64_t bySeqno) {
401     std::lock_guard<std::mutex> lh(sequenceLock);
402
403     StoredValue* newSv = &v;
404     StoredValue::UniquePtr ownedSv;
405
406     /* Update the OrderedStoredValue in hash table + Ordered data structure
407        (list) */
408     auto res = seqList->updateListElem(lh, *(v.toOrderedStoredValue()));
409
410     switch (res) {
411     case SequenceList::UpdateStatus::Success:
412         /* OrderedStoredValue is moved to end of the list, do nothing */
413         break;
414
415     case SequenceList::UpdateStatus::Append: {
416         /* OrderedStoredValue cannot be moved to end of the list,
417            due to a range read. Hence, replace the storedvalue in the
418            hash table with its copy and indicate the list to mark the
419            OrderedStoredValue stale (old duplicate).
420
421            Note: It is important to remove item from hash table before
422                  marking stale because once marked stale list assumes the
423                  ownership of the item and may delete it anytime. */
424
425         /* Release current storedValue from hash table */
426         /* [EPHE TODO]: Write a HT func to replace the StoredValue directly
427                         than taking key as a param and deleting (MB-23184) */
428         std::tie(newSv, ownedSv) = ht.unlocked_replaceByCopy(hbl, v);
429
430         seqList->appendToList(lh, *(newSv->toOrderedStoredValue()));
431     } break;
432     }
433
434     /* Delete the storedvalue */
435     ht.unlocked_softDelete(hbl.getHTLock(), *newSv, onlyMarkDeleted);
436
437     if (queueItmCtx.genBySeqno == GenerateBySeqno::No) {
438         newSv->setBySeqno(bySeqno);
439     }
440
441     VBNotifyCtx notifyCtx = queueDirty(*newSv, queueItmCtx);
442
443     /* Update the high seqno in the sequential storage */
444     seqList->updateHighSeqno(*(newSv->toOrderedStoredValue()));
445     ++opsDelete;
446
447     if (res == SequenceList::UpdateStatus::Append) {
448         /* Mark the un-updated storedValue as stale. This must be done after
449            the new storedvalue for the item is visible for range read in the
450            list. This is because we do not want the seqlist to delete the stale
451            item before its latest copy is added to the list.
452            (item becomes visible for range read only after updating the list
453            with the seqno of the item) */
454         seqList->markItemStale(std::move(ownedSv));
455     }
456     return std::make_tuple(newSv, notifyCtx);
457 }
458
459 void EphemeralVBucket::bgFetch(const DocKey& key,
460                                const void* cookie,
461                                EventuallyPersistentEngine& engine,
462                                const int bgFetchDelay,
463                                const bool isMeta) {
464     throw std::logic_error(
465             "EphemeralVBucket::bgFetch() is not valid. Called on vb " +
466             std::to_string(getId()) + "for key: " +
467             std::string(reinterpret_cast<const char*>(key.data()), key.size()));
468 }
469
470 ENGINE_ERROR_CODE
471 EphemeralVBucket::addTempItemAndBGFetch(HashTable::HashBucketLock& hbl,
472                                         const DocKey& key,
473                                         const void* cookie,
474                                         EventuallyPersistentEngine& engine,
475                                         int bgFetchDelay,
476                                         bool metadataOnly,
477                                         bool isReplication) {
478     /* [EPHE TODO]: Just return error code and make all the callers handle it */
479     throw std::logic_error(
480             "EphemeralVBucket::addTempItemAndBGFetch() is not valid. "
481             "Called on vb " +
482             std::to_string(getId()) + "for key: " +
483             std::string(reinterpret_cast<const char*>(key.data()), key.size()));
484 }
485
486 GetValue EphemeralVBucket::getInternalNonResident(
487         const DocKey& key,
488         const void* cookie,
489         EventuallyPersistentEngine& engine,
490         int bgFetchDelay,
491         get_options_t options,
492         const StoredValue& v) {
493     /* We reach here only if the v is deleted and does not have any value */
494     return GetValue();
495 }