MB-23906: Implement delete-with-value with store() instead of delete()
[ep-engine.git] / src / vbucket.h
1 /* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2 /*
3  *     Copyright 2015 Couchbase, Inc
4  *
5  *   Licensed under the Apache License, Version 2.0 (the "License");
6  *   you may not use this file except in compliance with the License.
7  *   You may obtain a copy of the License at
8  *
9  *       http://www.apache.org/licenses/LICENSE-2.0
10  *
11  *   Unless required by applicable law or agreed to in writing, software
12  *   distributed under the License is distributed on an "AS IS" BASIS,
13  *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  *   See the License for the specific language governing permissions and
15  *   limitations under the License.
16  */
17
18 #pragma once
19
20 #include "config.h"
21
22 #include "bloomfilter.h"
23 #include "checkpoint.h"
24 #include "collections/vbucket_manifest.h"
25 #include "dcp/dcp-types.h"
26 #include "hash_table.h"
27 #include "hlc.h"
28 #include "item_pager.h"
29 #include "monotonic.h"
30
31 #include <platform/non_negative_counter.h>
32 #include <relaxed_atomic.h>
33 #include <atomic>
34 #include <queue>
35
36 class EPStats;
37 class ConflictResolution;
38 class Configuration;
39 class PreLinkDocumentContext;
40 class EventuallyPersistentEngine;
41 class DCPBackfill;
42 class RollbackResult;
43
44 /**
45  * The following will be used to identify
46  * the source of an item's expiration.
47  */
48 enum class ExpireBy { Pager, Compactor, Access };
49
50 /* Structure that holds info needed for notification for an item being updated
51    in the vbucket */
52 struct VBNotifyCtx {
53     VBNotifyCtx() : bySeqno(0), notifyReplication(false), notifyFlusher(false) {
54     }
55     Monotonic<int64_t> bySeqno;
56     bool notifyReplication;
57     bool notifyFlusher;
58 };
59
60 /**
61  * Structure that holds info needed to queue an item in chkpt or vb backfill
62  * queue
63  */
64 struct VBQueueItemCtx {
65     VBQueueItemCtx(GenerateBySeqno genBySeqno,
66                    GenerateCas genCas,
67                    TrackCasDrift trackCasDrift,
68                    bool isBackfillItem,
69                    PreLinkDocumentContext* preLinkDocumentContext_)
70         : genBySeqno(genBySeqno),
71           genCas(genCas),
72           trackCasDrift(trackCasDrift),
73           isBackfillItem(isBackfillItem),
74           preLinkDocumentContext(preLinkDocumentContext_) {
75     }
76     /* Indicates if we should queue an item or not. If this is false other
77        members should not be used */
78     GenerateBySeqno genBySeqno;
79     GenerateCas genCas;
80     TrackCasDrift trackCasDrift;
81     bool isBackfillItem;
82     PreLinkDocumentContext* preLinkDocumentContext;
83 };
84
85 /**
86  * Structure that holds seqno based or checkpoint persistence based high
87  * priority requests to a vbucket
88  */
89 struct HighPriorityVBEntry {
90     HighPriorityVBEntry(const void* c,
91                         uint64_t idNum,
92                         HighPriorityVBNotify reqType)
93         : cookie(c), id(idNum), reqType(reqType), start(gethrtime()) {
94     }
95
96     const void* cookie;
97     uint64_t id;
98     HighPriorityVBNotify reqType;
99
100     /* for stats (histogram) */
101     hrtime_t start;
102 };
103
104 typedef std::unique_ptr<Callback<const uint16_t, const VBNotifyCtx&>>
105         NewSeqnoCallback;
106
107 /**
108  * Function object that returns true if the given vbucket is acceptable.
109  */
110 class VBucketFilter {
111 public:
112
113     /**
114      * Instiatiate a VBucketFilter that always returns true.
115      */
116     explicit VBucketFilter() : acceptable() {}
117
118     /**
119      * Instantiate a VBucketFilter that returns true for any of the
120      * given vbucket IDs.
121      */
122     explicit VBucketFilter(const std::vector<uint16_t> &a) :
123         acceptable(a.begin(), a.end()) {}
124
125     explicit VBucketFilter(const std::set<uint16_t> &s) : acceptable(s) {}
126
127     void assign(const std::set<uint16_t> &a) {
128         acceptable = a;
129     }
130
131     bool operator ()(uint16_t v) const {
132         return acceptable.empty() || acceptable.find(v) != acceptable.end();
133     }
134
135     size_t size() const { return acceptable.size(); }
136
137     bool empty() const { return acceptable.empty(); }
138
139     void reset() {
140         acceptable.clear();
141     }
142
143     /**
144      * Calculate the difference between this and another filter.
145      * If "this" contains elements, [1,2,3,4] and other contains [3,4,5,6]
146      * the returned filter contains: [1,2,5,6]
147      * @param other the other filter to compare with
148      * @return a new filter with the elements present in only one of the two
149      *         filters.
150      */
151     VBucketFilter filter_diff(const VBucketFilter &other) const;
152
153     /**
154      * Calculate the intersection between this and another filter.
155      * If "this" contains elements, [1,2,3,4] and other contains [3,4,5,6]
156      * the returned filter contains: [3,4]
157      * @param other the other filter to compare with
158      * @return a new filter with the elements present in both of the two
159      *         filters.
160      */
161     VBucketFilter filter_intersection(const VBucketFilter &other) const;
162
163     const std::set<uint16_t> &getVBSet() const { return acceptable; }
164
165     bool addVBucket(uint16_t vbucket) {
166         std::pair<std::set<uint16_t>::iterator, bool> rv = acceptable.insert(vbucket);
167         return rv.second;
168     }
169
170     void removeVBucket(uint16_t vbucket) {
171         acceptable.erase(vbucket);
172     }
173
174     /**
175      * Dump the filter in a human readable form ( "{ bucket, bucket, bucket }"
176      * to the specified output stream.
177      */
178     friend std::ostream& operator<< (std::ostream& out,
179                                      const VBucketFilter &filter);
180
181 private:
182
183     std::set<uint16_t> acceptable;
184 };
185
186 class EventuallyPersistentEngine;
187 class FailoverTable;
188 class KVShard;
189
190 /**
191  * An individual vbucket.
192  */
193 class VBucket : public RCValue {
194 public:
195
196     // Identifier for a vBucket
197     typedef uint16_t id_type;
198
199     VBucket(id_type i,
200             vbucket_state_t newState,
201             EPStats& st,
202             CheckpointConfig& chkConfig,
203             int64_t lastSeqno,
204             uint64_t lastSnapStart,
205             uint64_t lastSnapEnd,
206             std::unique_ptr<FailoverTable> table,
207             std::shared_ptr<Callback<id_type>> flusherCb,
208             std::unique_ptr<AbstractStoredValueFactory> valFact,
209             NewSeqnoCallback newSeqnoCb,
210             Configuration& config,
211             item_eviction_policy_t evictionPolicy,
212             vbucket_state_t initState = vbucket_state_dead,
213             uint64_t purgeSeqno = 0,
214             uint64_t maxCas = 0,
215             const std::string& collectionsManifest = "");
216
217     virtual ~VBucket();
218
219     int64_t getHighSeqno() const {
220         return checkpointManager.getHighSeqno();
221     }
222
223     size_t getChkMgrMemUsage() {
224         return checkpointManager.getMemoryUsage();
225     }
226
227     size_t getChkMgrMemUsageOfUnrefCheckpoints() {
228         return checkpointManager.getMemoryUsageOfUnrefCheckpoints();
229     }
230
231     uint64_t getPurgeSeqno() const {
232         return purge_seqno;
233     }
234
235     void setPurgeSeqno(uint64_t to) {
236         purge_seqno = to;
237     }
238
239     void setPersistedSnapshot(uint64_t start, uint64_t end) {
240         LockHolder lh(snapshotMutex);
241         persisted_snapshot_start = start;
242         persisted_snapshot_end = end;
243     }
244
245     snapshot_range_t getPersistedSnapshot() const {
246         LockHolder lh(snapshotMutex);
247         return {persisted_snapshot_start, persisted_snapshot_end};
248     }
249
250     uint64_t getMaxCas() const {
251         return hlc.getMaxHLC();
252     }
253
254     void setMaxCas(uint64_t cas) {
255         hlc.setMaxHLC(cas);
256     }
257
258     void setMaxCasAndTrackDrift(uint64_t cas) {
259         hlc.setMaxHLCAndTrackDrift(cas);
260     }
261
262     void forceMaxCas(uint64_t cas) {
263         hlc.forceMaxHLC(cas);
264     }
265
266     HLC::DriftStats getHLCDriftStats() const {
267         return hlc.getDriftStats();
268     }
269
270     HLC::DriftExceptions getHLCDriftExceptionCounters() const {
271         return hlc.getDriftExceptionCounters();
272     }
273
274     void setHLCDriftAheadThreshold(std::chrono::microseconds threshold) {
275         hlc.setDriftAheadThreshold(threshold);
276     }
277
278     void setHLCDriftBehindThreshold(std::chrono::microseconds threshold) {
279         hlc.setDriftBehindThreshold(threshold);
280     }
281
282     bool isTakeoverBackedUp() {
283         return takeover_backed_up.load();
284     }
285
286     void setTakeoverBackedUpState(bool to) {
287         bool inverse = !to;
288         takeover_backed_up.compare_exchange_strong(inverse, to);
289     }
290
291     // States whether the VBucket is in the process of being created
292     bool isBucketCreation() const {
293         return bucketCreation.load();
294     }
295
296     bool setBucketCreation(bool rv) {
297         bool inverse = !rv;
298         return bucketCreation.compare_exchange_strong(inverse, rv);
299     }
300
301     // States whether the VBucket is in the process of being deleted
302     bool isBucketDeletion() const {
303         return bucketDeletion.load();
304     }
305
306     bool setBucketDeletion(bool delBucket) {
307         bool inverse = !delBucket;
308         return bucketDeletion.compare_exchange_strong(inverse, delBucket);
309     }
310
311     // Returns the last persisted sequence number for the VBucket
312     virtual uint64_t getPersistenceSeqno() const = 0;
313
314     void setPersistenceSeqno(uint64_t seqno) {
315         persistenceSeqno.store(seqno);
316     }
317
318     id_type getId() const { return id; }
319     vbucket_state_t getState(void) const { return state.load(); }
320     void setState(vbucket_state_t to);
321     cb::RWLock& getStateLock() {return stateLock;}
322
323     vbucket_state_t getInitialState(void) { return initialState; }
324     void setInitialState(vbucket_state_t initState) {
325         initialState = initState;
326     }
327
328     vbucket_state getVBucketState() const;
329
330     /**
331      * This method performs operations on the stored value prior
332      * to expiring the item.
333      *
334      * @param v the stored value
335      */
336     void handlePreExpiry(StoredValue& v);
337
338     bool addPendingOp(const void *cookie) {
339         LockHolder lh(pendingOpLock);
340         if (state != vbucket_state_pending) {
341             // State transitioned while we were waiting.
342             return false;
343         }
344         // Start a timer when enqueuing the first client.
345         if (pendingOps.empty()) {
346             pendingOpsStart = gethrtime();
347         }
348         pendingOps.push_back(cookie);
349         ++stats.pendingOps;
350         ++stats.pendingOpsTotal;
351         return true;
352     }
353
354     void doStatsForQueueing(const Item& item, size_t itemBytes);
355     void doStatsForFlushing(const Item& item, size_t itemBytes);
356     void incrMetaDataDisk(const Item& qi);
357     void decrMetaDataDisk(const Item& qi);
358
359     /// Reset all statistics assocated with this vBucket.
360     virtual void resetStats();
361
362     // Get age sum in millisecond
363     uint64_t getQueueAge() {
364         uint64_t currDirtyQueueAge = dirtyQueueAge.load(
365                                                     std::memory_order_relaxed);
366         rel_time_t currentAge = ep_current_time() * dirtyQueueSize;
367         if (currentAge < currDirtyQueueAge) {
368             return 0;
369         }
370         return (currentAge - currDirtyQueueAge) * 1000;
371     }
372
373     void fireAllOps(EventuallyPersistentEngine &engine);
374
375     size_t size(void) {
376         HashTableDepthStatVisitor v;
377         ht.visitDepth(v);
378         return v.size;
379     }
380
381     size_t getBackfillSize() {
382         LockHolder lh(backfill.mutex);
383         return backfill.items.size();
384     }
385
386     /**
387      * Process an item that is got from a backfill (TAP or DCP).
388      * It puts it onto a queue for persistence and/or generates a seqno and
389      * updates stats
390      *
391      * @param qi item to be processed
392      * @param generateBySeqno indicates if a new seqno must generated or the
393      *                        seqno in the item must be used
394      *
395      *
396      */
397     virtual void queueBackfillItem(queued_item& qi,
398                                    const GenerateBySeqno generateBySeqno) = 0;
399
400     void getBackfillItems(std::vector<queued_item> &items) {
401         LockHolder lh(backfill.mutex);
402         size_t num_items = backfill.items.size();
403         while (!backfill.items.empty()) {
404             items.push_back(backfill.items.front());
405             backfill.items.pop();
406         }
407         stats.vbBackfillQueueSize.fetch_sub(num_items);
408         stats.memOverhead->fetch_sub(num_items * sizeof(queued_item));
409     }
410
411     bool isBackfillPhase() {
412         return backfill.isBackfillPhase.load();
413     }
414
415     void setBackfillPhase(bool backfillPhase) {
416         backfill.isBackfillPhase.store(backfillPhase);
417     }
418
419     /**
420      * Returns the map of bgfetch items for this vbucket, clearing the
421      * pendingBGFetches.
422      */
423     virtual vb_bgfetch_queue_t getBGFetchItems() = 0;
424
425     virtual bool hasPendingBGFetchItems() = 0;
426
427     static const char* toString(vbucket_state_t s) {
428         switch(s) {
429         case vbucket_state_active: return "active"; break;
430         case vbucket_state_replica: return "replica"; break;
431         case vbucket_state_pending: return "pending"; break;
432         case vbucket_state_dead: return "dead"; break;
433         }
434         return "unknown";
435     }
436
437     static vbucket_state_t fromString(const char* state) {
438         if (strcmp(state, "active") == 0) {
439             return vbucket_state_active;
440         } else if (strcmp(state, "replica") == 0) {
441             return vbucket_state_replica;
442         } else if (strcmp(state, "pending") == 0) {
443             return vbucket_state_pending;
444         } else {
445             return vbucket_state_dead;
446         }
447     }
448
449     /**
450      * Checks and decides whether to add high priority request on the vbucket.
451      * This is an async request made by modules like ns-server during
452      * rebalance. The request is for a response from the vbucket when it
453      * 'sees' beyond a certain sequence number or when a certain checkpoint
454      * is persisted.
455      * Depending on the vbucket type, the meaning 'seeing' a sequence number
456      * changes. That is, it could mean persisted in case of EPVBucket and
457      * added to the sequenced data structure in case of EphemeralVBucket.
458      *
459      * @param seqnoOrChkId seqno to be seen or checkpoint id to be persisted
460      * @param cookie cookie of conn to be notified
461      * @param reqType indicating request for seqno or chk persistence
462      *
463      * @return RequestScheduled if a high priority request is added and
464      *                          notification will be done asynchronously
465      *         NotSupported if the request is not supported for the reqType
466      *         RequestNotScheduled if a high priority request is NOT added (as
467      *                             it is not required). This implies there won't
468      *                             be a subsequent notification
469      */
470     virtual HighPriorityVBReqStatus checkAddHighPriorityVBEntry(
471             uint64_t seqnoOrChkId,
472             const void* cookie,
473             HighPriorityVBNotify reqType) = 0;
474
475     /**
476      * Notify the high priority requests on the vbucket.
477      * This is the response to async requests made by modules like ns-server
478      * during rebalance.
479      *
480      * @param engine Ref to ep-engine
481      * @param id seqno or checkpoint id causing the notification(s).
482      * @param notifyType indicating notify for seqno or chk persistence
483      */
484     virtual void notifyHighPriorityRequests(
485             EventuallyPersistentEngine& engine,
486             uint64_t id,
487             HighPriorityVBNotify notifyType) = 0;
488
489     virtual void notifyAllPendingConnsFailed(EventuallyPersistentEngine& e) = 0;
490
491     /**
492      * Get high priority notifications for a seqno or checkpoint persisted
493      *
494      * @param engine Ref to ep-engine
495      * @param id seqno or checkpoint id for which notifies are to be found
496      * @param notifyType indicating notify for seqno or chk persistence
497      *
498      * @return map of notifications with conn cookie as the key and notify
499      *         status as the value
500      */
501     std::map<const void*, ENGINE_ERROR_CODE> getHighPriorityNotifications(
502             EventuallyPersistentEngine& engine,
503             uint64_t idNum,
504             HighPriorityVBNotify notifyType);
505
506     size_t getHighPriorityChkSize() {
507         return numHpVBReqs.load();
508     }
509
510     /**
511      * BloomFilter operations for vbucket
512      */
513     void createFilter(size_t key_count, double probability);
514     void initTempFilter(size_t key_count, double probability);
515     void addToFilter(const DocKey& key);
516     virtual bool maybeKeyExistsInFilter(const DocKey& key);
517     bool isTempFilterAvailable();
518     void addToTempFilter(const DocKey& key);
519     void swapFilter();
520     void clearFilter();
521     void setFilterStatus(bfilter_status_t to);
522     std::string getFilterStatusString();
523     size_t getFilterSize();
524     size_t getNumOfKeysInFilter();
525
526     uint64_t nextHLCCas() {
527         return hlc.nextHLC();
528     }
529
530     // Applicable only for FULL EVICTION POLICY
531     bool isResidentRatioUnderThreshold(float threshold);
532
533     virtual void addStats(bool details, ADD_STAT add_stat, const void* c) = 0;
534
535     virtual KVShard* getShard() = 0;
536
537     virtual size_t getNumItems() const = 0;
538
539     size_t getNumNonResidentItems() const;
540
541     size_t getNumTempItems(void) {
542         return ht.getNumTempItems();
543     }
544
545     void incrRollbackItemCount(uint64_t val) {
546         rollbackItemCount.fetch_add(val, std::memory_order_relaxed);
547     }
548
549     uint64_t getRollbackItemCount(void) {
550         return rollbackItemCount.load(std::memory_order_relaxed);
551     }
552
553     // Return the persistence checkpoint ID
554     uint64_t getPersistenceCheckpointId() const;
555
556     // Set the persistence checkpoint ID to the given value.
557     void setPersistenceCheckpointId(uint64_t checkpointId);
558
559     // Mark the value associated with the given key as dirty
560     void markDirty(const DocKey& key);
561
562     /**
563      * Obtain the read handle for the collections manifest.
564      * The caller will have read-only access to manifest using the methods
565      * exposed by the ReadHandle
566      */
567     Collections::VB::Manifest::ReadHandle lockCollections() const {
568         return manifest.lock();
569     }
570
571     /**
572      * Update the Collections::VB::Manifest and the VBucket.
573      * Adds SystemEvents for the create and delete of collections into the
574      * checkpoint.
575      *
576      * @param m A Collections::Manifest to apply to the VB::Manifest
577      */
578     void updateFromManifest(const Collections::Manifest& m) {
579         manifest.wlock().update(*this, m);
580     }
581
582     /**
583      * Finalise the deletion of a collection (no items remain in the collection)
584      *
585      * @param collection "string-view" name of the collection
586      * @param revision The Manifest revision which initiated the delete.
587      */
588     void completeDeletion(cb::const_char_buffer collection, uint32_t revision) {
589         manifest.wlock().completeDeletion(*this, collection, revision);
590     }
591
592     /**
593      * Add a collection to this vbucket with a pre-assigned seqno. I.e.
594      * this VB is a replica.
595      *
596      * @param collection collection name to add.
597      * @param revision revision of the collection to add.
598      * @param bySeqno The seqno assigned to the collection create event.
599      */
600     void replicaAddCollection(cb::const_char_buffer collection,
601                               uint32_t revision,
602                               int64_t bySeqno) {
603         manifest.wlock().replicaAdd(*this, collection, revision, bySeqno);
604     }
605
606     /**
607      * Delete a collection from this vbucket with a pre-assigned seqno. I.e.
608      * this VB is a replica.
609      *
610      * @param collection collection name to delete.
611      * @param revision revision of the manifest starting the delete.
612      * @param bySeqno The seqno assigned to the collection delete event.
613      */
614     void replicaBeginDeleteCollection(cb::const_char_buffer collection,
615                                       uint32_t revision,
616                                       int64_t bySeqno) {
617         manifest.wlock().replicaBeginDelete(
618                 *this, collection, revision, bySeqno);
619     }
620
621     /**
622      * Delete a collection from this vbucket with a pre-assigned seqno. I.e.
623      * this VB is a replica.
624      *
625      * @param separator The new separator.
626      * @param revision The revision which changed the separator.
627      * @param bySeqno The seqno assigned to the change separator event.
628      */
629     void replicaChangeCollectionSeparator(cb::const_char_buffer separator,
630                                           uint32_t revision,
631                                           int64_t bySeqno) {
632         manifest.wlock().replicaChangeSeparator(
633                 *this, separator, revision, bySeqno);
634     }
635
636     static const vbucket_state_t ACTIVE;
637     static const vbucket_state_t REPLICA;
638     static const vbucket_state_t PENDING;
639     static const vbucket_state_t DEAD;
640
641     HashTable         ht;
642     CheckpointManager checkpointManager;
643
644     // Struct for managing 'backfill' items - Items which have been added by
645     // an incoming TAP stream and need to be persisted to disk.
646     struct {
647         std::mutex mutex;
648         std::queue<queued_item> items;
649         std::atomic<bool> isBackfillPhase;
650     } backfill;
651
652     /**
653      * Gets the valid StoredValue for the key and deletes an expired item if
654      * desired by the caller. Requires the hash bucket to be locked
655      *
656      * @param hbl Reference to the hash bucket lock
657      * @param key
658      * @param wantsDeleted
659      * @param trackReference
660      * @param queueExpired Delete an expired item
661      */
662     StoredValue* fetchValidValue(HashTable::HashBucketLock& hbl,
663                                  const DocKey& key,
664                                  WantsDeleted wantsDeleted,
665                                  TrackReference trackReference,
666                                  QueueExpired queueExpired);
667
668     /**
669      * Complete the background fetch for the specified item. Depending on the
670      * state of the item, restore it to the hashtable as appropriate,
671      * potentially queuing it as dirty.
672      *
673      * @param key The key of the item
674      * @param fetched_item The item which has been fetched.
675      * @param startTime The time processing of the batch of items started.
676      *
677      * @return ENGINE_ERROR_CODE status notified to be to the front end
678      */
679     virtual ENGINE_ERROR_CODE completeBGFetchForSingleItem(
680             const DocKey& key,
681             const VBucketBGFetchItem& fetched_item,
682             const ProcessClock::time_point startTime) = 0;
683
684     /**
685      * Retrieve an item from the disk for vkey stats
686      *
687      * @param key the key to fetch
688      * @param cookie the connection cookie
689      * @param eviction_policy The eviction policy
690      * @param engine Reference to ep engine
691      * @param bgFetchDelay
692      *
693      * @return VBReturnCtx indicates notifyCtx and operation result
694      */
695     virtual ENGINE_ERROR_CODE statsVKey(const DocKey& key,
696                                         const void* cookie,
697                                         EventuallyPersistentEngine& engine,
698                                         int bgFetchDelay) = 0;
699
700     /**
701      * Complete the vkey stats for an item background fetched from disk.
702      *
703      * @param key The key of the item
704      * @param gcb Bgfetch cbk obj containing the item from disk
705      *
706      */
707     virtual void completeStatsVKey(
708             const DocKey& key, const RememberingCallback<GetValue>& gcb) = 0;
709
710     /**
711      * Set (add new or update) an item into in-memory structure like
712      * hash table and do not generate a seqno. This is called internally from
713      * ep-engine when we want to update our in-memory data (like in HT) with
714      * another source of truth like disk.
715      * Currently called during rollback.
716      *
717      * @param itm Item to be added or updated. Upon success, the itm
718      *            revSeqno are updated
719      *
720      * @return Result indicating the status of the operation
721      */
722     MutationStatus setFromInternal(Item& itm);
723
724     /**
725      * Set (add new or update) an item in the vbucket.
726      *
727      * @param itm Item to be added or updated. Upon success, the itm
728      *            bySeqno, cas and revSeqno are updated
729      * @param cookie the connection cookie
730      * @param engine Reference to ep engine
731      * @param bgFetchDelay
732      *
733      * @return ENGINE_ERROR_CODE status notified to be to the front end
734      */
735     ENGINE_ERROR_CODE set(Item& itm,
736                           const void* cookie,
737                           EventuallyPersistentEngine& engine,
738                           int bgFetchDelay);
739
740     /**
741      * Replace (overwrite existing) an item in the vbucket.
742      *
743      * @param itm Item to be added or updated. Upon success, the itm
744      *            bySeqno, cas and revSeqno are updated
745      * @param cookie the connection cookie
746      * @param engine Reference to ep engine
747      * @param bgFetchDelay
748      *
749      * @return ENGINE_ERROR_CODE status notified to be to the front end
750      */
751     ENGINE_ERROR_CODE replace(Item& itm,
752                               const void* cookie,
753                               EventuallyPersistentEngine& engine,
754                               int bgFetchDelay);
755
756     /**
757      * Add an item directly into its vbucket rather than putting it on a
758      * checkpoint (backfill the item). The can happen during TAP or when a
759      * replica vbucket is receiving backfill items from active vbucket.
760      *
761      * @param itm Item to be added/updated from TAP or DCP backfill. Upon
762      *            success, the itm revSeqno is updated
763      * @param genBySeqno whether or not to generate sequence number
764      *
765      * @return the result of the operation
766      */
767     ENGINE_ERROR_CODE addBackfillItem(Item& itm, GenerateBySeqno genBySeqno);
768
769     /**
770      * Set an item in the store from a non-front end operation (DCP, XDCR)
771      *
772      * @param item the item to set. Upon success, the itm revSeqno is updated
773      * @param cas value to match
774      * @param seqno sequence number of mutation
775      * @param cookie the cookie representing the client to store the item
776      * @param engine Reference to ep engine
777      * @param bgFetchDelay
778      * @param force override vbucket states
779      * @param allowExisting set to false if you want set to fail if the
780      *                      item exists already
781      * @param genBySeqno whether or not to generate sequence number
782      * @param genCas
783      * @param isReplication set to true if we are to use replication
784      *                      throttle threshold
785      *
786      * @return the result of the store operation
787      */
788     ENGINE_ERROR_CODE setWithMeta(Item& itm,
789                                   uint64_t cas,
790                                   uint64_t* seqno,
791                                   const void* cookie,
792                                   EventuallyPersistentEngine& engine,
793                                   int bgFetchDelay,
794                                   bool force,
795                                   bool allowExisting,
796                                   GenerateBySeqno genBySeqno,
797                                   GenerateCas genCas,
798                                   bool isReplication);
799
800     /**
801      * Delete an item in the vbucket
802      *
803      * @param key key to be deleted
804      * @param[in,out] cas value to match; new cas after logical delete
805      * @param cookie the cookie representing the client to store the item
806      * @param engine Reference to ep engine
807      * @param bgFetchDelay
808      * @param[out] itemMeta pointer to item meta data that needs to be returned
809      *                      as a result the delete. A NULL pointer indicates
810      *                      that no meta data needs to be returned.
811      * @param[out] mutInfo Info to uniquely identify (and order) the delete
812      *                     seq. A NULL pointer indicates no info needs to be
813      *                     returned.
814      *
815      * @return the result of the operation
816      */
817     ENGINE_ERROR_CODE deleteItem(const DocKey& key,
818                                  uint64_t& cas,
819                                  const void* cookie,
820                                  EventuallyPersistentEngine& engine,
821                                  int bgFetchDelay,
822                                  ItemMetaData* itemMeta,
823                                  mutation_descr_t* mutInfo);
824
825     /**
826      * Delete an item in the vbucket from a non-front end operation (DCP, XDCR)
827      *
828      * @param key key to be deleted
829      * @param[in, out] cas value to match; new cas after logical delete
830      * @param[out] seqno Pointer to get the seqno generated for the item. A
831      *                   NULL value is passed if not needed
832      * @param cookie the cookie representing the client to store the item
833      * @param engine Reference to ep engine
834      * @param bgFetchDelay
835      * @param force force a delete in full eviction mode without doing a
836      *              bg fetch
837      * @param itemMeta ref to item meta data
838      * @param backfill indicates if the item must be put onto vb queue or
839      *                 onto checkpoint
840      * @param genBySeqno whether or not to generate sequence number
841      * @param generateCas whether or not to generate cas
842      * @param bySeqno seqno of the key being deleted
843      * @param isReplication set to true if we are to use replication
844      *                      throttle threshold
845      *
846      * @return the result of the operation
847      */
848     ENGINE_ERROR_CODE deleteWithMeta(const DocKey& key,
849                                      uint64_t& cas,
850                                      uint64_t* seqno,
851                                      const void* cookie,
852                                      EventuallyPersistentEngine& engine,
853                                      int bgFetchDelay,
854                                      bool force,
855                                      const ItemMetaData& itemMeta,
856                                      bool backfill,
857                                      GenerateBySeqno genBySeqno,
858                                      GenerateCas generateCas,
859                                      uint64_t bySeqno,
860                                      bool isReplication);
861
862     /**
863      * Delete an expired item
864      *
865      * @param key key to be deleted
866      * @param startTime the time to be compared with this item's expiry time
867      * @param revSeqno revision id sequence number
868      * @param source Expiry source
869      */
870     void deleteExpiredItem(const DocKey& key,
871                            time_t startTime,
872                            uint64_t revSeqno,
873                            ExpireBy source);
874
875     /**
876      * Evict a key from memory.
877      *
878      * @param key Key to evict
879      * @param[out] msg Updated to point to a string (with static duration)
880      *                 describing the result of the operation.
881      *
882      * @return SUCCESS if key was successfully evicted (or was already
883      *                 evicted), or the reason why the request failed.
884      *
885      */
886     virtual protocol_binary_response_status evictKey(const DocKey& key,
887                                                      const char** msg) = 0;
888
889     /**
890      * Page out a StoredValue from memory.
891      *
892      * The definition of "page out" is up to the underlying VBucket
893      * implementation - this may mean simply ejecting the value from memory
894      * (Value Eviction), removing the entire document from memory (Full Eviction),
895      * or actually deleting the document (Ephemeral Buckets).
896      *
897      * @param lh Bucket lock associated with the StoredValue.
898      * @param v[in, out] Ref to the StoredValue to be ejected. Based on the
899      *                   VBucket type, policy in the vbucket contents of v and
900      *                   v itself may be changed
901      *
902      * @return true if an item is ejected.
903      */
904     virtual bool pageOut(const HashTable::HashBucketLock& lh,
905                          StoredValue*& v) = 0;
906
907     /**
908      * Add an item in the store
909      *
910      * @param itm the item to add. On success, this will have its seqno and
911      *            CAS updated.
912      * @param cookie the cookie representing the client to store the item
913      * @param engine Reference to ep engine
914      * @param bgFetchDelay
915      *
916      * @return the result of the operation
917      */
918     ENGINE_ERROR_CODE add(Item& itm,
919                           const void* cookie,
920                           EventuallyPersistentEngine& engine,
921                           int bgFetchDelay);
922
923     /**
924      * Retrieve a value, but update its TTL first
925      *
926      * @param key the key to fetch
927      * @param cookie the connection cookie
928      * @param engine Reference to ep engine
929      * @param bgFetchDelay
930      * @param exptime the new expiry time for the object
931      *
932      * @return a GetValue representing the result of the request
933      */
934     GetValue getAndUpdateTtl(const DocKey& key,
935                              const void* cookie,
936                              EventuallyPersistentEngine& engine,
937                              int bgFetchDelay,
938                              time_t exptime);
939     /**
940      * Queue an Item to the checkpoint and return its seqno
941      *
942      * @param item an Item object to queue, can be any kind of item and will be
943      *        given a CAS and seqno by this function.
944      * @param seqno An optional sequence number, if not specified checkpoint
945      *        queueing will assign a seqno to the Item.
946      */
947     int64_t queueItem(Item* item, OptionalSeqno seqno);
948
949     /**
950      * Insert an item into the VBucket during warmup. If we're trying to insert
951      * a partial item we mark it as nonResident
952      *
953      * @param itm Item to insert. itm is not modified. But cannot be passed as
954      *            const because it is passed to functions that can generally
955      *            modify the itm but do not modify it due to the flags passed.
956      * @param eject true if we should eject the value immediately
957      * @param keyMetaDataOnly is this just the key and meta-data or a complete
958      *                        item
959      *
960      * @return the result of the operation
961      */
962     MutationStatus insertFromWarmup(Item& itm,
963                                     bool eject,
964                                     bool keyMetaDataOnly);
965
966     /**
967      * Get metadata and value for a given key
968      *
969      * @param key key for which metadata and value should be retrieved
970      * @param cookie the cookie representing the client
971      * @param engine Reference to ep engine
972      * @param bgFetchDelay
973      * @param options flags indicating some retrieval related info
974      * @param diskFlushAll
975      *
976      * @return the result of the operation
977      */
978     GetValue getInternal(const DocKey& key,
979                          const void* cookie,
980                          EventuallyPersistentEngine& engine,
981                          int bgFetchDelay,
982                          get_options_t options,
983                          bool diskFlushAll);
984
985     /**
986      * Retrieve the meta data for given key
987      *
988      * @param key the key to get the meta data for
989      * @param cookie the connection cookie
990      * @param engine Reference to ep engine
991      * @param bgFetchDelay Delay in secs before we run the bgFetch task
992      * @param fetchDatatype whether to fetch datatype or not
993      * @param[out] metadata meta information returned to the caller
994      * @param[out] deleted specifies the caller whether or not the key is
995      *                     deleted
996      * @param[out] datatype specifies the datatype of the item
997      *
998      * @return the result of the operation
999      */
1000     ENGINE_ERROR_CODE getMetaData(const DocKey& key,
1001                                   const void* cookie,
1002                                   EventuallyPersistentEngine& engine,
1003                                   int bgFetchDelay,
1004                                   bool fetchDatatype,
1005                                   ItemMetaData& metadata,
1006                                   uint32_t& deleted,
1007                                   uint8_t& datatype);
1008
1009     /**
1010      * Looks up the key stats for the given {vbucket, key}.
1011      *
1012      * @param key The key to lookup
1013      * @param cookie The client's cookie
1014      * @param engine Reference to ep engine
1015      * @param bgFetchDelay
1016      * @param[out] kstats On success the keystats for this item.
1017      * @param wantsDeleted If yes then return keystats even if the item is
1018      *                     marked as deleted. If no then will return
1019      *                     ENGINE_KEY_ENOENT for deleted items.
1020      *
1021      * @return the result of the operation
1022      */
1023     ENGINE_ERROR_CODE getKeyStats(const DocKey& key,
1024                                   const void* cookie,
1025                                   EventuallyPersistentEngine& engine,
1026                                   int bgFetchDelay,
1027                                   struct key_stats& kstats,
1028                                   WantsDeleted wantsDeleted);
1029
1030     /**
1031      * Gets a locked item for a given key.
1032      *
1033      * @param key The key to lookup
1034      * @param currentTime Current time to use for locking the item for a
1035      *                    duration of lockTimeout
1036      * @param lockTimeout Timeout for the lock on the item
1037      * @param cookie The client's cookie
1038      * @param engine Reference to ep engine
1039      * @param bgFetchDelay Delay in secs before we run the bgFetch task
1040      *
1041      * @return the result of the operation (contains locked item on success)
1042      */
1043     GetValue getLocked(const DocKey& key,
1044                        rel_time_t currentTime,
1045                        uint32_t lockTimeout,
1046                        const void* cookie,
1047                        EventuallyPersistentEngine& engine,
1048                        int bgFetchDelay);
1049     /**
1050      * Update in memory data structures after an item is deleted on disk
1051      *
1052      * @param queuedItem reference to the deleted item
1053      * @param deleted indicates if item actaully deleted or not (in case item
1054      *                did not exist on disk)
1055      */
1056     void deletedOnDiskCbk(const Item& queuedItem, bool deleted);
1057
1058     /**
1059      * Update in memory data structures after a rollback on disk
1060      *
1061      * @param queuedItem item key
1062      *
1063      * @return indicates if the operation is succcessful
1064      */
1065     bool deleteKey(const DocKey& key);
1066
1067     /**
1068      * Creates a DCP backfill object
1069      *
1070      * @param e ref to EventuallyPersistentEngine
1071      * @param stream ref to the stream for which this backfill obj is created
1072      * @param startSeqno requested start sequence number of the backfill
1073      * @param endSeqno requested end sequence number of the backfill
1074      *
1075      * @return pointer to the backfill object created. Caller to own this
1076      *         object and hence must handle deletion.
1077      */
1078     virtual std::unique_ptr<DCPBackfill> createDCPBackfill(
1079             EventuallyPersistentEngine& e,
1080             const active_stream_t& stream,
1081             uint64_t startSeqno,
1082             uint64_t endSeqno) = 0;
1083
1084     /**
1085      * Update failovers, checkpoint mgr and other vBucket members after
1086      * rollback.
1087      *
1088      * @param rollbackResult contains high seqno of the vBucket after rollback,
1089      *                       snapshot start seqno of the last snapshot in the
1090      *                       vBucket after the rollback,
1091      *                       snapshot end seqno of the last snapshot in the
1092      *                       vBucket after the rollback
1093      * @param prevHighSeqno high seqno before the rollback
1094      */
1095     void postProcessRollback(const RollbackResult& rollbackResult,
1096                              uint64_t prevHighSeqno);
1097
1098     /**
1099      * Debug - print a textual description of the VBucket to stderr.
1100      */
1101     virtual void dump() const;
1102
1103     /**
1104      * Returns the number of deletes in the memory
1105      *
1106      * @return number of deletes
1107      */
1108     size_t getNumInMemoryDeletes() const {
1109         /* couchbase vbuckets: this is generally (after deletes are persisted)
1110                                zero as hash table doesn't keep deletes after
1111                                they are persisted.
1112            ephemeral vbuckets: we keep deletes in both hash table and ordered
1113                                data structure. */
1114         return ht.getNumDeletedItems();
1115     }
1116
1117     static size_t getCheckpointFlushTimeout();
1118
1119     std::queue<queued_item> rejectQueue;
1120     std::unique_ptr<FailoverTable> failovers;
1121
1122     std::atomic<size_t>  opsCreate;
1123     std::atomic<size_t>  opsUpdate;
1124     std::atomic<size_t>  opsDelete;
1125     std::atomic<size_t>  opsReject;
1126
1127     cb::NonNegativeCounter<size_t> dirtyQueueSize;
1128     std::atomic<size_t>  dirtyQueueMem;
1129     std::atomic<size_t>  dirtyQueueFill;
1130     std::atomic<size_t>  dirtyQueueDrain;
1131     std::atomic<uint64_t> dirtyQueueAge;
1132     std::atomic<size_t>  dirtyQueuePendingWrites;
1133     std::atomic<size_t>  metaDataDisk;
1134
1135     std::atomic<size_t>  numExpiredItems;
1136
1137 protected:
1138     /**
1139      * This function checks cas, expiry and other partition (vbucket) related
1140      * rules before setting an item into other in-memory structure like HT,
1141      * and checkpoint mgr. This function assumes that HT bucket lock is grabbed.
1142      *
1143      * @param hbl Hash table bucket lock that must be held
1144      * @param v Reference to the ptr of StoredValue. This can be changed if a
1145      *          new StoredValue is added or just its contents is changed if the
1146      *          exisiting StoredValue is updated
1147      * @param itm Item to be added/updated. On success, its revSeqno is updated
1148      * @param cas value to match
1149      * @param allowExisting set to false if you want set to fail if the
1150      *                      item exists already
1151      * @param hasMetaData
1152      * @param queueItmCtx holds info needed to queue an item in chkpt or vb
1153      *                    backfill queue; NULL if item need not be queued
1154      * @param maybeKeyExists true if bloom filter predicts that key may exist
1155      * @param isReplication true if issued by consumer (for replication)
1156      *
1157      * @return Result indicating the status of the operation and notification
1158      *                info
1159      */
1160     std::pair<MutationStatus, VBNotifyCtx> processSet(
1161             const HashTable::HashBucketLock& hbl,
1162             StoredValue*& v,
1163             Item& itm,
1164             uint64_t cas,
1165             bool allowExisting,
1166             bool hasMetaData,
1167             const VBQueueItemCtx* queueItmCtx = nullptr,
1168             bool maybeKeyExists = true,
1169             bool isReplication = false);
1170
1171     /**
1172      * This function checks cas, expiry and other partition (vbucket) related
1173      * rules before adding an item into other in-memory structure like HT,
1174      * and checkpoint mgr. This function assumes that HT bucket lock is grabbed.
1175      *
1176      * @param hbl Hash table bucket lock that must be held
1177      * @param v[in, out] the stored value to do this operation on
1178      * @param itm Item to be added/updated. On success, its revSeqno is updated
1179      * @param isReplication true if issued by consumer (for replication)
1180      * @param queueItmCtx holds info needed to queue an item in chkpt or vb
1181      *                    backfill queue; NULL if item need not be queued
1182      *
1183      * @return Result indicating the status of the operation and notification
1184      *                info
1185      */
1186     std::pair<AddStatus, VBNotifyCtx> processAdd(
1187             const HashTable::HashBucketLock& hbl,
1188             StoredValue*& v,
1189             Item& itm,
1190             bool maybeKeyExists,
1191             bool isReplication,
1192             const VBQueueItemCtx* queueItmCtx = nullptr);
1193
1194     /**
1195      * This function checks cas, eviction policy and other partition
1196      * (vbucket) related rules before logically (soft) deleting an item in
1197      * in-memory structure like HT, and checkpoint mgr.
1198      * Assumes that HT bucket lock is grabbed.
1199      *
1200      * @param hbl Hash table bucket lock that must be held
1201      * @param v Reference to the StoredValue to be soft deleted
1202      * @param cas the expected CAS of the item (or 0 to override)
1203      * @param metadata ref to item meta data
1204      * @param queueItmCtx holds info needed to queue an item in chkpt or vb
1205      *                    backfill queue
1206      * @param use_meta Indicates if v must be updated with the metadata
1207      * @param bySeqno seqno of the key being deleted
1208      *
1209      * @return pointer to the updated StoredValue. It can be same as that of
1210      *         v or different value if a new StoredValue is created for the
1211      *         update.
1212      *         status of the operation.
1213      *         notification info.
1214      */
1215     std::tuple<MutationStatus, StoredValue*, VBNotifyCtx> processSoftDelete(
1216             const HashTable::HashBucketLock& hbl,
1217             StoredValue& v,
1218             uint64_t cas,
1219             const ItemMetaData& metadata,
1220             const VBQueueItemCtx& queueItmCtx,
1221             bool use_meta,
1222             uint64_t bySeqno);
1223
1224     /**
1225      * Delete a key (associated StoredValue) from ALL in-memory data structures
1226      * like HT.
1227      * Assumes that HT bucket lock is grabbed.
1228      *
1229      * Currently StoredValues form HashTable intrusively. That is, HashTable
1230      * does not store a reference or a copy of the StoredValue. If any other
1231      * in-memory data strucutures are formed intrusively using StoredValues,
1232      * then it must be decided in this function which data structure deletes
1233      * the StoredValue. Currently it is HashTable that deleted the StoredValue
1234      *
1235      * @param hbl Hash table bucket lock that must be held
1236      * @param v Reference to the StoredValue to be deleted
1237      *
1238      * @return true if an object was deleted, false otherwise
1239      */
1240     bool deleteStoredValue(const HashTable::HashBucketLock& hbl,
1241                            StoredValue& v);
1242
1243     /**
1244      * Queue an item for persistence and replication. Maybe track CAS drift
1245      *
1246      * The caller of this function must hold the lock of the hash table
1247      * partition that contains the StoredValue being Queued.
1248      *
1249      * @param v the dirty item. The cas and seqno maybe updated based on the
1250      *          flags passed
1251      * @param queueItmCtx holds info needed to queue an item in chkpt or vb
1252      *                    backfill queue, whether to track cas, generate seqno,
1253      *                    generate new cas
1254      *
1255      * @return Notification context containing info needed to notify the
1256      *         clients (like connections, flusher)
1257      */
1258     VBNotifyCtx queueDirty(StoredValue& v, const VBQueueItemCtx& queueItmCtx);
1259
1260     /**
1261      * Queue an item for persistence and replication
1262      *
1263      * The caller of this function must hold the lock of the hash table
1264      * partition that contains the StoredValue being Queued.
1265      *
1266      * @param v the dirty item. The cas and seqno maybe updated based on the
1267      *          flags passed
1268      * @param generateBySeqno request that the seqno is generated by this call
1269      * @param generateCas request that the CAS is generated by this call
1270      * @param isBackfillItem indicates if the item must be put onto vb queue or
1271      *        onto checkpoint
1272      * @param preLinkDocumentContext context object which allows running the
1273      *        document pre link callback after the cas is assinged (but
1274      *        but document not available for anyone)
1275      *
1276      * @return Notification context containing info needed to notify the
1277      *         clients (like connections, flusher)
1278      */
1279     VBNotifyCtx queueDirty(
1280             StoredValue& v,
1281             GenerateBySeqno generateBySeqno = GenerateBySeqno::Yes,
1282             GenerateCas generateCas = GenerateCas::Yes,
1283             bool isBackfillItem = false,
1284             PreLinkDocumentContext* preLinkDocumentContext = nullptr);
1285
1286     /**
1287      * Adds a temporary StoredValue in in-memory data structures like HT.
1288      * Assumes that HT bucket lock is grabbed.
1289      *
1290      * @param hbl Hash table bucket lock that must be held
1291      * @param key the key for which a temporary item needs to be added
1292      * @param isReplication true if issued by consumer (for replication)
1293      *
1294      * @return Result indicating the status of the operation
1295      */
1296     AddStatus addTempStoredValue(const HashTable::HashBucketLock& hbl,
1297                                  const DocKey& key,
1298                                  bool isReplication = false);
1299
1300     /**
1301      * Internal wrapper function around the callback to be called when a new
1302      * seqno is generated in the vbucket
1303      *
1304      * @param notifyCtx holds info needed for notification
1305      */
1306     void notifyNewSeqno(const VBNotifyCtx& notifyCtx);
1307
1308     /**
1309      * VBucket internal function to store high priority requests on the vbucket.
1310      *
1311      * @param seqnoOrChkId seqno to be seen or checkpoint id to be persisted
1312      * @param cookie cookie of conn to be notified
1313      * @param reqType request type indicating seqno or chk persistence
1314      */
1315     void addHighPriorityVBEntry(uint64_t seqnoOrChkId,
1316                                 const void* cookie,
1317                                 HighPriorityVBNotify reqType);
1318
1319     /**
1320      * Get all high priority notifications as temporary failures because they
1321      * could not be completed.
1322      *
1323      * @param engine Ref to ep-engine
1324      *
1325      * @return map of notifies with conn cookie as the key and notify status as
1326      *         the value
1327      */
1328     std::map<const void*, ENGINE_ERROR_CODE> tmpFailAndGetAllHpNotifies(
1329             EventuallyPersistentEngine& engine);
1330
1331     void _addStats(bool details, ADD_STAT add_stat, const void* c);
1332
1333     template <typename T>
1334     void addStat(const char* nm,
1335                  const T& val,
1336                  ADD_STAT add_stat,
1337                  const void* c);
1338
1339     /* This member holds the eviction policy used */
1340     const item_eviction_policy_t eviction;
1341
1342     /* Reference to global (EP engine wide) stats */
1343     EPStats& stats;
1344
1345     /* last seqno that is persisted on the disk */
1346     std::atomic<uint64_t> persistenceSeqno;
1347
1348     /* holds all high priority async requests to the vbucket */
1349     std::list<HighPriorityVBEntry> hpVBReqs;
1350
1351     /* synchronizes access to hpVBReqs */
1352     std::mutex hpVBReqsMutex;
1353
1354     /* size of list hpVBReqs (to avoid MB-9434) */
1355     Couchbase::RelaxedAtomic<size_t> numHpVBReqs;
1356
1357 private:
1358     void fireAllOps(EventuallyPersistentEngine& engine, ENGINE_ERROR_CODE code);
1359
1360     void decrDirtyQueueMem(size_t decrementBy);
1361
1362     void decrDirtyQueueAge(uint32_t decrementBy);
1363
1364     void decrDirtyQueuePendingWrites(size_t decrementBy);
1365
1366     /**
1367      * Updates an existing StoredValue in in-memory data structures like HT.
1368      * Assumes that HT bucket lock is grabbed.
1369      *
1370      * @param htLock Hash table lock that must be held
1371      * @param v Reference to the StoredValue to be updated.
1372      * @param itm Item to be updated.
1373      * @param queueItmCtx holds info needed to queue an item in chkpt or vb
1374      *                    backfill queue; NULL if item need not be queued
1375      *
1376      * @return pointer to the updated StoredValue. It can be same as that of
1377      *         v or different value if a new StoredValue is created for the
1378      *         update.
1379      *         status of the operation.
1380      *         notification info.
1381      */
1382     virtual std::tuple<StoredValue*, MutationStatus, VBNotifyCtx>
1383     updateStoredValue(const HashTable::HashBucketLock& hbl,
1384                       StoredValue& v,
1385                       const Item& itm,
1386                       const VBQueueItemCtx* queueItmCtx) = 0;
1387
1388     /**
1389      * Adds a new StoredValue in in-memory data structures like HT.
1390      * Assumes that HT bucket lock is grabbed.
1391      *
1392      * @param hbl Hash table bucket lock that must be held
1393      * @param itm Item to be added.
1394      * @param queueItmCtx holds info needed to queue an item in chkpt or vb
1395      *                    backfill queue; NULL if item need not be queued
1396      *
1397      * @return Ptr of the StoredValue added and notification info
1398      */
1399     virtual std::pair<StoredValue*, VBNotifyCtx> addNewStoredValue(
1400             const HashTable::HashBucketLock& hbl,
1401             const Item& itm,
1402             const VBQueueItemCtx* queueItmCtx) = 0;
1403
1404     /**
1405      * Logically (soft) delete item in all in-memory data structures. Also
1406      * updates revSeqno. Depending on the in-memory data structure the item may
1407      * be marked delete and/or reset and/or a new value (marked as deleted)
1408      * added.
1409      * Assumes that HT bucket lock is grabbed.
1410      * Also assumes that v is in the hash table.
1411      *
1412      * @param hbl Hash table bucket lock that must be held
1413      * @param v Reference to the StoredValue to be soft deleted
1414      * @param onlyMarkDeleted indicates if we must reset the StoredValue or
1415      *                        just mark deleted
1416      * @param queueItmCtx holds info needed to queue an item in chkpt or vb
1417      *                    backfill queue
1418      * @param bySeqno seqno of the key being deleted
1419      *
1420      * @return pointer to the updated StoredValue. It can be same as that of
1421      *         v or different value if a new StoredValue is created for the
1422      *         update.
1423      *         notification info.
1424      */
1425     virtual std::tuple<StoredValue*, VBNotifyCtx> softDeleteStoredValue(
1426             const HashTable::HashBucketLock& hbl,
1427             StoredValue& v,
1428             bool onlyMarkDeleted,
1429             const VBQueueItemCtx& queueItmCtx,
1430             uint64_t bySeqno) = 0;
1431
1432     /**
1433      * This function handles expiry relatead stuff before logically (soft)
1434      * deleting an item in in-memory structures like HT, and checkpoint mgr.
1435      * Assumes that HT bucket lock is grabbed.
1436      *
1437      * @param hbl Hash table bucket lock that must be held
1438      * @param v Reference to the StoredValue to be soft deleted
1439      *
1440      * @return status of the operation.
1441      *         pointer to the updated StoredValue. It can be same as that of
1442      *         v or different value if a new StoredValue is created for the
1443      *         update.
1444      *         notification info.
1445      */
1446     std::tuple<MutationStatus, StoredValue*, VBNotifyCtx> processExpiredItem(
1447             const HashTable::HashBucketLock& hbl, StoredValue& v);
1448
1449     /**
1450      * Add a temporary item in hash table and enqueue a background fetch for a
1451      * key.
1452      *
1453      * @param hbl Reference to the hash table bucket lock
1454      * @param key the key to be bg fetched
1455      * @param cookie the cookie of the requestor
1456      * @param engine Reference to ep engine
1457      * @param bgFetchDelay Delay in secs before we run the bgFetch task
1458      * @param metadataOnly whether the fetch is for a non-resident value or
1459      *                     metadata of a (possibly) deleted item
1460      * @param isReplication indicates if the call is for a replica vbucket
1461      *
1462      * @return ENGINE_ERROR_CODE status notified to be to the front end
1463      */
1464     virtual ENGINE_ERROR_CODE addTempItemAndBGFetch(
1465             HashTable::HashBucketLock& hbl,
1466             const DocKey& key,
1467             const void* cookie,
1468             EventuallyPersistentEngine& engine,
1469             int bgFetchDelay,
1470             bool metadataOnly,
1471             bool isReplication = false) = 0;
1472
1473     /**
1474      * Enqueue a background fetch for a key.
1475      *
1476      * @param key the key to be bg fetched
1477      * @param cookie the cookie of the requestor
1478      * @param engine Reference to ep engine
1479      * @param bgFetchDelay Delay in secs before we run the bgFetch task
1480      * @param isMeta whether the fetch is for a non-resident value or metadata
1481      *               of a (possibly) deleted item
1482      */
1483     virtual void bgFetch(const DocKey& key,
1484                          const void* cookie,
1485                          EventuallyPersistentEngine& engine,
1486                          int bgFetchDelay,
1487                          bool isMeta = false) = 0;
1488
1489     /**
1490      * Get metadata and value for a non-resident key
1491      *
1492      * @param key key for which metadata and value should be retrieved
1493      * @param cookie the cookie representing the client
1494      * @param engine Reference to ep engine
1495      * @param bgFetchDelay Delay in secs before we run the bgFetch task
1496      * @param options flags indicating some retrieval related info
1497      * @param v reference to the stored value of the non-resident key
1498      *
1499      * @return the result of the operation
1500      */
1501     virtual GetValue getInternalNonResident(const DocKey& key,
1502                                             const void* cookie,
1503                                             EventuallyPersistentEngine& engine,
1504                                             int bgFetchDelay,
1505                                             get_options_t options,
1506                                             const StoredValue& v) = 0;
1507
1508     /**
1509      * Update the revision seqno of a newly StoredValue item.
1510      * We must ensure that it is greater the maxDeletedRevSeqno
1511      *
1512      * @param v StoredValue added newly. Its revSeqno is updated
1513      */
1514     void updateRevSeqNoOfNewStoredValue(StoredValue& v);
1515
1516     /**
1517      * Increase the expiration count global stats and in the vbucket stats
1518      */
1519     void incExpirationStat(ExpireBy source);
1520
1521     void adjustCheckpointFlushTimeout(size_t wall_time);
1522
1523     id_type                         id;
1524     std::atomic<vbucket_state_t>    state;
1525     cb::RWLock                      stateLock;
1526     vbucket_state_t                 initialState;
1527     std::mutex                           pendingOpLock;
1528     std::vector<const void*>        pendingOps;
1529     hrtime_t                        pendingOpsStart;
1530     uint64_t                        purge_seqno;
1531     std::atomic<bool>               takeover_backed_up;
1532
1533     /* snapshotMutex is used to update/read the pair {start, end} atomically,
1534        but not if reading a single field. */
1535     mutable std::mutex snapshotMutex;
1536     uint64_t persisted_snapshot_start;
1537     uint64_t persisted_snapshot_end;
1538
1539     std::mutex bfMutex;
1540     std::unique_ptr<BloomFilter> bFilter;
1541     std::unique_ptr<BloomFilter> tempFilter;    // Used during compaction.
1542
1543     std::atomic<uint64_t> rollbackItemCount;
1544
1545     HLC hlc;
1546     std::string statPrefix;
1547     // The persistence checkpoint ID for this vbucket.
1548     std::atomic<uint64_t> persistenceCheckpointId;
1549     // Flag to indicate the bucket is being created
1550     std::atomic<bool> bucketCreation;
1551     // Flag to indicate the bucket is being deleted
1552     std::atomic<bool> bucketDeletion;
1553
1554     // Ptr to the item conflict resolution module
1555     std::unique_ptr<ConflictResolution> conflictResolver;
1556
1557     // A callback to be called when a new seqno is generated in the vbucket as
1558     // a result of a front end call
1559     NewSeqnoCallback newSeqnoCb;
1560
1561     /// The VBucket collection state
1562     Collections::VB::Manifest manifest;
1563
1564     static std::atomic<size_t> chkFlushTimeout;
1565
1566     friend class VBucketTest;
1567
1568     DISALLOW_COPY_AND_ASSIGN(VBucket);
1569 };
1570
1571 using VBucketPtr = RCPtr<VBucket>;