1 /* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
3 * Copyright 2017 Couchbase, Inc
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
9 * http://www.apache.org/licenses/LICENSE-2.0
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
18 #include "ephemeral_vb.h"
20 #include "dcp/backfill_memory.h"
21 #include "ephemeral_tombstone_purger.h"
22 #include "failover-table.h"
23 #include "linked_list.h"
24 #include "stored_value_factories.h"
// Construct an Ephemeral vBucket. Among the arguments forwarded in the
// initializer list (presumably to the VBucket base constructor) are no
// flusher callback (/*flusherCb*/ nullptr), an OrderedStoredValueFactory
// built from `st`, and the moved-in newSeqnoCb. The vBucket also creates its
// own BasicLinkedList (seqList) for vBucket id `i`.
// NOTE(review): presumably the OrderedStoredValueFactory makes the HashTable
// create OrderedStoredValues so they can be linked into seqList - confirm.
26 EphemeralVBucket::EphemeralVBucket(id_type i,
27 vbucket_state_t newState,
29 CheckpointConfig& chkConfig,
32 uint64_t lastSnapStart,
34 std::unique_ptr<FailoverTable> table,
35 NewSeqnoCallback newSeqnoCb,
36 Configuration& config,
37 item_eviction_policy_t evictionPolicy,
38 vbucket_state_t initState,
41 const std::string& collectionsManifest)
50 /*flusherCb*/ nullptr,
51 std::make_unique<OrderedStoredValueFactory>(st),
52 std::move(newSeqnoCb),
59 seqList(std::make_unique<BasicLinkedList>(i, st)) {
62 size_t EphemeralVBucket::getNumItems() const {
63 return ht.getNumInMemoryItems() - ht.getNumDeletedItems();
66 void EphemeralVBucket::completeStatsVKey(
67 const DocKey& key, const RememberingCallback<GetValue>& gcb) {
68 throw std::logic_error(
69 "EphemeralVBucket::completeStatsVKey() is not valid call. "
71 std::to_string(getId()) + "for key: " +
72 std::string(reinterpret_cast<const char*>(key.data()), key.size()));
// "Page out" an item from an Ephemeral vBucket by deleting it.
// - Only done on active vBuckets (see the comment below on replicas).
// - An item that is already deleted and has no value is left alone.
// - Otherwise the item's revSeqno is incremented and the item is
//   soft-deleted via softDeleteStoredValue() with a newly generated seqno
//   (GenerateBySeqno::Yes). The HashTable's max deleted revSeqno is then
//   updated from the resulting StoredValue and the new seqno is notified.
// NOTE(review): the return statements are not visible here; presumably
// false means "nothing deleted" and true means "deleted" - confirm.
75 bool EphemeralVBucket::pageOut(const HashTable::HashBucketLock& lh,
77 // We only delete from active vBuckets to ensure that replicas stay in
78 // sync with the active (the delete from active is sent via DCP to the
79 // replicas as an explicit delete).
80 if (getState() != vbucket_state_active) {
83 if (v->isDeleted() && !v->getValue()) {
84 // If the item has already been deleted (and doesn't have a value
85 // associated with it) then there's no further deletion possible,
86 // until the deletion marker (tombstone) is later purged at the
87 // metadata purge interval.
90 VBQueueItemCtx queueCtx(GenerateBySeqno::Yes,
95 v->setRevSeqno(v->getRevSeqno() + 1);
97 VBNotifyCtx notifyCtx;
98 std::tie(newSv, notifyCtx) = softDeleteStoredValue(
99 lh, *v, /*onlyMarkDeleted*/ false, queueCtx, 0);
100 ht.updateMaxDeletedRevSeqno(newSv->getRevSeqno());
101 notifyNewSeqno(notifyCtx);
108 void EphemeralVBucket::addStats(bool details,
111 // Include base class statistics:
112 _addStats(details, add_stat, c);
115 // Ephemeral-specific details
116 addStat("auto_delete_count", autoDeleteCount.load(), add_stat, c);
117 addStat("seqlist_count", seqList->getNumItems(), add_stat, c);
118 addStat("seqlist_deleted_count",
119 seqList->getNumDeletedItems(),
122 addStat("seqlist_high_seqno", seqList->getHighSeqno(), add_stat, c);
123 addStat("seqlist_highest_deduped_seqno",
124 seqList->getHighestDedupedSeqno(),
127 const auto rr_begin = seqList->getRangeReadBegin();
128 const auto rr_end = seqList->getRangeReadEnd();
129 addStat("seqlist_range_read_begin", rr_begin, add_stat, c);
130 addStat("seqlist_range_read_end", rr_end, add_stat, c);
131 addStat("seqlist_range_read_count", rr_end - rr_begin, add_stat, c);
132 addStat("seqlist_stale_count",
133 seqList->getNumStaleItems(),
136 addStat("seqlist_stale_value_bytes",
137 seqList->getStaleValueBytes(),
140 addStat("seqlist_stale_metadata_bytes",
141 seqList->getStaleValueBytes(),
// Debug helper: writes a one-line summary of this vBucket (its address,
// state and getNumItems()) to std::cerr, followed by the HashTable `ht`.
147 void EphemeralVBucket::dump() const {
148 std::cerr << "EphemeralVBucket[" << this
149 << "] with state:" << toString(getState())
150 << " numItems:" << getNumItems()
153 std::cerr << ht << std::endl;
156 ENGINE_ERROR_CODE EphemeralVBucket::completeBGFetchForSingleItem(
158 const VBucketBGFetchItem& fetched_item,
159 const ProcessClock::time_point startTime) {
160 /* [EPHE TODO]: Just return error code and make all the callers handle it */
161 throw std::logic_error(
162 "EphemeralVBucket::completeBGFetchForSingleItem() "
163 "is not valid. Called on vb " +
164 std::to_string(getId()) + "for key: " +
165 std::string(reinterpret_cast<const char*>(key.data()), key.size()));
168 void EphemeralVBucket::resetStats() {
169 autoDeleteCount.reset();
172 vb_bgfetch_queue_t EphemeralVBucket::getBGFetchItems() {
173 throw std::logic_error(
174 "EphemeralVBucket::getBGFetchItems() is not valid. "
176 std::to_string(getId()));
179 bool EphemeralVBucket::hasPendingBGFetchItems() {
180 throw std::logic_error(
181 "EphemeralVBucket::hasPendingBGFetchItems() is not valid. "
183 std::to_string(getId()));
// Register a high-priority request for seqnoOrChkId, if needed.
// - ChkPersistence requests are reported as NotSupported.
// - Otherwise, while holding sequenceLock: if seqnoOrChkId is already
//   <= getPersistenceSeqno() the request is RequestNotScheduled;
//   else an entry is added via addHighPriorityVBEntry() and the request
//   is RequestScheduled.
186 HighPriorityVBReqStatus EphemeralVBucket::checkAddHighPriorityVBEntry(
187 uint64_t seqnoOrChkId,
189 HighPriorityVBNotify reqType) {
190 if (reqType == HighPriorityVBNotify::ChkPersistence) {
191 return HighPriorityVBReqStatus::NotSupported;
195 /* Serialize the request with sequence lock */
196 std::lock_guard<std::mutex> lh(sequenceLock);
198 if (seqnoOrChkId <= getPersistenceSeqno()) {
199 /* Need not notify asynchronously as the vb already has the
201 return HighPriorityVBReqStatus::RequestNotScheduled;
204 addHighPriorityVBEntry(seqnoOrChkId, cookie, reqType);
207 return HighPriorityVBReqStatus::RequestScheduled;
// Not a valid operation on an EphemeralVBucket: always throws
// std::logic_error whose message includes this vBucket's id.
210 void EphemeralVBucket::notifyHighPriorityRequests(
211 EventuallyPersistentEngine& engine,
213 HighPriorityVBNotify notifyType) {
214 throw std::logic_error(
215 "EphemeralVBucket::notifyHighPriorityRequests() is not valid. "
217 std::to_string(getId()));
// Complete every pending high-priority request. The list comes from
// tmpFailAndGetAllHpNotifies(e); each entry is completed through
// e.notifyIOComplete(first, second).
// NOTE(review): presumably each pair is (cookie, status) with a
// temporary-failure status - confirm against tmpFailAndGetAllHpNotifies().
220 void EphemeralVBucket::notifyAllPendingConnsFailed(
221 EventuallyPersistentEngine& e) {
222 auto toNotify = tmpFailAndGetAllHpNotifies(e);
224 for (auto& notify : toNotify) {
225 e.notifyIOComplete(notify.first, notify.second);
231 std::unique_ptr<DCPBackfill> EphemeralVBucket::createDCPBackfill(
232 EventuallyPersistentEngine& e,
233 const active_stream_t& stream,
236 /* create a memory backfill object */
237 EphemeralVBucketPtr evb =
238 std::static_pointer_cast<EphemeralVBucket>(shared_from_this());
239 return std::make_unique<DCPBackfillMemory>(
240 evb, stream, startSeqno, endSeqno);
243 std::pair<ENGINE_ERROR_CODE, std::vector<UniqueItemPtr>>
244 EphemeralVBucket::inMemoryBackfill(uint64_t start, uint64_t end) {
245 return seqList->rangeRead(start, end);
248 /* Vb level backfill queue is for items in a huge snapshot (disk backfill
249 snapshots from DCP are typically huge) that could not be fit on a
250 checkpoint. They update all stats, checkpoint seqno, but are not put
251 on checkpoint and are directly persisted from the queue.
253 In ephemeral buckets we must not add backfill items from DCP (on
254 replica vbuckets), to the vb backfill queue because we have put them on
255 linkedlist already. Also we do not have the flusher task to drain the
256 items from that queue.
257 (Unlike checkpoints, the items in this queue is not cleaned up
258 in a background cleanup task).
260 But we must be careful to update certain stats and checkpoint seqno
261 like in a regular couchbase bucket. */
262 void EphemeralVBucket::queueBackfillItem(
263 queued_item& qi, const GenerateBySeqno generateBySeqno) {
264 if (GenerateBySeqno::Yes == generateBySeqno) {
265 qi->setBySeqno(checkpointManager.nextBySeqno());
267 checkpointManager.setBySeqno(qi->getBySeqno());
269 ++stats.totalEnqueued;
270 stats.memOverhead->fetch_add(sizeof(queued_item));
// Purge tombstones in two stages:
// 1. An HTTombstonePurger, configured with purgeAge, marks purgeable deleted
//    items in the HashTable as stale.
// 2. seqList->purgeTombstones() deletes stale items from the sequence list.
// Afterwards the purge counters (htDeletedPurgeCount, seqListPurgeCount) are
// incremented and the purge seqno is set to the sequence list's highest
// purged deleted seqno.
// Returns the number of items purged from the sequence list.
273 size_t EphemeralVBucket::purgeTombstones(rel_time_t purgeAge) {
274 // First mark all deleted items in the HashTable which can be purged as
275 // Stale - this removes them from the HashTable, transferring ownership to
277 HTTombstonePurger purger(*this, purgeAge);
280 // Secondly iterate over the sequence list and delete any stale items
281 auto seqListPurged = seqList->purgeTombstones();
283 // Update stats and return.
284 htDeletedPurgeCount += purger.getNumPurged();
285 seqListPurgeCount += seqListPurged;
286 setPurgeSeqno(seqList->getHighestPurgedDeletedSeqno());
288 return seqListPurged;
// Update an existing StoredValue `v` with `itm`, keeping the HashTable and
// the sequence list consistent. Runs under sequenceLock (lh).
// The outcome of seqList->updateListElem() decides how:
// - Success: `v` is updated in place via ht.unlocked_updateStoredValue().
// - Append (a range read prevents moving `v`): `v` is released from the
//   HashTable into ownedSv, and a new StoredValue for `itm` is added and
//   appended to the list. The released value is marked stale only at the
//   very end, after the new copy is queued and the high seqno updated.
// The resulting StoredValue is queued via queueDirty() and the list's high
// seqno is updated. Returns (new StoredValue, mutation status, notify ctx).
291 std::tuple<StoredValue*, MutationStatus, VBNotifyCtx>
292 EphemeralVBucket::updateStoredValue(const HashTable::HashBucketLock& hbl,
295 const VBQueueItemCtx* queueItmCtx) {
296 std::lock_guard<std::mutex> lh(sequenceLock);
298 const bool recreatingDeletedItem = v.isDeleted();
300 /* Update the OrderedStoredValue in hash table + Ordered data structure
302 auto res = seqList->updateListElem(lh, *(v.toOrderedStoredValue()));
304 StoredValue* newSv = &v;
305 StoredValue::UniquePtr ownedSv;
306 MutationStatus status(MutationStatus::WasClean);
309 case SequenceList::UpdateStatus::Success:
310 /* OrderedStoredValue moved to end of the list, just update its
312 status = ht.unlocked_updateStoredValue(hbl.getHTLock(), v, itm);
315 case SequenceList::UpdateStatus::Append: {
316 /* OrderedStoredValue cannot be moved to end of the list,
317 due to a range read. Hence, release the storedvalue from the
318 hash table, indicate the list to mark the OrderedStoredValue
319 stale (old duplicate) and add a new StoredValue for the itm.
321 Note: It is important to remove item from hash table before
322 marking stale because once marked stale list assumes the
323 ownership of the item and may delete it anytime. */
324 /* Release current storedValue from hash table */
325 /* [EPHE TODO]: Write a HT func to release the StoredValue directly
326 than taking key as a param and deleting
328 ownedSv = ht.unlocked_release(hbl, v.getKey());
330 /* Add a new storedvalue for the item */
331 newSv = ht.unlocked_addNewStoredValue(hbl, itm);
333 seqList->appendToList(lh, *(newSv->toOrderedStoredValue()));
337 VBNotifyCtx notifyCtx;
339 /* Put on checkpoint mgr */
340 notifyCtx = queueDirty(*newSv, *queueItmCtx);
343 /* Update the high seqno in the sequential storage */
344 seqList->updateHighSeqno(*(newSv->toOrderedStoredValue()));
345 if (recreatingDeletedItem) {
351 if (res == SequenceList::UpdateStatus::Append) {
352 /* Mark the un-updated storedValue as stale. This must be done after
353 the new storedvalue for the item is visible for range read in the
354 list. This is because we do not want the seqlist to delete the stale
355 item before its latest copy is added to the list.
356 (item becomes visible for range read only after updating the list
357 with the seqno of the item) */
358 seqList->markItemStale(std::move(ownedSv));
360 return std::make_tuple(newSv, status, notifyCtx);
// Add a new StoredValue for `itm` to the HashTable, then, under
// sequenceLock, append it to the sequence list.
// A std::bad_cast raised while appending is rethrown as std::logic_error
// naming the vBucket id and key.
// NOTE(review): the bad_cast presumably comes from toOrderedStoredValue()
// when the HashTable did not create an OrderedStoredValue - confirm.
// The new value is then queued via queueDirty() and the list's high seqno
// updated. Returns {new StoredValue, notify ctx}.
363 std::pair<StoredValue*, VBNotifyCtx> EphemeralVBucket::addNewStoredValue(
364 const HashTable::HashBucketLock& hbl,
366 const VBQueueItemCtx* queueItmCtx) {
367 StoredValue* v = ht.unlocked_addNewStoredValue(hbl, itm);
369 std::lock_guard<std::mutex> lh(sequenceLock);
371 /* Add to the sequential storage */
373 seqList->appendToList(lh, *(v->toOrderedStoredValue()));
374 } catch (const std::bad_cast& e) {
375 throw std::logic_error(
376 "EphemeralVBucket::addNewStoredValue(): Error " +
377 std::string(e.what()) + " for vbucket: " +
378 std::to_string(getId()) + " for key: " +
379 std::string(reinterpret_cast<const char*>(v->getKey().data()),
380 v->getKey().size()));
383 VBNotifyCtx notifyCtx;
385 /* Put on checkpoint mgr */
386 notifyCtx = queueDirty(*v, *queueItmCtx);
389 /* Update the high seqno in the sequential storage */
390 seqList->updateHighSeqno(*(v->toOrderedStoredValue()));
393 return {v, notifyCtx};
// Soft-delete StoredValue `v`, keeping the HashTable and the sequence list
// consistent. Runs under sequenceLock (lh).
// The outcome of seqList->updateListElem() decides how:
// - Success: `v` itself is deleted.
// - Append (a range read prevents moving `v`): `v` is replaced in the
//   HashTable by a copy (ownedSv takes the original), and the copy is
//   appended to the list.
// The chosen value is soft-deleted via ht.unlocked_softDelete(). When
// queueItmCtx.genBySeqno is GenerateBySeqno::No, its seqno is set to
// bySeqno. It is then queued via queueDirty() and the list's high seqno
// updated. In the Append case the original is marked stale last.
// Returns (StoredValue now in the HashTable, notify ctx).
396 std::tuple<StoredValue*, VBNotifyCtx> EphemeralVBucket::softDeleteStoredValue(
397 const HashTable::HashBucketLock& hbl,
399 bool onlyMarkDeleted,
400 const VBQueueItemCtx& queueItmCtx,
402 std::lock_guard<std::mutex> lh(sequenceLock);
404 StoredValue* newSv = &v;
405 StoredValue::UniquePtr ownedSv;
407 /* Update the OrderedStoredValue in hash table + Ordered data structure
409 auto res = seqList->updateListElem(lh, *(v.toOrderedStoredValue()));
412 case SequenceList::UpdateStatus::Success:
413 /* OrderedStoredValue is moved to end of the list, do nothing */
416 case SequenceList::UpdateStatus::Append: {
417 /* OrderedStoredValue cannot be moved to end of the list,
418 due to a range read. Hence, replace the storedvalue in the
419 hash table with its copy and indicate the list to mark the
420 OrderedStoredValue stale (old duplicate).
422 Note: It is important to remove item from hash table before
423 marking stale because once marked stale list assumes the
424 ownership of the item and may delete it anytime. */
426 /* Release current storedValue from hash table */
427 /* [EPHE TODO]: Write a HT func to replace the StoredValue directly
428 than taking key as a param and deleting (MB-23184) */
429 std::tie(newSv, ownedSv) = ht.unlocked_replaceByCopy(hbl, v);
431 seqList->appendToList(lh, *(newSv->toOrderedStoredValue()));
435 /* Delete the storedvalue */
436 ht.unlocked_softDelete(hbl.getHTLock(), *newSv, onlyMarkDeleted);
438 if (queueItmCtx.genBySeqno == GenerateBySeqno::No) {
439 newSv->setBySeqno(bySeqno);
442 VBNotifyCtx notifyCtx = queueDirty(*newSv, queueItmCtx);
444 /* Update the high seqno in the sequential storage */
445 seqList->updateHighSeqno(*(newSv->toOrderedStoredValue()));
448 if (res == SequenceList::UpdateStatus::Append) {
449 /* Mark the un-updated storedValue as stale. This must be done after
450 the new storedvalue for the item is visible for range read in the
451 list. This is because we do not want the seqlist to delete the stale
452 item before its latest copy is added to the list.
453 (item becomes visible for range read only after updating the list
454 with the seqno of the item) */
455 seqList->markItemStale(std::move(ownedSv));
457 return std::make_tuple(newSv, notifyCtx);
// Background fetches are not valid on an EphemeralVBucket: always throws
// std::logic_error naming the vBucket id and the key.
// NOTE(review): the message has no space before "for key: ", so it renders
// as e.g. "vb 5for key: ..." - consider adding one.
460 void EphemeralVBucket::bgFetch(const DocKey& key,
462 EventuallyPersistentEngine& engine,
463 const int bgFetchDelay,
465 throw std::logic_error(
466 "EphemeralVBucket::bgFetch() is not valid. Called on vb " +
467 std::to_string(getId()) + "for key: " +
468 std::string(reinterpret_cast<const char*>(key.data()), key.size()));
472 EphemeralVBucket::addTempItemAndBGFetch(HashTable::HashBucketLock& hbl,
475 EventuallyPersistentEngine& engine,
478 bool isReplication) {
// Not valid on an EphemeralVBucket (there is no background fetch): always
// throws std::logic_error naming the vBucket id and the key.
// NOTE(review): the message has no space before "for key: " - consider
// adding one.
479 /* [EPHE TODO]: Just return error code and make all the callers handle it */
480 throw std::logic_error(
481 "EphemeralVBucket::addTempItemAndBGFetch() is not valid. "
483 std::to_string(getId()) + "for key: " +
484 std::string(reinterpret_cast<const char*>(key.data()), key.size()));
487 GetValue EphemeralVBucket::getInternalNonResident(
490 EventuallyPersistentEngine& engine,
492 get_options_t options,
493 const StoredValue& v) {
494 /* We reach here only if the v is deleted and does not have any value */