MB-23906: Implement delete-with-value with store() instead of delete()
[ep-engine.git] / src / kv_bucket.h
1 /* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2 /*
3  *   you may not use this file except in compliance with the License.
4  *   You may obtain a copy of the License at
5  *
6  *       http://www.apache.org/licenses/LICENSE-2.0
7  *
8  *   Unless required by applicable law or agreed to in writing, software
9  *   distributed under the License is distributed on an "AS IS" BASIS,
10  *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11  *   See the License for the specific language governing permissions and
12  *   limitations under the License.
13  */
14
15 #pragma once
16
17 #include "config.h"
18
19 #include "ep_types.h"
20 #include "executorpool.h"
21 #include "mutation_log.h"
22 #include "storeddockey.h"
23 #include "stored-value.h"
24 #include "task_type.h"
25 #include "vbucket.h"
26 #include "vbucketmap.h"
27 #include "utility.h"
28 #include "kv_bucket_iface.h"
29
30 #include <deque>
31
32 class VBucketCountVisitor;
33
34 /**
35  * VBucket visitor callback adaptor.
36  */
37 class VBCBAdaptor : public GlobalTask {
38 public:
39     VBCBAdaptor(KVBucket* s,
40                 TaskId id,
41                 std::unique_ptr<VBucketVisitor> v,
42                 const char* l,
43                 double sleep = 0,
44                 bool shutdown = false);
45
46     cb::const_char_buffer getDescription() {
47         std::unique_lock<std::mutex> lock(description.mutex);
48         return description.text;
49     }
50
51     bool run(void);
52
53 private:
54     std::queue<uint16_t>        vbList;
55     KVBucket  *store;
56     std::unique_ptr<VBucketVisitor> visitor;
57     const char                 *label;
58     double                      sleepTime;
59     std::atomic<uint16_t>       currentvb;
60
61     struct {
62         std::mutex mutex;
63         std::string text;
64     } description;
65
66     /// re-calculate the description (when the currentVB changes).
67     void updateDescription();
68
69     DISALLOW_COPY_AND_ASSIGN(VBCBAdaptor);
70 };
71
72 const uint16_t EP_PRIMARY_SHARD = 0;
73 class KVShard;
74
75 typedef std::pair<uint16_t, ExTask> CompTaskEntry;
76
77
78 /**
79  * KVBucket is the base class for concrete Key/Value bucket implementations
80  * which use the concept of VBuckets to support replication, persistence and
81  * failover.
82  *
83  */
84 class KVBucket : public KVBucketIface {
85 public:
86
87     KVBucket(EventuallyPersistentEngine &theEngine);
88     virtual ~KVBucket();
89
90     /**
91      * Start necessary tasks.
92      * Client calling initialize must also call deinitialize before deleting
93      * the EPBucket instance
94      */
95     bool initialize();
96
97     /**
98      * Stop tasks started in initialize()
99      */
100     void deinitialize();
101
102     /**
103      * Set an item in the store.
104      * @param item the item to set. On success, this will have its seqno and
105      *             CAS updated.
106      * @param cookie the cookie representing the client to store the item
107      * @return the result of the store operation
108      */
109     ENGINE_ERROR_CODE set(Item &item, const void *cookie);
110
111     /**
112      * Add an item in the store.
113      * @param item the item to add. On success, this will have its seqno and
114      *             CAS updated.
115      * @param cookie the cookie representing the client to store the item
116      * @return the result of the operation
117      */
118     ENGINE_ERROR_CODE add(Item &item, const void *cookie);
119
120     /**
121      * Replace an item in the store.
122      * @param item the item to replace. On success, this will have its seqno
123      *             and CAS updated.
124      * @param cookie the cookie representing the client to store the item
125      * @return the result of the operation
126      */
127     ENGINE_ERROR_CODE replace(Item &item, const void *cookie);
128
129     /**
130      * Add an TAP backfill or DCP backfill item into its corresponding vbucket
131      * @param item the item to be added
132      * @param genBySeqno whether or not to generate sequence number
133      * @return the result of the operation
134      */
135     ENGINE_ERROR_CODE addBackfillItem(Item& item,
136                                       GenerateBySeqno genBySeqno,
137                                       ExtendedMetaData* emd = NULL);
138
139     /**
140      * Retrieve a value.
141      *
142      * @param key     the key to fetch
143      * @param vbucket the vbucket from which to retrieve the key
144      * @param cookie  the connection cookie
145      * @param options options specified for retrieval
146      *
147      * @return a GetValue representing the result of the request
148      */
149     GetValue get(const DocKey& key, uint16_t vbucket,
150                  const void *cookie, get_options_t options) {
151         return getInternal(key, vbucket, cookie, vbucket_state_active,
152                            options);
153     }
154
155     GetValue getRandomKey(void);
156
157     /**
158      * Retrieve a value from a vbucket in replica state.
159      *
160      * @param key     the key to fetch
161      * @param vbucket the vbucket from which to retrieve the key
162      * @param cookie  the connection cookie
163      * @param options options specified for retrieval
164      *
165      * @return a GetValue representing the result of the request
166      */
167     GetValue getReplica(const DocKey& key, uint16_t vbucket,
168                         const void *cookie,
169                         get_options_t options = static_cast<get_options_t>(
170                                                         QUEUE_BG_FETCH |
171                                                         HONOR_STATES |
172                                                         TRACK_REFERENCE |
173                                                         DELETE_TEMP |
174                                                         HIDE_LOCKED_CAS)) {
175         return getInternal(key, vbucket, cookie, vbucket_state_replica,
176                            options);
177     }
178
179
180     /**
181      * Retrieve the meta data for an item
182      *
183      * @parapm key the key to get the meta data for
184      * @param vbucket the vbucket from which to retrieve the key
185      * @param cookie the connection cookie
186      * @param fetchDatatype whether to fetch datatype or not
187      * @param[out] metadata where to store the meta informaion
188      * @param[out] deleted specifies whether or not the key is deleted
189      * @param[out] datatype specifies the datatype for the given item
190      */
191     ENGINE_ERROR_CODE getMetaData(const DocKey& key,
192                                   uint16_t vbucket,
193                                   const void* cookie,
194                                   bool fetchDatatype,
195                                   ItemMetaData& metadata,
196                                   uint32_t& deleted,
197                                   uint8_t& datatype);
198
199     /**
200      * Set an item in the store.
201      * @param item the item to set
202      * @param cas value to match
203      * @param seqno sequence number of mutation
204      * @param cookie the cookie representing the client to store the item
205      * @param force override vbucket states
206      * @param allowExisting set to false if you want set to fail if the
207      *                      item exists already
208      * @param genBySeqno whether or not to generate sequence number
209      * @param emd ExtendedMetaData class object that contains any ext meta
210      * @param isReplication set to true if we are to use replication
211      *                      throttle threshold
212      *
213      * @return the result of the store operation
214      */
215     ENGINE_ERROR_CODE setWithMeta(Item &item,
216                                   uint64_t cas,
217                                   uint64_t *seqno,
218                                   const void *cookie,
219                                   bool force,
220                                   bool allowExisting,
221                                   GenerateBySeqno genBySeqno = GenerateBySeqno::Yes,
222                                   GenerateCas genCas = GenerateCas::No,
223                                   ExtendedMetaData *emd = NULL,
224                                   bool isReplication = false);
225
226     /**
227      * Retrieve a value, but update its TTL first
228      *
229      * @param key the key to fetch
230      * @param vbucket the vbucket from which to retrieve the key
231      * @param cookie the connection cookie
232      * @param exptime the new expiry time for the object
233      *
234      * @return a GetValue representing the result of the request
235      */
236     GetValue getAndUpdateTtl(const DocKey& key, uint16_t vbucket,
237                              const void *cookie, time_t exptime);
238
239     ENGINE_ERROR_CODE deleteItem(const DocKey& key,
240                                  uint64_t& cas,
241                                  uint16_t vbucket,
242                                  const void* cookie,
243                                  ItemMetaData* itemMeta,
244                                  mutation_descr_t* mutInfo);
245
246     ENGINE_ERROR_CODE deleteWithMeta(const DocKey& key,
247                                      uint64_t& cas,
248                                      uint64_t* seqno,
249                                      uint16_t vbucket,
250                                      const void* cookie,
251                                      bool force,
252                                      const ItemMetaData& itemMeta,
253                                      bool backfill,
254                                      GenerateBySeqno genBySeqno,
255                                      GenerateCas generateCas,
256                                      uint64_t bySeqno,
257                                      ExtendedMetaData* emd,
258                                      bool isReplication);
259
260     virtual void reset();
261
262     /**
263      * Set the background fetch delay.
264      *
265      * This exists for debugging and testing purposes.  It
266      * artificially injects delays into background fetches that are
267      * performed when the user requests an item whose value is not
268      * currently resident.
269      *
270      * @param to how long to delay before performing a bg fetch
271      */
272     void setBGFetchDelay(uint32_t to) {
273         bgFetchDelay = to;
274     }
275
276     double getBGFetchDelay(void) { return (double)bgFetchDelay; }
277
278     virtual bool pauseFlusher();
279     virtual bool resumeFlusher();
280     virtual void wakeUpFlusher();
281
282     /**
283      * Takes a snapshot of the current stats and persists them to disk.
284      */
285     void snapshotStats(void);
286
287     virtual void getAggregatedVBucketStats(const void* cookie,
288                                            ADD_STAT add_stat);
289
290    /**
291      * Complete a background fetch of a non resident value or metadata.
292      *
293      * @param key the key that was fetched
294      * @param vbucket the vbucket in which the key lived
295      * @param cookie the cookie of the requestor
296      * @param init the timestamp of when the request came in
297      * @param isMeta whether the fetch is for a non-resident value or metadata of
298      *               a (possibly) deleted item
299      */
300     void completeBGFetch(const DocKey& key,
301                          uint16_t vbucket,
302                          const void* cookie,
303                          ProcessClock::time_point init,
304                          bool isMeta);
305     /**
306      * Complete a batch of background fetch of a non resident value or metadata.
307      *
308      * @param vbId the vbucket in which the requested key lived
309      * @param fetchedItems vector of completed background feches containing key,
310      *                     value, client cookies
311      * @param start the time when the background fetch was started
312      *
313      */
314     void completeBGFetchMulti(uint16_t vbId,
315                               std::vector<bgfetched_item_t>& fetchedItems,
316                               ProcessClock::time_point start);
317
318     VBucketPtr getVBucket(uint16_t vbid) {
319         return vbMap.getBucket(vbid);
320     }
321
322     std::pair<uint64_t, bool> getLastPersistedCheckpointId(uint16_t vb) {
323         // No persistence at the KVBucket class level.
324         return {0, false};
325     }
326
327     uint64_t getLastPersistedSeqno(uint16_t vb) {
328         auto vbucket = vbMap.getBucket(vb);
329         if (vbucket) {
330             return vbucket->getPersistenceSeqno();
331         } else {
332             return 0;
333         }
334     }
335
336     /* transfer should be set to true *only* if this vbucket is becoming master
337      * as the result of the previous master cleanly handing off control. */
338     ENGINE_ERROR_CODE setVBucketState(uint16_t vbid, vbucket_state_t state,
339                                       bool transfer, bool notify_dcp = true);
340
341     /**
342      * Physically deletes a VBucket from disk. This function should only
343      * be called on a VBucket that has already been logically deleted.
344      *
345      * @param vbid vbucket id
346      * @param cookie The connection that requested the deletion
347      */
348     bool completeVBucketDeletion(uint16_t vbid, const void* cookie);
349
350     /**
351      * Deletes a vbucket
352      *
353      * @param vbid The vbucket to delete.
354      * @param c The cookie for this connection.
355      *          Used in synchronous bucket deletes
356      *          to notify the connection of operation completion.
357      */
358     ENGINE_ERROR_CODE deleteVBucket(uint16_t vbid, const void* c = NULL);
359
360     /**
361      * Check for the existence of a vbucket in the case of couchstore
362      * or shard in the case of forestdb. Note that this function will be
363      * deprecated once forestdb is the only backend supported
364      *
365      * @param db_file_id vbucketid for couchstore or shard id in the
366      *                   case of forestdb
367      */
368     ENGINE_ERROR_CODE checkForDBExistence(uint16_t db_file_id);
369
370     /**
371      * Triggers compaction of a database file
372      *
373      * @param vbid The vbucket being compacted
374      * @param c The context for compaction of a DB file
375      * @param ck cookie used to notify connection of operation completion
376      */
377     ENGINE_ERROR_CODE scheduleCompaction(uint16_t vbid, compaction_ctx c, const void *ck);
378
379     /**
380      * Compaction of a database file
381      *
382      * @param ctx Context for compaction hooks
383      * @param ck cookie used to notify connection of operation completion
384      *
385      * return true if the compaction needs to be rescheduled and false
386      *             otherwise
387      */
388     bool doCompact(compaction_ctx *ctx, const void *ck);
389
390     /**
391      * Get the database file id for the compaction request
392      *
393      * @param req compaction request structure
394      *
395      * returns the database file id from the underlying KV store
396      */
397     uint16_t getDBFileId(const protocol_binary_request_compact_db& req);
398
399     /**
400      * Remove completed compaction tasks or wake snoozed tasks
401      *
402      * @param db_file_id vbucket id for couchstore or shard id in the
403      *                   case of forestdb
404      */
405     void updateCompactionTasks(uint16_t db_file_id);
406
407     /**
408      * Reset a given vbucket from memory and disk. This differs from vbucket deletion in that
409      * it does not delete the vbucket instance from memory hash table.
410      */
411     bool resetVBucket(uint16_t vbid);
412
413     /**
414      * Run a vBucket visitor, visiting all items. Synchronous.
415      */
416     void visit(VBucketVisitor &visitor);
417
418     /**
419      * Run a vbucket visitor with separate jobs per vbucket.
420      *
421      * Note that this is asynchronous.
422      */
423     size_t visit(std::unique_ptr<VBucketVisitor> visitor,
424                  const char* lbl,
425                  TaskId id,
426                  double sleepTime = 0) {
427         return ExecutorPool::get()->schedule(
428                 new VBCBAdaptor(this, id, std::move(visitor), lbl, sleepTime));
429     }
430
431     /**
432      * Visit the items in this epStore, starting the iteration from the
433      * given startPosition and allowing the visit to be paused at any point.
434      *
435      * During visitation, the visitor object can request that the visit
436      * is stopped after the current item. The position passed to the
437      * visitor can then be used to restart visiting at the *APPROXIMATE*
438      * same position as it paused.
439      * This is approximate as various locks are released when the
440      * function returns, so any changes to the underlying epStore may cause
441      * the visiting to restart at the slightly different place.
442      *
443      * As a consequence, *DO NOT USE THIS METHOD* if you need to guarantee
444      * that all items are visited!
445      *
446      * @param visitor The visitor object.
447      * @return The final epStore position visited; equal to
448      *         EPBucket::end() if all items were visited
449      *         otherwise the position to resume from.
450      */
451     Position pauseResumeVisit(PauseResumeEPStoreVisitor& visitor,
452                               Position& start_pos);
453
454
455     /**
456      * Return a position at the start of the epStore.
457      */
458     Position startPosition() const;
459
460     /**
461      * Return a position at the end of the epStore. Has similar semantics
462      * as STL end() (i.e. one past the last element).
463      */
464     Position endPosition() const;
465
466     const Flusher* getFlusher(uint16_t shardId);
467
468     Warmup* getWarmup(void) const;
469
470     /**
471      * Looks up the key stats for the given {vbucket, key}.
472      * @param key The key to lookup
473      * @param vbucket The vbucket the key belongs to.
474      * @param cookie The client's cookie
475      * @param[out] kstats On success the keystats for this item.
476      * @param wantsDeleted If yes then return keystats even if the item is
477      *                     marked as deleted. If no then will return
478      *                     ENGINE_KEY_ENOENT for deleted items.
479      */
480     ENGINE_ERROR_CODE getKeyStats(const DocKey& key,
481                                   uint16_t vbucket,
482                                   const void* cookie,
483                                   key_stats& kstats,
484                                   WantsDeleted wantsDeleted);
485
486     std::string validateKey(const DocKey& key,  uint16_t vbucket,
487                             Item &diskItem);
488
489     GetValue getLocked(const DocKey& key, uint16_t vbucket,
490                        rel_time_t currentTime, uint32_t lockTimeout,
491                        const void *cookie);
492
493     ENGINE_ERROR_CODE unlockKey(const DocKey& key,
494                                 uint16_t vbucket,
495                                 uint64_t cas,
496                                 rel_time_t currentTime);
497
498
499     KVStore* getRWUnderlying(uint16_t vbId) {
500         return vbMap.getShardByVbId(vbId)->getRWUnderlying();
501     }
502
503     KVStore* getRWUnderlyingByShard(size_t shardId) {
504         return vbMap.shards[shardId]->getRWUnderlying();
505     }
506
507     KVStore* getROUnderlyingByShard(size_t shardId) {
508         return vbMap.shards[shardId]->getROUnderlying();
509     }
510
511     KVStore* getROUnderlying(uint16_t vbId) {
512         return vbMap.getShardByVbId(vbId)->getROUnderlying();
513     }
514
515     protocol_binary_response_status evictKey(const DocKey& key,
516                                              VBucket::id_type vbucket,
517                                              const char** msg);
518
519     void deleteExpiredItem(uint16_t, const DocKey&, time_t, uint64_t, ExpireBy);
520     void deleteExpiredItems(std::list<std::pair<uint16_t, StoredDocKey>>&,
521                             ExpireBy);
522
523     /**
524      * Get the memoized storage properties from the DB.kv
525      */
526     const StorageProperties getStorageProperties() const {
527         KVStore* store  = vbMap.shards[0]->getROUnderlying();
528         return store->getStorageProperties();
529     }
530
531     virtual void scheduleVBStatePersist();
532
533     virtual void scheduleVBStatePersist(VBucket::id_type vbid);
534
535     const VBucketMap &getVBuckets() {
536         return vbMap;
537     }
538
539     EventuallyPersistentEngine& getEPEngine() {
540         return engine;
541     }
542
543     size_t getExpiryPagerSleeptime(void) {
544         LockHolder lh(expiryPager.mutex);
545         return expiryPager.sleeptime;
546     }
547
548     size_t getTransactionTimePerItem() {
549         return lastTransTimePerItem.load();
550     }
551
552     bool isDeleteAllScheduled() {
553         return diskDeleteAll.load();
554     }
555
556     bool scheduleDeleteAllTask(const void* cookie);
557
558     void setDeleteAllComplete();
559
560     void setBackfillMemoryThreshold(double threshold);
561
562     void setExpiryPagerSleeptime(size_t val);
563     void setExpiryPagerTasktime(ssize_t val);
564     void enableExpiryPager();
565     void disableExpiryPager();
566
567     void enableItemPager();
568     void disableItemPager();
569
570     void enableAccessScannerTask();
571     void disableAccessScannerTask();
572     void setAccessScannerSleeptime(size_t val, bool useStartTime);
573     void resetAccessScannerStartTime();
574
575     void resetAccessScannerTasktime() {
576         accessScanner.lastTaskRuntime = gethrtime();
577     }
578
579     void setAllBloomFilters(bool to);
580
581     float getBfiltersResidencyThreshold() {
582         return bfilterResidencyThreshold;
583     }
584
585     void setBfiltersResidencyThreshold(float to) {
586         bfilterResidencyThreshold = to;
587     }
588
589     bool isMetaDataResident(VBucketPtr &vb, const DocKey& key);
590
591     void logQTime(TaskId taskType, const ProcessClock::duration enqTime) {
592         const auto ns_count = std::chrono::duration_cast
593                 <std::chrono::microseconds>(enqTime).count();
594         stats.schedulingHisto[static_cast<int>(taskType)].add(ns_count);
595     }
596
597     void logRunTime(TaskId taskType, const ProcessClock::duration runTime) {
598         const auto ns_count = std::chrono::duration_cast
599                 <std::chrono::microseconds>(runTime).count();
600         stats.taskRuntimeHisto[static_cast<int>(taskType)].add(ns_count);
601     }
602
603     bool multiBGFetchEnabled() {
604         StorageProperties storeProp = getStorageProperties();
605         return storeProp.hasEfficientGet();
606     }
607
608     void updateCachedResidentRatio(size_t activePerc, size_t replicaPerc) {
609         cachedResidentRatio.activeRatio.store(activePerc);
610         cachedResidentRatio.replicaRatio.store(replicaPerc);
611     }
612
613     bool isWarmingUp();
614
615     bool maybeEnableTraffic(void);
616
617     /**
618      * Checks the memory consumption.
619      * To be used by backfill tasks (tap & dcp).
620      */
621     bool isMemoryUsageTooHigh();
622
623     /**
624      * Flushes all items waiting for persistence in a given vbucket
625      * @param vbid The id of the vbucket to flush
626      * @return The number of items flushed
627      */
628     int flushVBucket(uint16_t vbid);
629
630     void commit(KVStore& kvstore, const Item* collectionsManifest);
631
632     void addKVStoreStats(ADD_STAT add_stat, const void* cookie);
633
634     void addKVStoreTimingStats(ADD_STAT add_stat, const void* cookie);
635
636     /* Given a named KVStore statistic, return the value of that statistic,
637      * accumulated across any shards.
638      *
639      * @param name The name of the statistic
640      * @param[out] value The value of the statistic.
641      * @param option the KVStore to read stats from.
642      * @return True if the statistic was successfully returned via {value},
643      *              else false.
644      */
645     bool getKVStoreStat(const char* name, size_t& value,
646                         KVSOption option);
647
648     void resetUnderlyingStats(void);
649     KVStore *getOneROUnderlying(void);
650     KVStore *getOneRWUnderlying(void);
651
652     item_eviction_policy_t getItemEvictionPolicy(void) const {
653         return eviction_policy;
654     }
655
656     /*
657      * Request a rollback of the vbucket to the specified seqno.
658      * If the rollbackSeqno is not a checkpoint boundary, then the rollback
659      * will be to the nearest checkpoint.
660      * There are also cases where the rollback will be forced to 0.
661      * various failures or if the rollback is > 50% of the data.
662      *
663      * A check of the vbucket's high-seqno indicates if a rollback request
664      * was not honoured exactly.
665      *
666      * @param vbid The vbucket to rollback
667      * @rollbackSeqno The seqno to rollback to.
668      * @return ENGINE_EINVAL if VB is not replica, ENGINE_NOT_MY_VBUCKET if vbid
669      *         is not managed by this instance or ENGINE_SUCCESS.
670      */
671     ENGINE_ERROR_CODE rollback(uint16_t vbid, uint64_t rollbackSeqno);
672
673     void wakeUpItemPager() {
674         if (itemPagerTask->getState() == TASK_SNOOZED) {
675             ExecutorPool::get()->wake(itemPagerTask->getId());
676         }
677     }
678
679     void wakeUpCheckpointRemover() {
680         if (chkTask->getState() == TASK_SNOOZED) {
681             ExecutorPool::get()->wake(chkTask->getId());
682         }
683     }
684
685     void runDefragmenterTask();
686
687     bool runAccessScannerTask();
688
689     void runVbStatePersistTask(int vbid);
690
691     void setCompactionWriteQueueCap(size_t to) {
692         compactionWriteQueueCap = to;
693     }
694
695     void setCompactionExpMemThreshold(size_t to) {
696         compactionExpMemThreshold = static_cast<double>(to) / 100.0;
697     }
698
699     bool compactionCanExpireItems() {
700         // Process expired items only if memory usage is lesser than
701         // compaction_exp_mem_threshold and disk queue is small
702         // enough (marked by replication_throttle_queue_cap)
703
704         bool isMemoryUsageOk = (stats.getTotalMemoryUsed() <
705                           (stats.getMaxDataSize() * compactionExpMemThreshold));
706
707         size_t queueSize = stats.diskQueueSize.load();
708         bool isQueueSizeOk = ((stats.replicationThrottleWriteQueueCap == -1) ||
709              (queueSize < static_cast<size_t>(stats.replicationThrottleWriteQueueCap)));
710
711         return (isMemoryUsageOk && isQueueSizeOk);
712     }
713
714     void setCursorDroppingLowerUpperThresholds(size_t maxSize);
715
716     bool isAccessScannerEnabled() {
717         LockHolder lh(accessScanner.mutex);
718         return accessScanner.enabled;
719     }
720
721     bool isExpPagerEnabled() {
722         LockHolder lh(expiryPager.mutex);
723         return expiryPager.enabled;
724     }
725
726     //Check if there were any out-of-memory errors during warmup
727     bool isWarmupOOMFailure(void);
728
729     size_t getActiveResidentRatio() const;
730
731     size_t getReplicaResidentRatio() const;
732     /*
733      * Change the max_cas of the specified vbucket to cas without any
734      * care for the data or ongoing operations...
735      */
736     ENGINE_ERROR_CODE forceMaxCas(uint16_t vbucket, uint64_t cas);
737
738     /**
739      * Create a VBucket object appropriate for this Bucket class.
740      */
741     virtual VBucketPtr makeVBucket(
742             VBucket::id_type id,
743             vbucket_state_t state,
744             KVShard* shard,
745             std::unique_ptr<FailoverTable> table,
746             NewSeqnoCallback newSeqnoCb,
747             vbucket_state_t initState = vbucket_state_dead,
748             int64_t lastSeqno = 0,
749             uint64_t lastSnapStart = 0,
750             uint64_t lastSnapEnd = 0,
751             uint64_t purgeSeqno = 0,
752             uint64_t maxCas = 0,
753             const std::string& collectionsManifest = "") = 0;
754
755 protected:
756     // During the warmup phase we might want to enable external traffic
757     // at a given point in time.. The LoadStorageKvPairCallback will be
758     // triggered whenever we want to check if we could enable traffic..
759     friend class LoadStorageKVPairCallback;
760
761     // Methods called during warmup
762     std::vector<vbucket_state *> loadVBucketState();
763
764     void warmupCompleted();
765     void stopWarmup(void);
766
767     /**
768      * Compaction of a database file
769      *
770      * @param ctx Context for compaction hooks
771      */
772     void compactInternal(compaction_ctx *ctx);
773
774     void scheduleVBDeletion(VBucketPtr &vb,
775                             const void* cookie,
776                             double delay = 0);
777
778     void flushOneDeleteAll(void);
779     PersistenceCallback* flushOneDelOrSet(const queued_item &qi,
780                                           VBucketPtr &vb);
781
782     GetValue getInternal(const DocKey& key, uint16_t vbucket, const void *cookie,
783                          vbucket_state_t allowedState,
784                          get_options_t options = TRACK_REFERENCE);
785
786     bool resetVBucket_UNLOCKED(uint16_t vbid, LockHolder& vbset);
787
788     ENGINE_ERROR_CODE setVBucketState_UNLOCKED(uint16_t vbid, vbucket_state_t state,
789                                                bool transfer, bool notify_dcp,
790                                                LockHolder& vbset);
791
792     /* Notify flusher of a new seqno being added in the vbucket */
793     virtual void notifyFlusher(const uint16_t vbid);
794
795     /* Notify replication of a new seqno being added in the vbucket */
796     void notifyReplication(const uint16_t vbid, const int64_t bySeqno);
797
798     /// Helper method from initialize() to setup the expiry pager
799     void initializeExpiryPager(Configuration& config);
800
801     /// Factory method to create a VBucket count visitor of the correct type.
802     virtual std::unique_ptr<VBucketCountVisitor> makeVBCountVisitor(
803             vbucket_state_t state);
804
805     /**
806      * Helper method used by getAggregatedVBucketStats to output aggregated
807      * bucket stats.
808      */
809     virtual void appendAggregatedVBucketStats(VBucketCountVisitor& active,
810                                               VBucketCountVisitor& replica,
811                                               VBucketCountVisitor& pending,
812                                               VBucketCountVisitor& dead,
813                                               const void* cookie,
814                                               ADD_STAT add_stat);
815
816     friend class Warmup;
817     friend class PersistenceCallback;
818
819     EventuallyPersistentEngine     &engine;
820     EPStats                        &stats;
821     std::unique_ptr<Warmup> warmupTask;
822     VBucketMap                      vbMap;
823     ExTask itemPagerTask;
824     ExTask                          chkTask;
825     float                           bfilterResidencyThreshold;
826     ExTask                          defragmenterTask;
827
828     size_t                          compactionWriteQueueCap;
829     float                           compactionExpMemThreshold;
830
831     /* Array of mutexes for each vbucket
832      * Used by flush operations: flushVB, deleteVB, compactVB, snapshotVB */
833     std::mutex                          *vb_mutexes;
834     std::deque<MutationLog>       accessLog;
835
836     std::atomic<bool> diskDeleteAll;
837     struct DeleteAllTaskCtx {
838         DeleteAllTaskCtx() : delay(true), cookie(NULL) {
839         }
840         std::atomic<bool> delay;
841         const void* cookie;
842     } deleteAllTaskCtx;
843
844     std::mutex vbsetMutex;
845     uint32_t bgFetchDelay;
846     double backfillMemoryThreshold;
847     struct ExpiryPagerDelta {
848         ExpiryPagerDelta() : sleeptime(0), task(0), enabled(true) {}
849         std::mutex mutex;
850         size_t sleeptime;
851         size_t task;
852         bool enabled;
853     } expiryPager;
854     struct ALogTask {
855         ALogTask() : sleeptime(0), task(0), lastTaskRuntime(gethrtime()),
856                      enabled(true) {}
857         std::mutex mutex;
858         size_t sleeptime;
859         size_t task;
860         hrtime_t lastTaskRuntime;
861         bool enabled;
862     } accessScanner;
863     struct ResidentRatio {
864         std::atomic<size_t> activeRatio;
865         std::atomic<size_t> replicaRatio;
866     } cachedResidentRatio;
867     size_t statsSnapshotTaskId;
868     std::atomic<size_t> lastTransTimePerItem;
869     item_eviction_policy_t eviction_policy;
870
871     std::mutex compactionLock;
872     std::list<CompTaskEntry> compactionTasks;
873
874     friend class KVBucketTest;
875
876     DISALLOW_COPY_AND_ASSIGN(KVBucket);
877 };
878
879 /**
880  * Callback for notifying clients of the Bucket (KVBucket) about a new item in
881  * the partition (Vbucket).
882  */
883 class NotifyNewSeqnoCB : public Callback<const uint16_t, const VBNotifyCtx&> {
884 public:
885     NotifyNewSeqnoCB(KVBucket& kvb) : kvBucket(kvb) {
886     }
887
888     void callback(const uint16_t& vbid, const VBNotifyCtx& notifyCtx) {
889         kvBucket.notifyNewSeqno(vbid, notifyCtx);
890     }
891
892 private:
893     KVBucket& kvBucket;
894 };