DEBUG: Add HashTable::operator<<, expand StoredValue::operator<<
[ep-engine.git] / src / ephemeral_vb.cc
1 /* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2 /*
3  *     Copyright 2017 Couchbase, Inc
4  *
5  *   Licensed under the Apache License, Version 2.0 (the "License");
6  *   you may not use this file except in compliance with the License.
7  *   You may obtain a copy of the License at
8  *
9  *       http://www.apache.org/licenses/LICENSE-2.0
10  *
11  *   Unless required by applicable law or agreed to in writing, software
12  *   distributed under the License is distributed on an "AS IS" BASIS,
13  *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  *   See the License for the specific language governing permissions and
15  *   limitations under the License.
16  */
17
18 #include "ephemeral_vb.h"
19
20 #include "dcp/backfill_memory.h"
21 #include "ephemeral_tombstone_purger.h"
22 #include "failover-table.h"
23 #include "linked_list.h"
24 #include "stored_value_factories.h"
25
26 EphemeralVBucket::EphemeralVBucket(id_type i,
27                                    vbucket_state_t newState,
28                                    EPStats& st,
29                                    CheckpointConfig& chkConfig,
30                                    KVShard* kvshard,
31                                    int64_t lastSeqno,
32                                    uint64_t lastSnapStart,
33                                    uint64_t lastSnapEnd,
34                                    std::unique_ptr<FailoverTable> table,
35                                    NewSeqnoCallback newSeqnoCb,
36                                    Configuration& config,
37                                    item_eviction_policy_t evictionPolicy,
38                                    vbucket_state_t initState,
39                                    uint64_t purgeSeqno,
40                                    uint64_t maxCas,
41                                    const std::string& collectionsManifest)
42     : VBucket(i,
43               newState,
44               st,
45               chkConfig,
46               lastSeqno,
47               lastSnapStart,
48               lastSnapEnd,
49               std::move(table),
50               /*flusherCb*/ nullptr,
51               std::make_unique<OrderedStoredValueFactory>(st),
52               std::move(newSeqnoCb),
53               config,
54               evictionPolicy,
55               initState,
56               purgeSeqno,
57               maxCas,
58               collectionsManifest),
59       seqList(std::make_unique<BasicLinkedList>(i, st)) {
60 }
61
62 size_t EphemeralVBucket::getNumItems() const {
63     return ht.getNumInMemoryItems() - ht.getNumDeletedItems();
64 }
65
66 void EphemeralVBucket::completeStatsVKey(
67         const DocKey& key, const RememberingCallback<GetValue>& gcb) {
68     throw std::logic_error(
69             "EphemeralVBucket::completeStatsVKey() is not valid call. "
70             "Called on vb " +
71             std::to_string(getId()) + "for key: " +
72             std::string(reinterpret_cast<const char*>(key.data()), key.size()));
73 }
74
75 bool EphemeralVBucket::pageOut(const HashTable::HashBucketLock& lh,
76                                StoredValue*& v) {
77     // We only delete from active vBuckets to ensure that replicas stay in
78     // sync with the active (the delete from active is sent via DCP to the
79     // the replicas as an explicit delete).
80     if (getState() != vbucket_state_active) {
81         return false;
82     }
83     if (v->isDeleted() && !v->getValue()) {
84         // If the item has already been deleted (and doesn't have a value
85         // associated with it) then there's no further deletion possible,
86         // until the deletion marker (tombstone) is later purged at the
87         // metadata purge internal.
88         return false;
89     }
90     VBQueueItemCtx queueCtx(GenerateBySeqno::Yes,
91                             GenerateCas::Yes,
92                             TrackCasDrift::No,
93                             /*isBackfill*/ false,
94                             nullptr);
95     v->setRevSeqno(v->getRevSeqno() + 1);
96     StoredValue* newSv;
97     VBNotifyCtx notifyCtx;
98     std::tie(newSv, notifyCtx) = softDeleteStoredValue(
99             lh, *v, /*onlyMarkDeleted*/ false, queueCtx, 0);
100     ht.updateMaxDeletedRevSeqno(newSv->getRevSeqno());
101     notifyNewSeqno(notifyCtx);
102
103     autoDeleteCount++;
104
105     return true;
106 }
107
108 void EphemeralVBucket::addStats(bool details,
109                                 ADD_STAT add_stat,
110                                 const void* c) {
111     // Include base class statistics:
112     _addStats(details, add_stat, c);
113
114     if (details) {
115         // Ephemeral-specific details
116         addStat("auto_delete_count", autoDeleteCount.load(), add_stat, c);
117         addStat("seqlist_count", seqList->getNumItems(), add_stat, c);
118         addStat("seqlist_deleted_count",
119                 seqList->getNumDeletedItems(),
120                 add_stat,
121                 c);
122         addStat("seqlist_high_seqno", seqList->getHighSeqno(), add_stat, c);
123         addStat("seqlist_highest_deduped_seqno",
124                 seqList->getHighestDedupedSeqno(),
125                 add_stat,
126                 c);
127         const auto rr_begin = seqList->getRangeReadBegin();
128         const auto rr_end = seqList->getRangeReadEnd();
129         addStat("seqlist_range_read_begin", rr_begin, add_stat, c);
130         addStat("seqlist_range_read_end", rr_end, add_stat, c);
131         addStat("seqlist_range_read_count", rr_end - rr_begin, add_stat, c);
132         addStat("seqlist_stale_count",
133                 seqList->getNumStaleItems(),
134                 add_stat,
135                 c);
136         addStat("seqlist_stale_value_bytes",
137                 seqList->getStaleValueBytes(),
138                 add_stat,
139                 c);
140         addStat("seqlist_stale_metadata_bytes",
141                 seqList->getStaleValueBytes(),
142                 add_stat,
143                 c);
144     }
145 }
146
147 void EphemeralVBucket::dump() const {
148     std::cerr << "EphemeralVBucket[" << this
149               << "] with state:" << toString(getState())
150               << " numItems:" << getNumItems()
151               << std::endl;
152     seqList->dump();
153     std::cerr << ht << std::endl;
154 }
155
156 ENGINE_ERROR_CODE EphemeralVBucket::completeBGFetchForSingleItem(
157         const DocKey& key,
158         const VBucketBGFetchItem& fetched_item,
159         const ProcessClock::time_point startTime) {
160     /* [EPHE TODO]: Just return error code and make all the callers handle it */
161     throw std::logic_error(
162             "EphemeralVBucket::completeBGFetchForSingleItem() "
163             "is not valid. Called on vb " +
164             std::to_string(getId()) + "for key: " +
165             std::string(reinterpret_cast<const char*>(key.data()), key.size()));
166 }
167
168 void EphemeralVBucket::resetStats() {
169     autoDeleteCount.reset();
170 }
171
172 vb_bgfetch_queue_t EphemeralVBucket::getBGFetchItems() {
173     throw std::logic_error(
174             "EphemeralVBucket::getBGFetchItems() is not valid. "
175             "Called on vb " +
176             std::to_string(getId()));
177 }
178
179 bool EphemeralVBucket::hasPendingBGFetchItems() {
180     throw std::logic_error(
181             "EphemeralVBucket::hasPendingBGFetchItems() is not valid. "
182             "Called on vb " +
183             std::to_string(getId()));
184 }
185
186 HighPriorityVBReqStatus EphemeralVBucket::checkAddHighPriorityVBEntry(
187         uint64_t seqnoOrChkId,
188         const void* cookie,
189         HighPriorityVBNotify reqType) {
190     if (reqType == HighPriorityVBNotify::ChkPersistence) {
191         return HighPriorityVBReqStatus::NotSupported;
192     }
193
194     {
195         /* Serialize the request with sequence lock */
196         std::lock_guard<std::mutex> lh(sequenceLock);
197
198         if (seqnoOrChkId <= getPersistenceSeqno()) {
199             /* Need not notify asynchronously as the vb already has the
200                requested seqno */
201             return HighPriorityVBReqStatus::RequestNotScheduled;
202         }
203
204         addHighPriorityVBEntry(seqnoOrChkId, cookie, reqType);
205     }
206
207     return HighPriorityVBReqStatus::RequestScheduled;
208 }
209
210 void EphemeralVBucket::notifyHighPriorityRequests(
211         EventuallyPersistentEngine& engine,
212         uint64_t idNum,
213         HighPriorityVBNotify notifyType) {
214     throw std::logic_error(
215             "EphemeralVBucket::notifyHighPriorityRequests() is not valid. "
216             "Called on vb " +
217             std::to_string(getId()));
218 }
219
220 void EphemeralVBucket::notifyAllPendingConnsFailed(
221         EventuallyPersistentEngine& e) {
222     auto toNotify = tmpFailAndGetAllHpNotifies(e);
223
224     for (auto& notify : toNotify) {
225         e.notifyIOComplete(notify.first, notify.second);
226     }
227
228     fireAllOps(e);
229 }
230
231 std::unique_ptr<DCPBackfill> EphemeralVBucket::createDCPBackfill(
232         EventuallyPersistentEngine& e,
233         const active_stream_t& stream,
234         uint64_t startSeqno,
235         uint64_t endSeqno) {
236     /* create a memory backfill object */
237     EphemeralVBucketPtr evb =
238             std::static_pointer_cast<EphemeralVBucket>(shared_from_this());
239     return std::make_unique<DCPBackfillMemory>(
240             evb, stream, startSeqno, endSeqno);
241 }
242
243 std::pair<ENGINE_ERROR_CODE, std::vector<UniqueItemPtr>>
244 EphemeralVBucket::inMemoryBackfill(uint64_t start, uint64_t end) {
245     return seqList->rangeRead(start, end);
246 }
247
248 /* Vb level backfill queue is for items in a huge snapshot (disk backfill
249    snapshots from DCP are typically huge) that could not be fit on a
250    checkpoint. They update all stats, checkpoint seqno, but are not put
251    on checkpoint and are directly persisted from the queue.
252
253    In ephemeral buckets we must not add backfill items from DCP (on
254    replica vbuckets), to the vb backfill queue because we have put them on
255    linkedlist already. Also we do not have the flusher task to drain the
256    items from that queue.
257    (Unlike checkpoints, the items in this queue is not cleaned up
258     in a background cleanup task).
259
260    But we must be careful to update certain stats and checkpoint seqno
261    like in a regular couchbase bucket. */
262 void EphemeralVBucket::queueBackfillItem(
263         queued_item& qi, const GenerateBySeqno generateBySeqno) {
264     if (GenerateBySeqno::Yes == generateBySeqno) {
265         qi->setBySeqno(checkpointManager.nextBySeqno());
266     } else {
267         checkpointManager.setBySeqno(qi->getBySeqno());
268     }
269     ++stats.totalEnqueued;
270     stats.memOverhead->fetch_add(sizeof(queued_item));
271 }
272
273 size_t EphemeralVBucket::purgeTombstones(rel_time_t purgeAge) {
274     // First mark all deleted items in the HashTable which can be purged as
275     // Stale - this removes them from the HashTable, transferring ownership to
276     // SequenceList.
277     HTTombstonePurger purger(*this, purgeAge);
278     ht.visit(purger);
279
280     // Secondly iterate over the sequence list and delete any stale items
281     auto seqListPurged = seqList->purgeTombstones();
282
283     // Update stats and return.
284     htDeletedPurgeCount += purger.getNumPurged();
285     seqListPurgeCount += seqListPurged;
286     setPurgeSeqno(seqList->getHighestPurgedDeletedSeqno());
287
288     return seqListPurged;
289 }
290
291 std::tuple<StoredValue*, MutationStatus, VBNotifyCtx>
292 EphemeralVBucket::updateStoredValue(const HashTable::HashBucketLock& hbl,
293                                     StoredValue& v,
294                                     const Item& itm,
295                                     const VBQueueItemCtx* queueItmCtx) {
296     std::lock_guard<std::mutex> lh(sequenceLock);
297
298     const bool recreatingDeletedItem = v.isDeleted();
299
300     /* Update the OrderedStoredValue in hash table + Ordered data structure
301        (list) */
302     auto res = seqList->updateListElem(lh, *(v.toOrderedStoredValue()));
303
304     StoredValue* newSv = &v;
305     StoredValue::UniquePtr ownedSv;
306     MutationStatus status(MutationStatus::WasClean);
307
308     switch (res) {
309     case SequenceList::UpdateStatus::Success:
310         /* OrderedStoredValue moved to end of the list, just update its
311            value */
312         status = ht.unlocked_updateStoredValue(hbl.getHTLock(), v, itm);
313         break;
314
315     case SequenceList::UpdateStatus::Append: {
316         /* OrderedStoredValue cannot be moved to end of the list,
317            due to a range read. Hence, release the storedvalue from the
318            hash table, indicate the list to mark the OrderedStoredValue
319            stale (old duplicate) and add a new StoredValue for the itm.
320
321            Note: It is important to remove item from hash table before
322                  marking stale because once marked stale list assumes the
323                  ownership of the item and may delete it anytime. */
324         /* Release current storedValue from hash table */
325         /* [EPHE TODO]: Write a HT func to release the StoredValue directly
326                         than taking key as a param and deleting
327                         (MB-23184) */
328         ownedSv = ht.unlocked_release(hbl, v.getKey());
329
330         /* Add a new storedvalue for the item */
331         newSv = ht.unlocked_addNewStoredValue(hbl, itm);
332
333         seqList->appendToList(lh, *(newSv->toOrderedStoredValue()));
334     } break;
335     }
336
337     VBNotifyCtx notifyCtx;
338     if (queueItmCtx) {
339         /* Put on checkpoint mgr */
340         notifyCtx = queueDirty(*newSv, *queueItmCtx);
341     }
342
343     /* Update the high seqno in the sequential storage */
344     seqList->updateHighSeqno(*(newSv->toOrderedStoredValue()));
345     if (recreatingDeletedItem) {
346         ++opsCreate;
347     } else {
348         ++opsUpdate;
349     }
350
351     if (res == SequenceList::UpdateStatus::Append) {
352         /* Mark the un-updated storedValue as stale. This must be done after
353            the new storedvalue for the item is visible for range read in the
354            list. This is because we do not want the seqlist to delete the stale
355            item before its latest copy is added to the list.
356            (item becomes visible for range read only after updating the list
357             with the seqno of the item) */
358         seqList->markItemStale(std::move(ownedSv));
359     }
360     return std::make_tuple(newSv, status, notifyCtx);
361 }
362
363 std::pair<StoredValue*, VBNotifyCtx> EphemeralVBucket::addNewStoredValue(
364         const HashTable::HashBucketLock& hbl,
365         const Item& itm,
366         const VBQueueItemCtx* queueItmCtx) {
367     StoredValue* v = ht.unlocked_addNewStoredValue(hbl, itm);
368
369     std::lock_guard<std::mutex> lh(sequenceLock);
370
371     /* Add to the sequential storage */
372     try {
373         seqList->appendToList(lh, *(v->toOrderedStoredValue()));
374     } catch (const std::bad_cast& e) {
375         throw std::logic_error(
376                 "EphemeralVBucket::addNewStoredValue(): Error " +
377                 std::string(e.what()) + " for vbucket: " +
378                 std::to_string(getId()) + " for key: " +
379                 std::string(reinterpret_cast<const char*>(v->getKey().data()),
380                             v->getKey().size()));
381     }
382
383     VBNotifyCtx notifyCtx;
384     if (queueItmCtx) {
385         /* Put on checkpoint mgr */
386         notifyCtx = queueDirty(*v, *queueItmCtx);
387     }
388
389     /* Update the high seqno in the sequential storage */
390     seqList->updateHighSeqno(*(v->toOrderedStoredValue()));
391     ++opsCreate;
392
393     return {v, notifyCtx};
394 }
395
396 std::tuple<StoredValue*, VBNotifyCtx> EphemeralVBucket::softDeleteStoredValue(
397         const HashTable::HashBucketLock& hbl,
398         StoredValue& v,
399         bool onlyMarkDeleted,
400         const VBQueueItemCtx& queueItmCtx,
401         uint64_t bySeqno) {
402     std::lock_guard<std::mutex> lh(sequenceLock);
403
404     StoredValue* newSv = &v;
405     StoredValue::UniquePtr ownedSv;
406
407     /* Update the OrderedStoredValue in hash table + Ordered data structure
408        (list) */
409     auto res = seqList->updateListElem(lh, *(v.toOrderedStoredValue()));
410
411     switch (res) {
412     case SequenceList::UpdateStatus::Success:
413         /* OrderedStoredValue is moved to end of the list, do nothing */
414         break;
415
416     case SequenceList::UpdateStatus::Append: {
417         /* OrderedStoredValue cannot be moved to end of the list,
418            due to a range read. Hence, replace the storedvalue in the
419            hash table with its copy and indicate the list to mark the
420            OrderedStoredValue stale (old duplicate).
421
422            Note: It is important to remove item from hash table before
423                  marking stale because once marked stale list assumes the
424                  ownership of the item and may delete it anytime. */
425
426         /* Release current storedValue from hash table */
427         /* [EPHE TODO]: Write a HT func to replace the StoredValue directly
428                         than taking key as a param and deleting (MB-23184) */
429         std::tie(newSv, ownedSv) = ht.unlocked_replaceByCopy(hbl, v);
430
431         seqList->appendToList(lh, *(newSv->toOrderedStoredValue()));
432     } break;
433     }
434
435     /* Delete the storedvalue */
436     ht.unlocked_softDelete(hbl.getHTLock(), *newSv, onlyMarkDeleted);
437
438     if (queueItmCtx.genBySeqno == GenerateBySeqno::No) {
439         newSv->setBySeqno(bySeqno);
440     }
441
442     VBNotifyCtx notifyCtx = queueDirty(*newSv, queueItmCtx);
443
444     /* Update the high seqno in the sequential storage */
445     seqList->updateHighSeqno(*(newSv->toOrderedStoredValue()));
446     ++opsDelete;
447
448     if (res == SequenceList::UpdateStatus::Append) {
449         /* Mark the un-updated storedValue as stale. This must be done after
450            the new storedvalue for the item is visible for range read in the
451            list. This is because we do not want the seqlist to delete the stale
452            item before its latest copy is added to the list.
453            (item becomes visible for range read only after updating the list
454            with the seqno of the item) */
455         seqList->markItemStale(std::move(ownedSv));
456     }
457     return std::make_tuple(newSv, notifyCtx);
458 }
459
460 void EphemeralVBucket::bgFetch(const DocKey& key,
461                                const void* cookie,
462                                EventuallyPersistentEngine& engine,
463                                const int bgFetchDelay,
464                                const bool isMeta) {
465     throw std::logic_error(
466             "EphemeralVBucket::bgFetch() is not valid. Called on vb " +
467             std::to_string(getId()) + "for key: " +
468             std::string(reinterpret_cast<const char*>(key.data()), key.size()));
469 }
470
471 ENGINE_ERROR_CODE
472 EphemeralVBucket::addTempItemAndBGFetch(HashTable::HashBucketLock& hbl,
473                                         const DocKey& key,
474                                         const void* cookie,
475                                         EventuallyPersistentEngine& engine,
476                                         int bgFetchDelay,
477                                         bool metadataOnly,
478                                         bool isReplication) {
479     /* [EPHE TODO]: Just return error code and make all the callers handle it */
480     throw std::logic_error(
481             "EphemeralVBucket::addTempItemAndBGFetch() is not valid. "
482             "Called on vb " +
483             std::to_string(getId()) + "for key: " +
484             std::string(reinterpret_cast<const char*>(key.data()), key.size()));
485 }
486
487 GetValue EphemeralVBucket::getInternalNonResident(
488         const DocKey& key,
489         const void* cookie,
490         EventuallyPersistentEngine& engine,
491         int bgFetchDelay,
492         get_options_t options,
493         const StoredValue& v) {
494     /* We reach here only if the v is deleted and does not have any value */
495     return GetValue();
496 }