84aa25a883120c2d08534e41a3fd0b1612574919
[ep-engine.git] / src / vbucket.h
1 /* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2 /*
3  *     Copyright 2015 Couchbase, Inc
4  *
5  *   Licensed under the Apache License, Version 2.0 (the "License");
6  *   you may not use this file except in compliance with the License.
7  *   You may obtain a copy of the License at
8  *
9  *       http://www.apache.org/licenses/LICENSE-2.0
10  *
11  *   Unless required by applicable law or agreed to in writing, software
12  *   distributed under the License is distributed on an "AS IS" BASIS,
13  *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  *   See the License for the specific language governing permissions and
15  *   limitations under the License.
16  */
17
18 #pragma once
19
20 #include "config.h"
21
22 #include "bloomfilter.h"
23 #include "checkpoint.h"
24 #include "collections/vbucket_manifest.h"
25 #include "dcp/dcp-types.h"
26 #include "hash_table.h"
27 #include "hlc.h"
28 #include "item_pager.h"
29 #include "kvstore.h"
30 #include "monotonic.h"
31
32 #include <platform/non_negative_counter.h>
33 #include <relaxed_atomic.h>
34 #include <atomic>
35 #include <queue>
36
37 class EPStats;
38 class ConflictResolution;
39 class Configuration;
40 class PreLinkDocumentContext;
41 class EventuallyPersistentEngine;
42 class DCPBackfill;
43 class RollbackResult;
44
45 /**
46  * The following will be used to identify
47  * the source of an item's expiration.
48  */
49 enum class ExpireBy { Pager, Compactor, Access };
50
51 /* Structure that holds info needed for notification for an item being updated
52    in the vbucket */
53 struct VBNotifyCtx {
54     VBNotifyCtx() : bySeqno(0), notifyReplication(false), notifyFlusher(false) {
55     }
56     Monotonic<int64_t> bySeqno;
57     bool notifyReplication;
58     bool notifyFlusher;
59 };
60
61 /**
62  * Structure that holds info needed to queue an item in chkpt or vb backfill
63  * queue
64  */
65 struct VBQueueItemCtx {
66     VBQueueItemCtx(GenerateBySeqno genBySeqno,
67                    GenerateCas genCas,
68                    TrackCasDrift trackCasDrift,
69                    bool isBackfillItem,
70                    PreLinkDocumentContext* preLinkDocumentContext_)
71         : genBySeqno(genBySeqno),
72           genCas(genCas),
73           trackCasDrift(trackCasDrift),
74           isBackfillItem(isBackfillItem),
75           preLinkDocumentContext(preLinkDocumentContext_) {
76     }
77     /* Indicates if we should queue an item or not. If this is false other
78        members should not be used */
79     GenerateBySeqno genBySeqno;
80     GenerateCas genCas;
81     TrackCasDrift trackCasDrift;
82     bool isBackfillItem;
83     PreLinkDocumentContext* preLinkDocumentContext;
84 };
85
86 /**
87  * Structure that holds seqno based or checkpoint persistence based high
88  * priority requests to a vbucket
89  */
90 struct HighPriorityVBEntry {
91     HighPriorityVBEntry(const void* c,
92                         uint64_t idNum,
93                         HighPriorityVBNotify reqType)
94         : cookie(c), id(idNum), reqType(reqType), start(gethrtime()) {
95     }
96
97     const void* cookie;
98     uint64_t id;
99     HighPriorityVBNotify reqType;
100
101     /* for stats (histogram) */
102     hrtime_t start;
103 };
104
105 typedef std::unique_ptr<Callback<const uint16_t, const VBNotifyCtx&>>
106         NewSeqnoCallback;
107
108 /**
109  * Function object that returns true if the given vbucket is acceptable.
110  */
111 class VBucketFilter {
112 public:
113
114     /**
115      * Instiatiate a VBucketFilter that always returns true.
116      */
117     explicit VBucketFilter() : acceptable() {}
118
119     /**
120      * Instantiate a VBucketFilter that returns true for any of the
121      * given vbucket IDs.
122      */
123     explicit VBucketFilter(const std::vector<uint16_t> &a) :
124         acceptable(a.begin(), a.end()) {}
125
126     explicit VBucketFilter(const std::set<uint16_t> &s) : acceptable(s) {}
127
128     void assign(const std::set<uint16_t> &a) {
129         acceptable = a;
130     }
131
132     bool operator ()(uint16_t v) const {
133         return acceptable.empty() || acceptable.find(v) != acceptable.end();
134     }
135
136     size_t size() const { return acceptable.size(); }
137
138     bool empty() const { return acceptable.empty(); }
139
140     void reset() {
141         acceptable.clear();
142     }
143
144     /**
145      * Calculate the difference between this and another filter.
146      * If "this" contains elements, [1,2,3,4] and other contains [3,4,5,6]
147      * the returned filter contains: [1,2,5,6]
148      * @param other the other filter to compare with
149      * @return a new filter with the elements present in only one of the two
150      *         filters.
151      */
152     VBucketFilter filter_diff(const VBucketFilter &other) const;
153
154     /**
155      * Calculate the intersection between this and another filter.
156      * If "this" contains elements, [1,2,3,4] and other contains [3,4,5,6]
157      * the returned filter contains: [3,4]
158      * @param other the other filter to compare with
159      * @return a new filter with the elements present in both of the two
160      *         filters.
161      */
162     VBucketFilter filter_intersection(const VBucketFilter &other) const;
163
164     const std::set<uint16_t> &getVBSet() const { return acceptable; }
165
166     bool addVBucket(uint16_t vbucket) {
167         std::pair<std::set<uint16_t>::iterator, bool> rv = acceptable.insert(vbucket);
168         return rv.second;
169     }
170
171     void removeVBucket(uint16_t vbucket) {
172         acceptable.erase(vbucket);
173     }
174
175     /**
176      * Dump the filter in a human readable form ( "{ bucket, bucket, bucket }"
177      * to the specified output stream.
178      */
179     friend std::ostream& operator<< (std::ostream& out,
180                                      const VBucketFilter &filter);
181
182 private:
183
184     std::set<uint16_t> acceptable;
185 };
186
187 class EventuallyPersistentEngine;
188 class FailoverTable;
189 class KVShard;
190 class VBucketMemoryDeletionTask;
191
192 /**
193  * An individual vbucket.
194  */
195 class VBucket : public std::enable_shared_from_this<VBucket> {
196 public:
197
198     // Identifier for a vBucket
199     typedef uint16_t id_type;
200
201     VBucket(id_type i,
202             vbucket_state_t newState,
203             EPStats& st,
204             CheckpointConfig& chkConfig,
205             int64_t lastSeqno,
206             uint64_t lastSnapStart,
207             uint64_t lastSnapEnd,
208             std::unique_ptr<FailoverTable> table,
209             std::shared_ptr<Callback<id_type>> flusherCb,
210             std::unique_ptr<AbstractStoredValueFactory> valFact,
211             NewSeqnoCallback newSeqnoCb,
212             Configuration& config,
213             item_eviction_policy_t evictionPolicy,
214             vbucket_state_t initState = vbucket_state_dead,
215             uint64_t purgeSeqno = 0,
216             uint64_t maxCas = 0,
217             const std::string& collectionsManifest = "");
218
219     virtual ~VBucket();
220
221     int64_t getHighSeqno() const {
222         return checkpointManager.getHighSeqno();
223     }
224
225     size_t getChkMgrMemUsage() {
226         return checkpointManager.getMemoryUsage();
227     }
228
229     size_t getChkMgrMemUsageOfUnrefCheckpoints() {
230         return checkpointManager.getMemoryUsageOfUnrefCheckpoints();
231     }
232
233     uint64_t getPurgeSeqno() const {
234         return purge_seqno;
235     }
236
237     void setPurgeSeqno(uint64_t to) {
238         purge_seqno = to;
239     }
240
241     void setPersistedSnapshot(uint64_t start, uint64_t end) {
242         LockHolder lh(snapshotMutex);
243         persisted_snapshot_start = start;
244         persisted_snapshot_end = end;
245     }
246
247     snapshot_range_t getPersistedSnapshot() const {
248         LockHolder lh(snapshotMutex);
249         return {persisted_snapshot_start, persisted_snapshot_end};
250     }
251
252     uint64_t getMaxCas() const {
253         return hlc.getMaxHLC();
254     }
255
256     void setMaxCas(uint64_t cas) {
257         hlc.setMaxHLC(cas);
258     }
259
260     void setMaxCasAndTrackDrift(uint64_t cas) {
261         hlc.setMaxHLCAndTrackDrift(cas);
262     }
263
264     void forceMaxCas(uint64_t cas) {
265         hlc.forceMaxHLC(cas);
266     }
267
268     HLC::DriftStats getHLCDriftStats() const {
269         return hlc.getDriftStats();
270     }
271
272     HLC::DriftExceptions getHLCDriftExceptionCounters() const {
273         return hlc.getDriftExceptionCounters();
274     }
275
276     void setHLCDriftAheadThreshold(std::chrono::microseconds threshold) {
277         hlc.setDriftAheadThreshold(threshold);
278     }
279
280     void setHLCDriftBehindThreshold(std::chrono::microseconds threshold) {
281         hlc.setDriftBehindThreshold(threshold);
282     }
283
284     bool isTakeoverBackedUp() {
285         return takeover_backed_up.load();
286     }
287
288     void setTakeoverBackedUpState(bool to) {
289         bool inverse = !to;
290         takeover_backed_up.compare_exchange_strong(inverse, to);
291     }
292
293     // States whether the VBucket is in the process of being created
294     bool isBucketCreation() const {
295         return bucketCreation.load();
296     }
297
298     bool setBucketCreation(bool rv) {
299         bool inverse = !rv;
300         return bucketCreation.compare_exchange_strong(inverse, rv);
301     }
302
303     /**
304      * @return true if the vbucket deletion is to be deferred to a background
305      *         task.
306      */
307     bool isDeletionDeferred() const {
308         return deferredDeletion.load();
309     }
310
311     /**
312      * @param value true if the vbucket's deletion should be deferred to a
313      *        background task. This is for VBucket objects created by
314      *        makeVBucket and owned by a VBucketPtr. If the VBucket was manually
315      *        created this will have no effect on deletion.
316      */
317     void setDeferredDeletion(bool value) {
318         deferredDeletion.store(true);
319     }
320
321     /**
322      * @param A cookie to notify when the deferred deletion completes.
323      */
324     void setDeferredDeletionCookie(const void* cookie) {
325         deferredDeletionCookie = cookie;
326     }
327
328     /**
329      * @return the cookie which could of been set when setupDeferredDeletion was
330      *         called.
331      */
332     const void* getDeferredDeletionCookie() const {
333         return deferredDeletionCookie;
334     }
335
336     /**
337      * Setup deferred deletion, this is where deletion of the vbucket is
338      * deferred and completed by an AUXIO/NONIO task. AUXIO for EPVBucket
339      * as it will hit disk for the data file unlink, NONIO is used for
340      * EphemeralVBucket as only memory resources need freeing.
341      * @param cookie A cookie to notify when the deletion task completes.
342      */
343     virtual void setupDeferredDeletion(const void* cookie) = 0;
344
345     // Returns the last persisted sequence number for the VBucket
346     virtual uint64_t getPersistenceSeqno() const = 0;
347
348     /**
349      * Returns the sequence number to expose publically as the highest
350      * persisted seqno. Note this is may differ from getPersistenceSeqno,
351      * depending on the Bucket type.
352      *
353      * Historical note: This is the same as PersistenceSeqno for EP buckets,
354      * and hence before Spock wasn't a separate function; however for Ephemeral
355      * buckets we need to distinguish between what sequence number we report
356      * to external clients for Observe/persistTo, and what sequence number we
357      * report to internal DCP / ns_server for takeover:
358      *  a) Clients need 0 for the Ephemeral "persisted to" seqno (as
359      *     there isn't any Persistence and we can't claim something is on-disk
360      *     when it is not).
361      *  b) ns_server / replication needs a non-zero, "logically-persisted" seqno
362      *     from the replica to know that a vBucket is ready for takeover.
363      * As such, getPublicPersistenceSeqno() is used for (a), and
364      * getPersistenceSeqno() is used for (b).
365      */
366     virtual uint64_t getPublicPersistenceSeqno() const = 0;
367
368     void setPersistenceSeqno(uint64_t seqno) {
369         persistenceSeqno.store(seqno);
370     }
371
372     id_type getId() const { return id; }
373     vbucket_state_t getState(void) const { return state.load(); }
374     void setState(vbucket_state_t to);
375     cb::RWLock& getStateLock() {return stateLock;}
376
377     vbucket_state_t getInitialState(void) { return initialState; }
378     void setInitialState(vbucket_state_t initState) {
379         initialState = initState;
380     }
381
382     vbucket_state getVBucketState() const;
383
384     /**
385      * This method performs operations on the stored value prior
386      * to expiring the item.
387      *
388      * @param v the stored value
389      */
390     void handlePreExpiry(StoredValue& v);
391
392     bool addPendingOp(const void *cookie) {
393         LockHolder lh(pendingOpLock);
394         if (state != vbucket_state_pending) {
395             // State transitioned while we were waiting.
396             return false;
397         }
398         // Start a timer when enqueuing the first client.
399         if (pendingOps.empty()) {
400             pendingOpsStart = gethrtime();
401         }
402         pendingOps.push_back(cookie);
403         ++stats.pendingOps;
404         ++stats.pendingOpsTotal;
405         return true;
406     }
407
408     void doStatsForQueueing(const Item& item, size_t itemBytes);
409     void doStatsForFlushing(const Item& item, size_t itemBytes);
410     void incrMetaDataDisk(const Item& qi);
411     void decrMetaDataDisk(const Item& qi);
412
413     /// Reset all statistics assocated with this vBucket.
414     virtual void resetStats();
415
416     // Get age sum in millisecond
417     uint64_t getQueueAge() {
418         uint64_t currDirtyQueueAge = dirtyQueueAge.load(
419                                                     std::memory_order_relaxed);
420         rel_time_t currentAge = ep_current_time() * dirtyQueueSize;
421         if (currentAge < currDirtyQueueAge) {
422             return 0;
423         }
424         return (currentAge - currDirtyQueueAge) * 1000;
425     }
426
427     void fireAllOps(EventuallyPersistentEngine &engine);
428
429     size_t size(void) {
430         HashTableDepthStatVisitor v;
431         ht.visitDepth(v);
432         return v.size;
433     }
434
435     size_t getBackfillSize() {
436         LockHolder lh(backfill.mutex);
437         return backfill.items.size();
438     }
439
440     /**
441      * Process an item that is got from a backfill (TAP or DCP).
442      * It puts it onto a queue for persistence and/or generates a seqno and
443      * updates stats
444      *
445      * @param qi item to be processed
446      * @param generateBySeqno indicates if a new seqno must generated or the
447      *                        seqno in the item must be used
448      *
449      *
450      */
451     virtual void queueBackfillItem(queued_item& qi,
452                                    const GenerateBySeqno generateBySeqno) = 0;
453
454     void getBackfillItems(std::vector<queued_item> &items) {
455         LockHolder lh(backfill.mutex);
456         size_t num_items = backfill.items.size();
457         while (!backfill.items.empty()) {
458             items.push_back(backfill.items.front());
459             backfill.items.pop();
460         }
461         stats.vbBackfillQueueSize.fetch_sub(num_items);
462         stats.memOverhead->fetch_sub(num_items * sizeof(queued_item));
463     }
464
465     bool isBackfillPhase() {
466         return backfill.isBackfillPhase.load();
467     }
468
469     void setBackfillPhase(bool backfillPhase) {
470         backfill.isBackfillPhase.store(backfillPhase);
471     }
472
473     /**
474      * Returns the map of bgfetch items for this vbucket, clearing the
475      * pendingBGFetches.
476      */
477     virtual vb_bgfetch_queue_t getBGFetchItems() = 0;
478
479     virtual bool hasPendingBGFetchItems() = 0;
480
481     static const char* toString(vbucket_state_t s) {
482         switch(s) {
483         case vbucket_state_active: return "active"; break;
484         case vbucket_state_replica: return "replica"; break;
485         case vbucket_state_pending: return "pending"; break;
486         case vbucket_state_dead: return "dead"; break;
487         }
488         return "unknown";
489     }
490
491     static vbucket_state_t fromString(const char* state) {
492         if (strcmp(state, "active") == 0) {
493             return vbucket_state_active;
494         } else if (strcmp(state, "replica") == 0) {
495             return vbucket_state_replica;
496         } else if (strcmp(state, "pending") == 0) {
497             return vbucket_state_pending;
498         } else {
499             return vbucket_state_dead;
500         }
501     }
502
503     /**
504      * Checks and decides whether to add high priority request on the vbucket.
505      * This is an async request made by modules like ns-server during
506      * rebalance. The request is for a response from the vbucket when it
507      * 'sees' beyond a certain sequence number or when a certain checkpoint
508      * is persisted.
509      * Depending on the vbucket type, the meaning 'seeing' a sequence number
510      * changes. That is, it could mean persisted in case of EPVBucket and
511      * added to the sequenced data structure in case of EphemeralVBucket.
512      *
513      * @param seqnoOrChkId seqno to be seen or checkpoint id to be persisted
514      * @param cookie cookie of conn to be notified
515      * @param reqType indicating request for seqno or chk persistence
516      *
517      * @return RequestScheduled if a high priority request is added and
518      *                          notification will be done asynchronously
519      *         NotSupported if the request is not supported for the reqType
520      *         RequestNotScheduled if a high priority request is NOT added (as
521      *                             it is not required). This implies there won't
522      *                             be a subsequent notification
523      */
524     virtual HighPriorityVBReqStatus checkAddHighPriorityVBEntry(
525             uint64_t seqnoOrChkId,
526             const void* cookie,
527             HighPriorityVBNotify reqType) = 0;
528
529     /**
530      * Notify the high priority requests on the vbucket.
531      * This is the response to async requests made by modules like ns-server
532      * during rebalance.
533      *
534      * @param engine Ref to ep-engine
535      * @param id seqno or checkpoint id causing the notification(s).
536      * @param notifyType indicating notify for seqno or chk persistence
537      */
538     virtual void notifyHighPriorityRequests(
539             EventuallyPersistentEngine& engine,
540             uint64_t id,
541             HighPriorityVBNotify notifyType) = 0;
542
543     virtual void notifyAllPendingConnsFailed(EventuallyPersistentEngine& e) = 0;
544
545     /**
546      * Get high priority notifications for a seqno or checkpoint persisted
547      *
548      * @param engine Ref to ep-engine
549      * @param id seqno or checkpoint id for which notifies are to be found
550      * @param notifyType indicating notify for seqno or chk persistence
551      *
552      * @return map of notifications with conn cookie as the key and notify
553      *         status as the value
554      */
555     std::map<const void*, ENGINE_ERROR_CODE> getHighPriorityNotifications(
556             EventuallyPersistentEngine& engine,
557             uint64_t idNum,
558             HighPriorityVBNotify notifyType);
559
560     size_t getHighPriorityChkSize() {
561         return numHpVBReqs.load();
562     }
563
564     /**
565      * BloomFilter operations for vbucket
566      */
567     void createFilter(size_t key_count, double probability);
568     void initTempFilter(size_t key_count, double probability);
569     void addToFilter(const DocKey& key);
570     virtual bool maybeKeyExistsInFilter(const DocKey& key);
571     bool isTempFilterAvailable();
572     void addToTempFilter(const DocKey& key);
573     void swapFilter();
574     void clearFilter();
575     void setFilterStatus(bfilter_status_t to);
576     std::string getFilterStatusString();
577     size_t getFilterSize();
578     size_t getNumOfKeysInFilter();
579
580     uint64_t nextHLCCas() {
581         return hlc.nextHLC();
582     }
583
584     // Applicable only for FULL EVICTION POLICY
585     bool isResidentRatioUnderThreshold(float threshold);
586
587     virtual void addStats(bool details, ADD_STAT add_stat, const void* c) = 0;
588
589     virtual KVShard* getShard() = 0;
590
591     /**
592      * Returns the number of alive (non-deleted) Items the VBucket.
593      *
594      * Includes items which are not currently resident in memory (i.e. under
595      * Full eviction and have been fully evicted from memory).
596      * Does *not* include deleted items.
597      */
598     virtual size_t getNumItems() const = 0;
599
600     size_t getNumNonResidentItems() const;
601
602     size_t getNumTempItems(void) {
603         return ht.getNumTempItems();
604     }
605
606     void incrRollbackItemCount(uint64_t val) {
607         rollbackItemCount.fetch_add(val, std::memory_order_relaxed);
608     }
609
610     uint64_t getRollbackItemCount(void) {
611         return rollbackItemCount.load(std::memory_order_relaxed);
612     }
613
614     // Return the persistence checkpoint ID
615     uint64_t getPersistenceCheckpointId() const;
616
617     // Set the persistence checkpoint ID to the given value.
618     void setPersistenceCheckpointId(uint64_t checkpointId);
619
620     // Mark the value associated with the given key as dirty
621     void markDirty(const DocKey& key);
622
623     /**
624      * Obtain the read handle for the collections manifest.
625      * The caller will have read-only access to manifest using the methods
626      * exposed by the ReadHandle
627      */
628     Collections::VB::Manifest::ReadHandle lockCollections() const {
629         return manifest.lock();
630     }
631
632     /**
633      * Update the Collections::VB::Manifest and the VBucket.
634      * Adds SystemEvents for the create and delete of collections into the
635      * checkpoint.
636      *
637      * @param m A Collections::Manifest to apply to the VB::Manifest
638      */
639     void updateFromManifest(const Collections::Manifest& m) {
640         manifest.wlock().update(*this, m);
641     }
642
643     /**
644      * Finalise the deletion of a collection (no items remain in the collection)
645      *
646      * @param collection "string-view" name of the collection
647      * @param revision The Manifest revision which initiated the delete.
648      */
649     void completeDeletion(cb::const_char_buffer collection, uint32_t revision) {
650         manifest.wlock().completeDeletion(*this, collection, revision);
651     }
652
653     /**
654      * Add a collection to this vbucket with a pre-assigned seqno. I.e.
655      * this VB is a replica.
656      *
657      * @param collection collection name to add.
658      * @param revision revision of the collection to add.
659      * @param bySeqno The seqno assigned to the collection create event.
660      */
661     void replicaAddCollection(cb::const_char_buffer collection,
662                               uint32_t revision,
663                               int64_t bySeqno) {
664         manifest.wlock().replicaAdd(*this, collection, revision, bySeqno);
665     }
666
667     /**
668      * Delete a collection from this vbucket with a pre-assigned seqno. I.e.
669      * this VB is a replica.
670      *
671      * @param collection collection name to delete.
672      * @param revision revision of the manifest starting the delete.
673      * @param bySeqno The seqno assigned to the collection delete event.
674      */
675     void replicaBeginDeleteCollection(cb::const_char_buffer collection,
676                                       uint32_t revision,
677                                       int64_t bySeqno) {
678         manifest.wlock().replicaBeginDelete(
679                 *this, collection, revision, bySeqno);
680     }
681
682     /**
683      * Delete a collection from this vbucket with a pre-assigned seqno. I.e.
684      * this VB is a replica.
685      *
686      * @param separator The new separator.
687      * @param revision The revision which changed the separator.
688      * @param bySeqno The seqno assigned to the change separator event.
689      */
690     void replicaChangeCollectionSeparator(cb::const_char_buffer separator,
691                                           uint32_t revision,
692                                           int64_t bySeqno) {
693         manifest.wlock().replicaChangeSeparator(
694                 *this, separator, revision, bySeqno);
695     }
696
697     /**
698      * Get the collection manifest
699      *
700      * @return const reference to the manifest
701      */
702     const Collections::VB::Manifest& getManifest() const {
703         return manifest;
704     }
705
706     static const vbucket_state_t ACTIVE;
707     static const vbucket_state_t REPLICA;
708     static const vbucket_state_t PENDING;
709     static const vbucket_state_t DEAD;
710
711     HashTable         ht;
712     CheckpointManager checkpointManager;
713
714     // Struct for managing 'backfill' items - Items which have been added by
715     // an incoming TAP stream and need to be persisted to disk.
716     struct {
717         std::mutex mutex;
718         std::queue<queued_item> items;
719         std::atomic<bool> isBackfillPhase;
720     } backfill;
721
722     /**
723      * Gets the valid StoredValue for the key and deletes an expired item if
724      * desired by the caller. Requires the hash bucket to be locked
725      *
726      * @param hbl Reference to the hash bucket lock
727      * @param key
728      * @param wantsDeleted
729      * @param trackReference
730      * @param queueExpired Delete an expired item
731      */
732     StoredValue* fetchValidValue(HashTable::HashBucketLock& hbl,
733                                  const DocKey& key,
734                                  WantsDeleted wantsDeleted,
735                                  TrackReference trackReference,
736                                  QueueExpired queueExpired);
737
738     /**
739      * Complete the background fetch for the specified item. Depending on the
740      * state of the item, restore it to the hashtable as appropriate,
741      * potentially queuing it as dirty.
742      *
743      * @param key The key of the item
744      * @param fetched_item The item which has been fetched.
745      * @param startTime The time processing of the batch of items started.
746      *
747      * @return ENGINE_ERROR_CODE status notified to be to the front end
748      */
749     virtual ENGINE_ERROR_CODE completeBGFetchForSingleItem(
750             const DocKey& key,
751             const VBucketBGFetchItem& fetched_item,
752             const ProcessClock::time_point startTime) = 0;
753
754     /**
755      * Retrieve an item from the disk for vkey stats
756      *
757      * @param key the key to fetch
758      * @param cookie the connection cookie
759      * @param eviction_policy The eviction policy
760      * @param engine Reference to ep engine
761      * @param bgFetchDelay
762      *
763      * @return VBReturnCtx indicates notifyCtx and operation result
764      */
765     virtual ENGINE_ERROR_CODE statsVKey(const DocKey& key,
766                                         const void* cookie,
767                                         EventuallyPersistentEngine& engine,
768                                         int bgFetchDelay) = 0;
769
770     /**
771      * Complete the vkey stats for an item background fetched from disk.
772      *
773      * @param key The key of the item
774      * @param gcb Bgfetch cbk obj containing the item from disk
775      *
776      */
777     virtual void completeStatsVKey(
778             const DocKey& key, const RememberingCallback<GetValue>& gcb) = 0;
779
780     /**
781      * Set (add new or update) an item into in-memory structure like
782      * hash table and do not generate a seqno. This is called internally from
783      * ep-engine when we want to update our in-memory data (like in HT) with
784      * another source of truth like disk.
785      * Currently called during rollback.
786      *
787      * @param itm Item to be added or updated. Upon success, the itm
788      *            revSeqno are updated
789      *
790      * @return Result indicating the status of the operation
791      */
792     MutationStatus setFromInternal(Item& itm);
793
794     /**
795      * Set (add new or update) an item in the vbucket.
796      *
797      * @param itm Item to be added or updated. Upon success, the itm
798      *            bySeqno, cas and revSeqno are updated
799      * @param cookie the connection cookie
800      * @param engine Reference to ep engine
801      * @param bgFetchDelay
802      *
803      * @return ENGINE_ERROR_CODE status notified to be to the front end
804      */
805     ENGINE_ERROR_CODE set(Item& itm,
806                           const void* cookie,
807                           EventuallyPersistentEngine& engine,
808                           int bgFetchDelay);
809
810     /**
811      * Replace (overwrite existing) an item in the vbucket.
812      *
813      * @param itm Item to be added or updated. Upon success, the itm
814      *            bySeqno, cas and revSeqno are updated
815      * @param cookie the connection cookie
816      * @param engine Reference to ep engine
817      * @param bgFetchDelay
818      *
819      * @return ENGINE_ERROR_CODE status notified to be to the front end
820      */
821     ENGINE_ERROR_CODE replace(Item& itm,
822                               const void* cookie,
823                               EventuallyPersistentEngine& engine,
824                               int bgFetchDelay);
825
826     /**
827      * Add an item directly into its vbucket rather than putting it on a
828      * checkpoint (backfill the item). The can happen during TAP or when a
829      * replica vbucket is receiving backfill items from active vbucket.
830      *
831      * @param itm Item to be added/updated from TAP or DCP backfill. Upon
832      *            success, the itm revSeqno is updated
833      * @param genBySeqno whether or not to generate sequence number
834      *
835      * @return the result of the operation
836      */
837     ENGINE_ERROR_CODE addBackfillItem(Item& itm, GenerateBySeqno genBySeqno);
838
839     /**
840      * Set an item in the store from a non-front end operation (DCP, XDCR)
841      *
842      * @param item the item to set. Upon success, the itm revSeqno is updated
843      * @param cas value to match
844      * @param seqno sequence number of mutation
845      * @param cookie the cookie representing the client to store the item
846      * @param engine Reference to ep engine
847      * @param bgFetchDelay
848      * @param force override vbucket states
849      * @param allowExisting set to false if you want set to fail if the
850      *                      item exists already
851      * @param genBySeqno whether or not to generate sequence number
852      * @param genCas
853      * @param isReplication set to true if we are to use replication
854      *                      throttle threshold
855      *
856      * @return the result of the store operation
857      */
858     ENGINE_ERROR_CODE setWithMeta(Item& itm,
859                                   uint64_t cas,
860                                   uint64_t* seqno,
861                                   const void* cookie,
862                                   EventuallyPersistentEngine& engine,
863                                   int bgFetchDelay,
864                                   bool force,
865                                   bool allowExisting,
866                                   GenerateBySeqno genBySeqno,
867                                   GenerateCas genCas,
868                                   bool isReplication);
869
870     /**
871      * Delete an item in the vbucket
872      *
873      * @param key key to be deleted
874      * @param[in,out] cas value to match; new cas after logical delete
875      * @param cookie the cookie representing the client to store the item
876      * @param engine Reference to ep engine
877      * @param bgFetchDelay
878      * @param[out] itemMeta pointer to item meta data that needs to be returned
879      *                      as a result the delete. A NULL pointer indicates
880      *                      that no meta data needs to be returned.
881      * @param[out] mutInfo Info to uniquely identify (and order) the delete
882      *                     seq. A NULL pointer indicates no info needs to be
883      *                     returned.
884      *
885      * @return the result of the operation
886      */
887     ENGINE_ERROR_CODE deleteItem(const DocKey& key,
888                                  uint64_t& cas,
889                                  const void* cookie,
890                                  EventuallyPersistentEngine& engine,
891                                  int bgFetchDelay,
892                                  ItemMetaData* itemMeta,
893                                  mutation_descr_t* mutInfo);
894
895     /**
896      * Delete an item in the vbucket from a non-front end operation (DCP, XDCR)
897      *
898      * @param key key to be deleted
899      * @param[in, out] cas value to match; new cas after logical delete
900      * @param[out] seqno Pointer to get the seqno generated for the item. A
901      *                   NULL value is passed if not needed
902      * @param cookie the cookie representing the client to store the item
903      * @param engine Reference to ep engine
904      * @param bgFetchDelay
905      * @param force force a delete in full eviction mode without doing a
906      *              bg fetch
907      * @param itemMeta ref to item meta data
908      * @param backfill indicates if the item must be put onto vb queue or
909      *                 onto checkpoint
910      * @param genBySeqno whether or not to generate sequence number
911      * @param generateCas whether or not to generate cas
912      * @param bySeqno seqno of the key being deleted
913      * @param isReplication set to true if we are to use replication
914      *                      throttle threshold
915      *
916      * @return the result of the operation
917      */
918     ENGINE_ERROR_CODE deleteWithMeta(const DocKey& key,
919                                      uint64_t& cas,
920                                      uint64_t* seqno,
921                                      const void* cookie,
922                                      EventuallyPersistentEngine& engine,
923                                      int bgFetchDelay,
924                                      bool force,
925                                      const ItemMetaData& itemMeta,
926                                      bool backfill,
927                                      GenerateBySeqno genBySeqno,
928                                      GenerateCas generateCas,
929                                      uint64_t bySeqno,
930                                      bool isReplication);
931
932     /**
933      * Delete an expired item
934      *
935      * @param it item to be deleted
936      * @param startTime the time to be compared with this item's expiry time
937      * @param source Expiry source
938      */
939     void deleteExpiredItem(const Item& it,
940                            time_t startTime,
941                            ExpireBy source);
942
943     /**
944      * Evict a key from memory.
945      *
946      * @param key Key to evict
947      * @param[out] msg Updated to point to a string (with static duration)
948      *                 describing the result of the operation.
949      *
950      * @return SUCCESS if key was successfully evicted (or was already
951      *                 evicted), or the reason why the request failed.
952      *
953      */
954     virtual protocol_binary_response_status evictKey(const DocKey& key,
955                                                      const char** msg) = 0;
956
957     /**
958      * Page out a StoredValue from memory.
959      *
960      * The definition of "page out" is up to the underlying VBucket
961      * implementation - this may mean simply ejecting the value from memory
962      * (Value Eviction), removing the entire document from memory (Full Eviction),
963      * or actually deleting the document (Ephemeral Buckets).
964      *
965      * @param lh Bucket lock associated with the StoredValue.
966      * @param v[in, out] Ref to the StoredValue to be ejected. Based on the
967      *                   VBucket type, policy in the vbucket contents of v and
968      *                   v itself may be changed
969      *
970      * @return true if an item is ejected.
971      */
972     virtual bool pageOut(const HashTable::HashBucketLock& lh,
973                          StoredValue*& v) = 0;
974
975     /**
976      * Add an item in the store
977      *
978      * @param itm the item to add. On success, this will have its seqno and
979      *            CAS updated.
980      * @param cookie the cookie representing the client to store the item
981      * @param engine Reference to ep engine
982      * @param bgFetchDelay
983      *
984      * @return the result of the operation
985      */
986     ENGINE_ERROR_CODE add(Item& itm,
987                           const void* cookie,
988                           EventuallyPersistentEngine& engine,
989                           int bgFetchDelay);
990
991     /**
992      * Retrieve a value, but update its TTL first
993      *
994      * @param key the key to fetch
995      * @param cookie the connection cookie
996      * @param engine Reference to ep engine
997      * @param bgFetchDelay
998      * @param exptime the new expiry time for the object
999      *
1000      * @return a GetValue representing the result of the request
1001      */
1002     GetValue getAndUpdateTtl(const DocKey& key,
1003                              const void* cookie,
1004                              EventuallyPersistentEngine& engine,
1005                              int bgFetchDelay,
1006                              time_t exptime);
1007     /**
1008      * Queue an Item to the checkpoint and return its seqno
1009      *
1010      * @param item an Item object to queue, can be any kind of item and will be
1011      *        given a CAS and seqno by this function.
1012      * @param seqno An optional sequence number, if not specified checkpoint
1013      *        queueing will assign a seqno to the Item.
1014      */
1015     int64_t queueItem(Item* item, OptionalSeqno seqno);
1016
1017     /**
1018      * Insert an item into the VBucket during warmup. If we're trying to insert
1019      * a partial item we mark it as nonResident
1020      *
1021      * @param itm Item to insert. itm is not modified. But cannot be passed as
1022      *            const because it is passed to functions that can generally
1023      *            modify the itm but do not modify it due to the flags passed.
1024      * @param eject true if we should eject the value immediately
1025      * @param keyMetaDataOnly is this just the key and meta-data or a complete
1026      *                        item
1027      *
1028      * @return the result of the operation
1029      */
1030     MutationStatus insertFromWarmup(Item& itm,
1031                                     bool eject,
1032                                     bool keyMetaDataOnly);
1033
1034     /**
1035      * Get metadata and value for a given key
1036      *
1037      * @param key key for which metadata and value should be retrieved
1038      * @param cookie the cookie representing the client
1039      * @param engine Reference to ep engine
1040      * @param bgFetchDelay
1041      * @param options flags indicating some retrieval related info
1042      * @param diskFlushAll
1043      *
1044      * @return the result of the operation
1045      */
1046     GetValue getInternal(const DocKey& key,
1047                          const void* cookie,
1048                          EventuallyPersistentEngine& engine,
1049                          int bgFetchDelay,
1050                          get_options_t options,
1051                          bool diskFlushAll);
1052
1053     /**
1054      * Retrieve the meta data for given key
1055      *
1056      * @param key the key to get the meta data for
1057      * @param cookie the connection cookie
1058      * @param engine Reference to ep engine
1059      * @param bgFetchDelay Delay in secs before we run the bgFetch task
1060      * @param[out] metadata meta information returned to the caller
1061      * @param[out] deleted specifies the caller whether or not the key is
1062      *                     deleted
1063      * @param[out] datatype specifies the datatype of the item
1064      *
1065      * @return the result of the operation
1066      */
1067     ENGINE_ERROR_CODE getMetaData(const DocKey& key,
1068                                   const void* cookie,
1069                                   EventuallyPersistentEngine& engine,
1070                                   int bgFetchDelay,
1071                                   ItemMetaData& metadata,
1072                                   uint32_t& deleted,
1073                                   uint8_t& datatype);
1074
1075     /**
1076      * Looks up the key stats for the given {vbucket, key}.
1077      *
1078      * @param key The key to lookup
1079      * @param cookie The client's cookie
1080      * @param engine Reference to ep engine
1081      * @param bgFetchDelay
1082      * @param[out] kstats On success the keystats for this item.
1083      * @param wantsDeleted If yes then return keystats even if the item is
1084      *                     marked as deleted. If no then will return
1085      *                     ENGINE_KEY_ENOENT for deleted items.
1086      *
1087      * @return the result of the operation
1088      */
1089     ENGINE_ERROR_CODE getKeyStats(const DocKey& key,
1090                                   const void* cookie,
1091                                   EventuallyPersistentEngine& engine,
1092                                   int bgFetchDelay,
1093                                   struct key_stats& kstats,
1094                                   WantsDeleted wantsDeleted);
1095
1096     /**
1097      * Gets a locked item for a given key.
1098      *
1099      * @param key The key to lookup
1100      * @param currentTime Current time to use for locking the item for a
1101      *                    duration of lockTimeout
1102      * @param lockTimeout Timeout for the lock on the item
1103      * @param cookie The client's cookie
1104      * @param engine Reference to ep engine
1105      * @param bgFetchDelay Delay in secs before we run the bgFetch task
1106      *
1107      * @return the result of the operation (contains locked item on success)
1108      */
1109     GetValue getLocked(const DocKey& key,
1110                        rel_time_t currentTime,
1111                        uint32_t lockTimeout,
1112                        const void* cookie,
1113                        EventuallyPersistentEngine& engine,
1114                        int bgFetchDelay);
1115     /**
1116      * Update in memory data structures after an item is deleted on disk
1117      *
1118      * @param queuedItem reference to the deleted item
1119      * @param deleted indicates if item actaully deleted or not (in case item
1120      *                did not exist on disk)
1121      */
1122     void deletedOnDiskCbk(const Item& queuedItem, bool deleted);
1123
1124     /**
1125      * Update in memory data structures after a rollback on disk
1126      *
1127      * @param queuedItem item key
1128      *
1129      * @return indicates if the operation is succcessful
1130      */
1131     bool deleteKey(const DocKey& key);
1132
1133     /**
1134      * Creates a DCP backfill object
1135      *
1136      * @param e ref to EventuallyPersistentEngine
1137      * @param stream ref to the stream for which this backfill obj is created
1138      * @param startSeqno requested start sequence number of the backfill
1139      * @param endSeqno requested end sequence number of the backfill
1140      *
1141      * @return pointer to the backfill object created. Caller to own this
1142      *         object and hence must handle deletion.
1143      */
1144     virtual std::unique_ptr<DCPBackfill> createDCPBackfill(
1145             EventuallyPersistentEngine& e,
1146             const active_stream_t& stream,
1147             uint64_t startSeqno,
1148             uint64_t endSeqno) = 0;
1149
1150     /**
1151      * Update failovers, checkpoint mgr and other vBucket members after
1152      * rollback.
1153      *
1154      * @param rollbackResult contains high seqno of the vBucket after rollback,
1155      *                       snapshot start seqno of the last snapshot in the
1156      *                       vBucket after the rollback,
1157      *                       snapshot end seqno of the last snapshot in the
1158      *                       vBucket after the rollback
1159      * @param prevHighSeqno high seqno before the rollback
1160      */
1161     void postProcessRollback(const RollbackResult& rollbackResult,
1162                              uint64_t prevHighSeqno);
1163
1164     /**
1165      * Debug - print a textual description of the VBucket to stderr.
1166      */
1167     virtual void dump() const;
1168
1169     /**
1170      * Returns the number of deletes in the memory
1171      *
1172      * @return number of deletes
1173      */
1174     size_t getNumInMemoryDeletes() const {
1175         /* couchbase vbuckets: this is generally (after deletes are persisted)
1176                                zero as hash table doesn't keep deletes after
1177                                they are persisted.
1178            ephemeral vbuckets: we keep deletes in both hash table and ordered
1179                                data structure. */
1180         return ht.getNumDeletedItems();
1181     }
1182
1183     static size_t getCheckpointFlushTimeout();
1184
1185     std::queue<queued_item> rejectQueue;
1186     std::unique_ptr<FailoverTable> failovers;
1187
1188     std::atomic<size_t>  opsCreate;
1189     std::atomic<size_t>  opsUpdate;
1190     std::atomic<size_t>  opsDelete;
1191     std::atomic<size_t>  opsReject;
1192
1193     cb::NonNegativeCounter<size_t> dirtyQueueSize;
1194     std::atomic<size_t>  dirtyQueueMem;
1195     std::atomic<size_t>  dirtyQueueFill;
1196     std::atomic<size_t>  dirtyQueueDrain;
1197     std::atomic<uint64_t> dirtyQueueAge;
1198     std::atomic<size_t>  dirtyQueuePendingWrites;
1199     std::atomic<size_t>  metaDataDisk;
1200
1201     std::atomic<size_t>  numExpiredItems;
1202
1203     /**
1204      * A custom delete function for deleting VBucket objects. Any thread could
1205      * be the last thread to release a VBucketPtr and deleting a VB will
1206      * eventually hit the I/O sub-system when we unlink the file, to be sure no
1207      * front-end thread does this work, we schedule the deletion to a background
1208      * task. This task scheduling is triggered by the shared_ptr/VBucketPtr
1209      * using this object as the deleter.
1210      */
1211     struct DeferredDeleter {
1212         DeferredDeleter(EventuallyPersistentEngine& engine) : engine(engine) {
1213         }
1214
1215         /**
1216          * Called when the VBucketPtr has no more owners and runs delete on
1217          * the object.
1218          */
1219         void operator()(VBucket* vb) const;
1220
1221         EventuallyPersistentEngine& engine;
1222     };
1223
1224 protected:
1225     /**
1226      * This function checks cas, expiry and other partition (vbucket) related
1227      * rules before setting an item into other in-memory structure like HT,
1228      * and checkpoint mgr. This function assumes that HT bucket lock is grabbed.
1229      *
1230      * @param hbl Hash table bucket lock that must be held
1231      * @param v Reference to the ptr of StoredValue. This can be changed if a
1232      *          new StoredValue is added or just its contents is changed if the
1233      *          exisiting StoredValue is updated
1234      * @param itm Item to be added/updated. On success, its revSeqno is updated
1235      * @param cas value to match
1236      * @param allowExisting set to false if you want set to fail if the
1237      *                      item exists already
1238      * @param hasMetaData
1239      * @param queueItmCtx holds info needed to queue an item in chkpt or vb
1240      *                    backfill queue; NULL if item need not be queued
1241      * @param maybeKeyExists true if bloom filter predicts that key may exist
1242      * @param isReplication true if issued by consumer (for replication)
1243      *
1244      * @return Result indicating the status of the operation and notification
1245      *                info
1246      */
1247     std::pair<MutationStatus, VBNotifyCtx> processSet(
1248             const HashTable::HashBucketLock& hbl,
1249             StoredValue*& v,
1250             Item& itm,
1251             uint64_t cas,
1252             bool allowExisting,
1253             bool hasMetaData,
1254             const VBQueueItemCtx* queueItmCtx = nullptr,
1255             bool maybeKeyExists = true,
1256             bool isReplication = false);
1257
1258     /**
1259      * This function checks cas, expiry and other partition (vbucket) related
1260      * rules before adding an item into other in-memory structure like HT,
1261      * and checkpoint mgr. This function assumes that HT bucket lock is grabbed.
1262      *
1263      * @param hbl Hash table bucket lock that must be held
1264      * @param v[in, out] the stored value to do this operation on
1265      * @param itm Item to be added/updated. On success, its revSeqno is updated
1266      * @param isReplication true if issued by consumer (for replication)
1267      * @param queueItmCtx holds info needed to queue an item in chkpt or vb
1268      *                    backfill queue; NULL if item need not be queued
1269      *
1270      * @return Result indicating the status of the operation and notification
1271      *                info
1272      */
1273     std::pair<AddStatus, VBNotifyCtx> processAdd(
1274             const HashTable::HashBucketLock& hbl,
1275             StoredValue*& v,
1276             Item& itm,
1277             bool maybeKeyExists,
1278             bool isReplication,
1279             const VBQueueItemCtx* queueItmCtx = nullptr);
1280
1281     /**
1282      * This function checks cas, eviction policy and other partition
1283      * (vbucket) related rules before logically (soft) deleting an item in
1284      * in-memory structure like HT, and checkpoint mgr.
1285      * Assumes that HT bucket lock is grabbed.
1286      *
1287      * @param hbl Hash table bucket lock that must be held
1288      * @param v Reference to the StoredValue to be soft deleted
1289      * @param cas the expected CAS of the item (or 0 to override)
1290      * @param metadata ref to item meta data
1291      * @param queueItmCtx holds info needed to queue an item in chkpt or vb
1292      *                    backfill queue
1293      * @param use_meta Indicates if v must be updated with the metadata
1294      * @param bySeqno seqno of the key being deleted
1295      *
1296      * @return pointer to the updated StoredValue. It can be same as that of
1297      *         v or different value if a new StoredValue is created for the
1298      *         update.
1299      *         status of the operation.
1300      *         notification info.
1301      */
1302     std::tuple<MutationStatus, StoredValue*, VBNotifyCtx> processSoftDelete(
1303             const HashTable::HashBucketLock& hbl,
1304             StoredValue& v,
1305             uint64_t cas,
1306             const ItemMetaData& metadata,
1307             const VBQueueItemCtx& queueItmCtx,
1308             bool use_meta,
1309             uint64_t bySeqno);
1310
1311     /**
1312      * Delete a key (associated StoredValue) from ALL in-memory data structures
1313      * like HT.
1314      * Assumes that HT bucket lock is grabbed.
1315      *
1316      * Currently StoredValues form HashTable intrusively. That is, HashTable
1317      * does not store a reference or a copy of the StoredValue. If any other
1318      * in-memory data strucutures are formed intrusively using StoredValues,
1319      * then it must be decided in this function which data structure deletes
1320      * the StoredValue. Currently it is HashTable that deleted the StoredValue
1321      *
1322      * @param hbl Hash table bucket lock that must be held
1323      * @param v Reference to the StoredValue to be deleted
1324      *
1325      * @return true if an object was deleted, false otherwise
1326      */
1327     bool deleteStoredValue(const HashTable::HashBucketLock& hbl,
1328                            StoredValue& v);
1329
1330     /**
1331      * Queue an item for persistence and replication. Maybe track CAS drift
1332      *
1333      * The caller of this function must hold the lock of the hash table
1334      * partition that contains the StoredValue being Queued.
1335      *
1336      * @param v the dirty item. The cas and seqno maybe updated based on the
1337      *          flags passed
1338      * @param queueItmCtx holds info needed to queue an item in chkpt or vb
1339      *                    backfill queue, whether to track cas, generate seqno,
1340      *                    generate new cas
1341      *
1342      * @return Notification context containing info needed to notify the
1343      *         clients (like connections, flusher)
1344      */
1345     VBNotifyCtx queueDirty(StoredValue& v, const VBQueueItemCtx& queueItmCtx);
1346
1347     /**
1348      * Queue an item for persistence and replication
1349      *
1350      * The caller of this function must hold the lock of the hash table
1351      * partition that contains the StoredValue being Queued.
1352      *
1353      * @param v the dirty item. The cas and seqno maybe updated based on the
1354      *          flags passed
1355      * @param generateBySeqno request that the seqno is generated by this call
1356      * @param generateCas request that the CAS is generated by this call
1357      * @param isBackfillItem indicates if the item must be put onto vb queue or
1358      *        onto checkpoint
1359      * @param preLinkDocumentContext context object which allows running the
1360      *        document pre link callback after the cas is assinged (but
1361      *        but document not available for anyone)
1362      *
1363      * @return Notification context containing info needed to notify the
1364      *         clients (like connections, flusher)
1365      */
1366     VBNotifyCtx queueDirty(
1367             StoredValue& v,
1368             GenerateBySeqno generateBySeqno = GenerateBySeqno::Yes,
1369             GenerateCas generateCas = GenerateCas::Yes,
1370             bool isBackfillItem = false,
1371             PreLinkDocumentContext* preLinkDocumentContext = nullptr);
1372
1373     /**
1374      * Adds a temporary StoredValue in in-memory data structures like HT.
1375      * Assumes that HT bucket lock is grabbed.
1376      *
1377      * @param hbl Hash table bucket lock that must be held
1378      * @param key the key for which a temporary item needs to be added
1379      * @param isReplication true if issued by consumer (for replication)
1380      *
1381      * @return Result indicating the status of the operation
1382      */
1383     AddStatus addTempStoredValue(const HashTable::HashBucketLock& hbl,
1384                                  const DocKey& key,
1385                                  bool isReplication = false);
1386
1387     /**
1388      * Internal wrapper function around the callback to be called when a new
1389      * seqno is generated in the vbucket
1390      *
1391      * @param notifyCtx holds info needed for notification
1392      */
1393     void notifyNewSeqno(const VBNotifyCtx& notifyCtx);
1394
1395     /**
1396      * VBucket internal function to store high priority requests on the vbucket.
1397      *
1398      * @param seqnoOrChkId seqno to be seen or checkpoint id to be persisted
1399      * @param cookie cookie of conn to be notified
1400      * @param reqType request type indicating seqno or chk persistence
1401      */
1402     void addHighPriorityVBEntry(uint64_t seqnoOrChkId,
1403                                 const void* cookie,
1404                                 HighPriorityVBNotify reqType);
1405
1406     /**
1407      * Get all high priority notifications as temporary failures because they
1408      * could not be completed.
1409      *
1410      * @param engine Ref to ep-engine
1411      *
1412      * @return map of notifies with conn cookie as the key and notify status as
1413      *         the value
1414      */
1415     std::map<const void*, ENGINE_ERROR_CODE> tmpFailAndGetAllHpNotifies(
1416             EventuallyPersistentEngine& engine);
1417
1418     void _addStats(bool details, ADD_STAT add_stat, const void* c);
1419
1420     template <typename T>
1421     void addStat(const char* nm,
1422                  const T& val,
1423                  ADD_STAT add_stat,
1424                  const void* c);
1425
1426     /* This member holds the eviction policy used */
1427     const item_eviction_policy_t eviction;
1428
1429     /* Reference to global (EP engine wide) stats */
1430     EPStats& stats;
1431
1432     /* last seqno that is persisted on the disk */
1433     std::atomic<uint64_t> persistenceSeqno;
1434
1435     /* holds all high priority async requests to the vbucket */
1436     std::list<HighPriorityVBEntry> hpVBReqs;
1437
1438     /* synchronizes access to hpVBReqs */
1439     std::mutex hpVBReqsMutex;
1440
1441     /* size of list hpVBReqs (to avoid MB-9434) */
1442     Couchbase::RelaxedAtomic<size_t> numHpVBReqs;
1443
1444     /**
1445      * VBucket sub-classes must implement a function that will schedule
1446      * an appropriate task that will delete the VBucket and its resources.
1447      *
1448      * @param engine owning engine (required for task construction)
1449      */
1450     virtual void scheduleDeferredDeletion(
1451             EventuallyPersistentEngine& engine) = 0;
1452
1453 private:
1454     void fireAllOps(EventuallyPersistentEngine& engine, ENGINE_ERROR_CODE code);
1455
1456     void decrDirtyQueueMem(size_t decrementBy);
1457
1458     void decrDirtyQueueAge(uint32_t decrementBy);
1459
1460     void decrDirtyQueuePendingWrites(size_t decrementBy);
1461
1462     /**
1463      * Updates an existing StoredValue in in-memory data structures like HT.
1464      * Assumes that HT bucket lock is grabbed.
1465      *
1466      * @param htLock Hash table lock that must be held
1467      * @param v Reference to the StoredValue to be updated.
1468      * @param itm Item to be updated.
1469      * @param queueItmCtx holds info needed to queue an item in chkpt or vb
1470      *                    backfill queue; NULL if item need not be queued
1471      *
1472      * @return pointer to the updated StoredValue. It can be same as that of
1473      *         v or different value if a new StoredValue is created for the
1474      *         update.
1475      *         status of the operation.
1476      *         notification info.
1477      */
1478     virtual std::tuple<StoredValue*, MutationStatus, VBNotifyCtx>
1479     updateStoredValue(const HashTable::HashBucketLock& hbl,
1480                       StoredValue& v,
1481                       const Item& itm,
1482                       const VBQueueItemCtx* queueItmCtx) = 0;
1483
1484     /**
1485      * Adds a new StoredValue in in-memory data structures like HT.
1486      * Assumes that HT bucket lock is grabbed.
1487      *
1488      * @param hbl Hash table bucket lock that must be held
1489      * @param itm Item to be added.
1490      * @param queueItmCtx holds info needed to queue an item in chkpt or vb
1491      *                    backfill queue; NULL if item need not be queued
1492      *
1493      * @return Ptr of the StoredValue added and notification info
1494      */
1495     virtual std::pair<StoredValue*, VBNotifyCtx> addNewStoredValue(
1496             const HashTable::HashBucketLock& hbl,
1497             const Item& itm,
1498             const VBQueueItemCtx* queueItmCtx) = 0;
1499
1500     /**
1501      * Logically (soft) delete item in all in-memory data structures. Also
1502      * updates revSeqno. Depending on the in-memory data structure the item may
1503      * be marked delete and/or reset and/or a new value (marked as deleted)
1504      * added.
1505      * Assumes that HT bucket lock is grabbed.
1506      * Also assumes that v is in the hash table.
1507      *
1508      * @param hbl Hash table bucket lock that must be held
1509      * @param v Reference to the StoredValue to be soft deleted
1510      * @param onlyMarkDeleted indicates if we must reset the StoredValue or
1511      *                        just mark deleted
1512      * @param queueItmCtx holds info needed to queue an item in chkpt or vb
1513      *                    backfill queue
1514      * @param bySeqno seqno of the key being deleted
1515      *
1516      * @return pointer to the updated StoredValue. It can be same as that of
1517      *         v or different value if a new StoredValue is created for the
1518      *         update.
1519      *         notification info.
1520      */
1521     virtual std::tuple<StoredValue*, VBNotifyCtx> softDeleteStoredValue(
1522             const HashTable::HashBucketLock& hbl,
1523             StoredValue& v,
1524             bool onlyMarkDeleted,
1525             const VBQueueItemCtx& queueItmCtx,
1526             uint64_t bySeqno) = 0;
1527
1528     /**
1529      * This function handles expiry relatead stuff before logically (soft)
1530      * deleting an item in in-memory structures like HT, and checkpoint mgr.
1531      * Assumes that HT bucket lock is grabbed.
1532      *
1533      * @param hbl Hash table bucket lock that must be held
1534      * @param v Reference to the StoredValue to be soft deleted
1535      *
1536      * @return status of the operation.
1537      *         pointer to the updated StoredValue. It can be same as that of
1538      *         v or different value if a new StoredValue is created for the
1539      *         update.
1540      *         notification info.
1541      */
1542     std::tuple<MutationStatus, StoredValue*, VBNotifyCtx> processExpiredItem(
1543             const HashTable::HashBucketLock& hbl, StoredValue& v);
1544
1545     /**
1546      * Add a temporary item in hash table and enqueue a background fetch for a
1547      * key.
1548      *
1549      * @param hbl Reference to the hash table bucket lock
1550      * @param key the key to be bg fetched
1551      * @param cookie the cookie of the requestor
1552      * @param engine Reference to ep engine
1553      * @param bgFetchDelay Delay in secs before we run the bgFetch task
1554      * @param metadataOnly whether the fetch is for a non-resident value or
1555      *                     metadata of a (possibly) deleted item
1556      * @param isReplication indicates if the call is for a replica vbucket
1557      *
1558      * @return ENGINE_ERROR_CODE status notified to be to the front end
1559      */
1560     virtual ENGINE_ERROR_CODE addTempItemAndBGFetch(
1561             HashTable::HashBucketLock& hbl,
1562             const DocKey& key,
1563             const void* cookie,
1564             EventuallyPersistentEngine& engine,
1565             int bgFetchDelay,
1566             bool metadataOnly,
1567             bool isReplication = false) = 0;
1568
1569     /**
1570      * Enqueue a background fetch for a key.
1571      *
1572      * @param key the key to be bg fetched
1573      * @param cookie the cookie of the requestor
1574      * @param engine Reference to ep engine
1575      * @param bgFetchDelay Delay in secs before we run the bgFetch task
1576      * @param isMeta whether the fetch is for a non-resident value or metadata
1577      *               of a (possibly) deleted item
1578      */
1579     virtual void bgFetch(const DocKey& key,
1580                          const void* cookie,
1581                          EventuallyPersistentEngine& engine,
1582                          int bgFetchDelay,
1583                          bool isMeta = false) = 0;
1584
1585     /**
1586      * Get metadata and value for a non-resident key
1587      *
1588      * @param key key for which metadata and value should be retrieved
1589      * @param cookie the cookie representing the client
1590      * @param engine Reference to ep engine
1591      * @param bgFetchDelay Delay in secs before we run the bgFetch task
1592      * @param options flags indicating some retrieval related info
1593      * @param v reference to the stored value of the non-resident key
1594      *
1595      * @return the result of the operation
1596      */
1597     virtual GetValue getInternalNonResident(const DocKey& key,
1598                                             const void* cookie,
1599                                             EventuallyPersistentEngine& engine,
1600                                             int bgFetchDelay,
1601                                             get_options_t options,
1602                                             const StoredValue& v) = 0;
1603
1604     /**
1605      * Update the revision seqno of a newly StoredValue item.
1606      * We must ensure that it is greater the maxDeletedRevSeqno
1607      *
1608      * @param v StoredValue added newly. Its revSeqno is updated
1609      */
1610     void updateRevSeqNoOfNewStoredValue(StoredValue& v);
1611
1612     /**
1613      * Increase the expiration count global stats and in the vbucket stats
1614      */
1615     void incExpirationStat(ExpireBy source);
1616
1617     void adjustCheckpointFlushTimeout(size_t wall_time);
1618
1619     /**
1620      * Given a StoredValue with XATTRs - prune the user keys so only system keys
1621      * remain.
1622      *
1623      * @param v StoredValue with XATTR value
1624      * @param itemMeta New ItemMetaData to use in item creation
1625      * @return unique_ptr<Item> which matches the StoredValue's meta-data and
1626      *         has the XATTR value with only the system-keys. If the pruning
1627      *         removed all keys (because no system-keys exist) an empty
1628      *         unique_ptr is returned.
1629      */
1630     std::unique_ptr<Item> pruneXattrDocument(StoredValue& v,
1631                                              const ItemMetaData& itemMeta);
1632
1633     id_type                         id;
1634     std::atomic<vbucket_state_t>    state;
1635     cb::RWLock                      stateLock;
1636     vbucket_state_t                 initialState;
1637     std::mutex                           pendingOpLock;
1638     std::vector<const void*>        pendingOps;
1639     hrtime_t                        pendingOpsStart;
1640     uint64_t                        purge_seqno;
1641     std::atomic<bool>               takeover_backed_up;
1642
1643     /* snapshotMutex is used to update/read the pair {start, end} atomically,
1644        but not if reading a single field. */
1645     mutable std::mutex snapshotMutex;
1646     uint64_t persisted_snapshot_start;
1647     uint64_t persisted_snapshot_end;
1648
1649     std::mutex bfMutex;
1650     std::unique_ptr<BloomFilter> bFilter;
1651     std::unique_ptr<BloomFilter> tempFilter;    // Used during compaction.
1652
1653     std::atomic<uint64_t> rollbackItemCount;
1654
1655     HLC hlc;
1656     std::string statPrefix;
1657     // The persistence checkpoint ID for this vbucket.
1658     std::atomic<uint64_t> persistenceCheckpointId;
1659     // Flag to indicate the vbucket is being created
1660     std::atomic<bool> bucketCreation;
1661     // Flag to indicate the vbucket deletion is deferred
1662     std::atomic<bool> deferredDeletion;
1663     /// A cookie that can be set when the vbucket is deletion is deferred, the
1664     /// cookie will be notified when the deferred deletion completes
1665     const void* deferredDeletionCookie;
1666
1667     // Ptr to the item conflict resolution module
1668     std::unique_ptr<ConflictResolution> conflictResolver;
1669
1670     // A callback to be called when a new seqno is generated in the vbucket as
1671     // a result of a front end call
1672     NewSeqnoCallback newSeqnoCb;
1673
1674     /// The VBucket collection state
1675     Collections::VB::Manifest manifest;
1676
1677     static std::atomic<size_t> chkFlushTimeout;
1678
1679     friend class VBucketTest;
1680
1681     DISALLOW_COPY_AND_ASSIGN(VBucket);
1682 };
1683
1684 using VBucketPtr = std::shared_ptr<VBucket>;