MB-19246: Fix potentially incorrect persist_time in OBSERVE response
[ep-engine.git] / src / ep.h
1 /* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2 /*
3  *     Copyright 2010 Couchbase, Inc
4  *
5  *   Licensed under the Apache License, Version 2.0 (the "License");
6  *   you may not use this file except in compliance with the License.
7  *   You may obtain a copy of the License at
8  *
9  *       http://www.apache.org/licenses/LICENSE-2.0
10  *
11  *   Unless required by applicable law or agreed to in writing, software
12  *   distributed under the License is distributed on an "AS IS" BASIS,
13  *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  *   See the License for the specific language governing permissions and
15  *   limitations under the License.
16  */
17
18 #ifndef SRC_EP_H_
19 #define SRC_EP_H_ 1
20
21 #include "config.h"
22
23 #include <memcached/engine.h>
24 #include <stdlib.h>
25 #include <string.h>
26 #include <time.h>
27
28 #include <algorithm>
29 #include <iostream>
30 #include <limits>
31 #include <list>
32 #include <map>
33 #include <queue>
34 #include <set>
35 #include <stdexcept>
36 #include <string>
37 #include <utility>
38 #include <vector>
39
40 #include "atomic.h"
41 #include "bgfetcher.h"
42 #include "item_pager.h"
43 #include "kvstore.h"
44 #include "locks.h"
45 #include "executorpool.h"
46 #include "stats.h"
47 #include "stored-value.h"
48 #include "vbucket.h"
49 #include "vbucketmap.h"
50
51 /**
52  * vbucket-aware hashtable visitor.
53  */
54 class VBucketVisitor : public HashTableVisitor {
55 public:
56
57     VBucketVisitor() : HashTableVisitor() { }
58
59     VBucketVisitor(const VBucketFilter &filter) :
60         HashTableVisitor(), vBucketFilter(filter) { }
61
62     /**
63      * Begin visiting a bucket.
64      *
65      * @param vb the vbucket we are beginning to visit
66      *
67      * @return true iff we want to walk the hashtable in this vbucket
68      */
69     virtual bool visitBucket(RCPtr<VBucket> &vb) {
70         if (vBucketFilter(vb->getId())) {
71             currentBucket = vb;
72             return true;
73         }
74         return false;
75     }
76
77     // This is unused in all implementations so far.
78     void visit(StoredValue* v) {
79         (void)v;
80         abort();
81     }
82
83     const VBucketFilter &getVBucketFilter() {
84         return vBucketFilter;
85     }
86
87     /**
88      * Called after all vbuckets have been visited.
89      */
90     virtual void complete() { }
91
92     /**
93      * Return true if visiting vbuckets should be paused temporarily.
94      */
95     virtual bool pauseVisitor() {
96         return false;
97     }
98
99 protected:
100     VBucketFilter vBucketFilter;
101     RCPtr<VBucket> currentBucket;
102 };
103
104 // Forward declaration
105 class BGFetchCallback;
106 class ConflictResolution;
107 class EventuallyPersistentStore;
108 class Flusher;
109 class MutationLog;
110 class PersistenceCallback;
111 class Warmup;
112
113 /**
114  * VBucket visitor callback adaptor.
115  */
116 class VBCBAdaptor : public GlobalTask {
117 public:
118
119     VBCBAdaptor(EventuallyPersistentStore *s,
120                 shared_ptr<VBucketVisitor> v, const char *l, const Priority &p,
121                 double sleep=0);
122
123     std::string getDescription() {
124         std::stringstream rv;
125         rv << label << " on vb " << currentvb;
126         return rv.str();
127     }
128
129     bool run(void);
130
131 private:
132     std::queue<uint16_t>        vbList;
133     EventuallyPersistentStore  *store;
134     shared_ptr<VBucketVisitor>  visitor;
135     const char                 *label;
136     double                      sleepTime;
137     uint16_t                    currentvb;
138
139     DISALLOW_COPY_AND_ASSIGN(VBCBAdaptor);
140 };
141
142
143 /**
144  * Vbucket visitor task for a generic scheduler.
145  */
146 class VBucketVisitorTask : public GlobalTask {
147 public:
148
149     VBucketVisitorTask(EventuallyPersistentStore *s,
150                        shared_ptr<VBucketVisitor> v, uint16_t sh,
151                        const char *l, double sleep=0, bool shutdown=true);
152
153     std::string getDescription() {
154         std::stringstream rv;
155         rv << label << " on vb " << currentvb;
156         return rv.str();
157     }
158
159     bool run();
160
161 private:
162     std::queue<uint16_t>         vbList;
163     EventuallyPersistentStore   *store;
164     shared_ptr<VBucketVisitor>   visitor;
165     const char                  *label;
166     double                       sleepTime;
167     uint16_t                     currentvb;
168     uint16_t                     shardID;
169 };
170
171 const uint16_t EP_PRIMARY_SHARD = 0;
172 class KVShard;
173
174 typedef std::pair<uint16_t, ExTask> CompTaskEntry;
175
176 /**
177  * Manager of all interaction with the persistence.
178  */
179 class EventuallyPersistentStore {
180 public:
181
182     EventuallyPersistentStore(EventuallyPersistentEngine &theEngine);
183     ~EventuallyPersistentStore();
184
185     bool initialize();
186
187     /**
188      * Set an item in the store.
189      * @param item the item to set
190      * @param cookie the cookie representing the client to store the item
191      * @param force override access to the vbucket even if the state of the
192      *              vbucket would deny mutations.
193      * @param nru the nru bit value for the item
194      * @return the result of the store operation
195      */
196     ENGINE_ERROR_CODE set(const Item &item,
197                           const void *cookie,
198                           bool force = false,
199                           uint8_t nru = 0xff);
200
201     /**
202      * Add an item in the store.
203      * @param item the item to add
204      * @param cookie the cookie representing the client to store the item
205      * @return the result of the operation
206      */
207     ENGINE_ERROR_CODE add(const Item &item, const void *cookie);
208
209     /**
210      * Replace an item in the store.
211      * @param item the item to replace
212      * @param cookie the cookie representing the client to store the item
213      * @return the result of the operation
214      */
215     ENGINE_ERROR_CODE replace(const Item &item, const void *cookie);
216
217     /**
218      * Add an TAP backfill item into its corresponding vbucket
219      * @param item the item to be added
220      * @param meta contains meta info or not
221      * @param nru the nru bit for the item
222      * @return the result of the operation
223      */
224     ENGINE_ERROR_CODE addTAPBackfillItem(const Item &item, uint8_t nru = 0xff,
225                                          bool genBySeqno = true);
226
227     /**
228      * Retrieve a value.
229      *
230      * @param key the key to fetch
231      * @param vbucket the vbucket from which to retrieve the key
232      * @param cookie the connection cookie
233      * @param queueBG if true, automatically queue a background fetch if necessary
234      * @param honorStates if false, fetch a result regardless of state
235      * @param trackReference true if we want to set the nru bit for the item
236      *
237      * @return a GetValue representing the result of the request
238      */
239     GetValue get(const std::string &key, uint16_t vbucket,
240                  const void *cookie, bool queueBG=true,
241                  bool honorStates=true, bool trackReference=true) {
242         return getInternal(key, vbucket, cookie, queueBG, honorStates,
243                            vbucket_state_active, trackReference);
244     }
245
246     GetValue getRandomKey(void);
247
248     /**
249      * Retrieve a value from a vbucket in replica state.
250      *
251      * @param key the key to fetch
252      * @param vbucket the vbucket from which to retrieve the key
253      * @param cookie the connection cookie
254      * @param queueBG if true, automatically queue a background fetch if necessary
255      *
256      * @return a GetValue representing the result of the request
257      */
258     GetValue getReplica(const std::string &key, uint16_t vbucket,
259                         const void *cookie, bool queueBG=true) {
260         return getInternal(key, vbucket, cookie, queueBG, true,
261                            vbucket_state_replica);
262     }
263
264
265     /**
266      * Retrieve the meta data for an item
267      *
268      * @parapm key the key to get the meta data for
269      * @param vbucket the vbucket from which to retrieve the key
270      * @param cookie the connection cookie
271      * @param metadata where to store the meta informaion
272      * @param true if we want to set the nru bit for the item
273      * @param deleted specifies whether or not the key is deleted
274      */
275     ENGINE_ERROR_CODE getMetaData(const std::string &key,
276                                   uint16_t vbucket,
277                                   const void *cookie,
278                                   ItemMetaData &metadata,
279                                   uint32_t &deleted,
280                                   bool trackReference = false);
281
282     /**
283      * Set an item in the store.
284      * @param item the item to set
285      * @param cas value to match
286      * @param cookie the cookie representing the client to store the item
287      * @param force override vbucket states
288      * @param allowExisting set to false if you want set to fail if the
289      *                      item exists already
290      * @param nru the nru bit for the item
291      * @param isReplication set to true if we are to use replication
292      *                      throttle threshold
293      * @return the result of the store operation
294      */
295     ENGINE_ERROR_CODE setWithMeta(const Item &item,
296                                   uint64_t cas,
297                                   const void *cookie,
298                                   bool force,
299                                   bool allowReplace,
300                                   uint8_t nru = 0xff,
301                                   bool genBySeqno = true,
302                                   bool isReplication = false);
303
304     /**
305      * Retrieve a value, but update its TTL first
306      *
307      * @param key the key to fetch
308      * @param vbucket the vbucket from which to retrieve the key
309      * @param cookie the connection cookie
310      * @param exptime the new expiry time for the object
311      *
312      * @return a GetValue representing the result of the request
313      */
314     GetValue getAndUpdateTtl(const std::string &key, uint16_t vbucket,
315                              const void *cookie, time_t exptime);
316
317     /**
318      * Retrieve an item from the disk for vkey stats
319      *
320      * @param key the key to fetch
321      * @param vbucket the vbucket from which to retrieve the key
322      * @param cookie the connection cookie
323      * @param cb callback to return an item fetched from the disk
324      *
325      * @return a status resulting form executing the method
326      */
327     ENGINE_ERROR_CODE statsVKey(const std::string &key,
328                                 uint16_t vbucket,
329                                 const void *cookie);
330
331     void completeStatsVKey(const void* cookie, std::string &key, uint16_t vbid,
332                            uint64_t bySeqNum);
333
334     protocol_binary_response_status evictKey(const std::string &key,
335                                              uint16_t vbucket,
336                                              const char **msg,
337                                              size_t *msg_size,
338                                              bool force=false);
339
340     /**
341      * delete an item in the store.
342      * @param key the key of the item
343      * @param cas the CAS ID for a CASed delete (0 to override)
344      * @param vbucket the vbucket for the key
345      * @param cookie the cookie representing the client
346      * @param force override access to the vbucket even if the state of the
347      *              vbucket would deny mutations.
348      * @param itemMeta the pointer to the metadata memory.
349      * @param tapBackfill true if an item deletion is from TAP backfill stream
350      *
351      * @param isReplication set to true if we are to use replication
352      *                      throttle threshold
353      * @return the result of the delete operation
354      */
355     ENGINE_ERROR_CODE deleteItem(const std::string &key,
356                                  uint64_t* cas,
357                                  uint16_t vbucket,
358                                  const void *cookie,
359                                  bool force,
360                                  ItemMetaData *itemMeta,
361                                  bool tapBackfill=false);
362
363     ENGINE_ERROR_CODE deleteWithMeta(const std::string &key,
364                                      uint64_t* cas,
365                                      uint16_t vbucket,
366                                      const void *cookie,
367                                      bool force,
368                                      ItemMetaData *itemMeta,
369                                      bool tapBackfill=false,
370                                      bool genBySeqno=true,
371                                      uint64_t bySeqno=0,
372                                      bool isReplication=false);
373
374     void reset();
375
376     /**
377      * Set the background fetch delay.
378      *
379      * This exists for debugging and testing purposes.  It
380      * artificially injects delays into background fetches that are
381      * performed when the user requests an item whose value is not
382      * currently resident.
383      *
384      * @param to how long to delay before performing a bg fetch
385      */
386     void setBGFetchDelay(uint32_t to) {
387         bgFetchDelay = to;
388     }
389
390     double getBGFetchDelay(void) { return (double)bgFetchDelay; }
391
392     void stopFlusher(void);
393
394     bool startFlusher(void);
395
396     bool pauseFlusher(void);
397     bool resumeFlusher(void);
398     void wakeUpFlusher(void);
399
400     bool startBgFetcher(void);
401     void stopBgFetcher(void);
402
403     /**
404      * Takes a snapshot of the current stats and persists them to disk.
405      */
406     void snapshotStats(void);
407
408     /**
409      * Enqueue a background fetch for a key.
410      *
411      * @param key the key to be bg fetched
412      * @param vbucket the vbucket in which the key lives
413      * @param rowid the rowid of the record within its shard
414      * @param cookie the cookie of the requestor
415      * @param type whether the fetch is for a non-resident value or metadata of
416      *             a (possibly) deleted item
417      */
418     void bgFetch(const std::string &key,
419                  uint16_t vbucket,
420                  uint64_t rowid,
421                  const void *cookie,
422                  bool isMeta = false);
423
424     /**
425      * Complete a background fetch of a non resident value or metadata.
426      *
427      * @param key the key that was fetched
428      * @param vbucket the vbucket in which the key lived
429      * @param rowid the rowid of the record within its shard
430      * @param cookie the cookie of the requestor
431      * @param init the timestamp of when the request came in
432      * @param type whether the fetch is for a non-resident value or metadata of
433      *             a (possibly) deleted item
434      */
435     void completeBGFetch(const std::string &key,
436                          uint16_t vbucket,
437                          uint64_t rowid,
438                          const void *cookie,
439                          hrtime_t init,
440                          bool isMeta);
441     /**
442      * Complete a batch of background fetch of a non resident value or metadata.
443      *
444      * @param vbId the vbucket in which the requested key lived
445      * @param fetchedItems vector of completed background feches containing key,
446      *                     value, client cookies
447      * @param start the time when the background fetch was started
448      *
449      */
450     void completeBGFetchMulti(uint16_t vbId,
451                               std::vector<bgfetched_item_t> &fetchedItems,
452                               hrtime_t start);
453
454     /**
455      * Helper function to update stats after completion of a background fetch
456      * for either the value of metadata of a key.
457      *
458      * @param init the time of epstore's initialization
459      * @param start the time when the background fetch was started
460      * @param stop the time when the background fetch completed
461      */
462     void updateBGStats(const hrtime_t init,
463                        const hrtime_t start,
464                        const hrtime_t stop);
465
466     RCPtr<VBucket> getVBucket(uint16_t vbid) {
467         return vbMap.getBucket(vbid);
468     }
469
470     uint64_t getLastPersistedCheckpointId(uint16_t vb) {
471         return vbMap.getPersistenceCheckpointId(vb);
472     }
473
474     uint64_t getLastPersistedSeqno(uint16_t vb) {
475         return vbMap.getPersistenceSeqno(vb);
476     }
477
478     void snapshotVBuckets(const Priority &priority, uint16_t shardId);
479
480     /* transfer should be set to true *only* if this vbucket is becoming master
481      * as the result of the previous master cleanly handing off contorol. */
482     ENGINE_ERROR_CODE setVBucketState(uint16_t vbid, vbucket_state_t state,
483                                       bool transfer, bool notify_dcp = true);
484
485     /**
486      * Physically deletes a VBucket from disk. This function should only
487      * be called on a VBucket that has already been logically deleted.
488      *
489      * @param vbid vbucket id
490      * @param cookie The connection that requested the deletion
491      */
492     bool completeVBucketDeletion(uint16_t vbid, const void* cookie);
493
494     /**
495      * Deletes a vbucket
496      *
497      * @param vbid The vbucket to delete.
498      * @param c The cookie for this connection.
499      *          Used in synchronous bucket deletes
500      *          to notify the connection of operation completion.
501      */
502     ENGINE_ERROR_CODE deleteVBucket(uint16_t vbid, const void* c = NULL);
503
504     /**
505      * Triggers compaction of a vbucket
506      *
507      * @param vbid The vbucket to compact.
508      * @param c The context for compaction of a DB file
509      * @param ck cookie used to notify connection of operation completion
510      */
511     ENGINE_ERROR_CODE compactDB(uint16_t vbid, compaction_ctx c,
512                                 const void *ck);
513
514     /**
515      * Callback to do the compaction of a vbucket
516      *
517      * @param vbid The Id of the VBucket which needs to be compacted
518      * @param ctx Context for couchstore compaction hooks
519      * @param ck cookie used to notify connection of operation completion
520      */
521     bool compactVBucket(const uint16_t vbid, compaction_ctx *ctx,
522                         const void *ck);
523
524     /**
525      * Reset a given vbucket from memory and disk. This differs from vbucket deletion in that
526      * it does not delete the vbucket instance from memory hash table.
527      */
528     bool resetVBucket(uint16_t vbid);
529
530     void visit(VBucketVisitor &visitor);
531
532     /**
533      * Run a vbucket visitor with separate jobs per vbucket.
534      *
535      * Note that this is asynchronous.
536      */
537     size_t visit(shared_ptr<VBucketVisitor> visitor, const char *lbl,
538                task_type_t taskGroup, const Priority &prio,
539                double sleepTime=0) {
540         return ExecutorPool::get()->schedule(new VBCBAdaptor(this, visitor,
541                                              lbl, prio, sleepTime), taskGroup);
542     }
543
544     const Flusher* getFlusher(uint16_t shardId);
545     Warmup* getWarmup(void) const;
546
547     ENGINE_ERROR_CODE getKeyStats(const std::string &key, uint16_t vbucket,
548                                   const void* cookie, key_stats &kstats,
549                                   bool bgfetch, bool wantsDeleted=false);
550
551     std::string validateKey(const std::string &key,  uint16_t vbucket,
552                             Item &diskItem);
553
554     bool getLocked(const std::string &key, uint16_t vbucket,
555                    Callback<GetValue> &cb,
556                    rel_time_t currentTime, uint32_t lockTimeout,
557                    const void *cookie);
558
559     ENGINE_ERROR_CODE unlockKey(const std::string &key,
560                                 uint16_t vbucket,
561                                 uint64_t cas,
562                                 rel_time_t currentTime);
563
564
565     KVStore* getRWUnderlying(uint16_t vbId) {
566         return vbMap.getShard(vbId)->getRWUnderlying();
567     }
568
569     KVStore* getRWUnderlyingByShard(size_t shardId) {
570         return vbMap.shards[shardId]->getRWUnderlying();
571     }
572
573     KVStore* getROUnderlyingByShard(size_t shardId) {
574         return vbMap.shards[shardId]->getROUnderlying();
575     }
576
577     KVStore* getROUnderlying(uint16_t vbId) {
578         return vbMap.getShard(vbId)->getROUnderlying();
579     }
580
581     void deleteExpiredItem(uint16_t, std::string &, time_t, uint64_t );
582     void deleteExpiredItems(std::list<std::pair<uint16_t, std::string> > &);
583
584
585     /**
586      * Get the memoized storage properties from the DB.kv
587      */
588     const StorageProperties getStorageProperties() const {
589         return *storageProperties;
590     }
591
592     /**
593      * schedule a vb_state snapshot task for all the shards.
594      */
595     bool scheduleVBSnapshot(const Priority &priority);
596
597     /**
598      * schedule a vb_state snapshot task for a given shard.
599      */
600     void scheduleVBSnapshot(const Priority &priority, uint16_t shardId,
601                             bool force = false);
602
603     /**
604      * Schedule a vbstate persistence task for a given vbucket.
605      */
606     void scheduleVBStatePersist(const Priority &priority, uint16_t vbid,
607                                 bool force = false);
608
609     /**
610      * Persist a vbucket's state.
611      */
612     bool persistVBState(const Priority &priority, uint16_t vbid);
613
614     const VBucketMap &getVBuckets() {
615         return vbMap;
616     }
617
618     EventuallyPersistentEngine& getEPEngine() {
619         return engine;
620     }
621
622     size_t getExpiryPagerSleeptime(void) {
623         LockHolder lh(expiryPager.mutex);
624         return expiryPager.sleeptime;
625     }
626
627     size_t getTransactionTimePerItem() {
628         return lastTransTimePerItem.load();
629     }
630
631     bool isFlushAllScheduled() {
632         return diskFlushAll.load();
633     }
634
635     void setBackfillMemoryThreshold(double threshold);
636
637     void setExpiryPagerSleeptime(size_t val);
638
639     void enableAccessScannerTask();
640     void disableAccessScannerTask();
641     void setAccessScannerSleeptime(size_t val);
642     void resetAccessScannerStartTime();
643
644     void resetAccessScannerTasktime() {
645         accessScanner.lastTaskRuntime = gethrtime();
646     }
647
648     void incExpirationStat(RCPtr<VBucket> &vb, bool byPager = true) {
649         if (byPager) {
650             ++stats.expired_pager;
651         } else {
652             ++stats.expired_access;
653         }
654         ++vb->numExpiredItems;
655     }
656
657     void logQTime(type_id_t taskType, hrtime_t enqTime) {
658         stats.schedulingHisto[taskType].add(enqTime);
659     }
660
661     void logRunTime(type_id_t taskType, hrtime_t runTime) {
662         stats.taskRuntimeHisto[taskType].add(runTime);
663     }
664
665     bool multiBGFetchEnabled() {
666         return storageProperties->hasEfficientGet();
667     }
668
669     void updateCachedResidentRatio(size_t activePerc, size_t replicaPerc) {
670         cachedResidentRatio.activeRatio.store(activePerc);
671         cachedResidentRatio.replicaRatio.store(replicaPerc);
672     }
673
674     bool isWarmingUp();
675
676     bool maybeEnableTraffic(void);
677
678     /**
679      * Checks the memory consumption.
680      * To be used by backfill tasks (tap & dcp).
681      */
682     bool isMemoryUsageTooHigh();
683
684     /**
685      * Flushes all items waiting for persistence in a given vbucket
686      * @param vbid The id of the vbucket to flush
687      * @return The amount of items flushed
688      */
689     int flushVBucket(uint16_t vbid);
690
691     void addKVStoreStats(ADD_STAT add_stat, const void* cookie);
692
693     void addKVStoreTimingStats(ADD_STAT add_stat, const void* cookie);
694
695     void resetUnderlyingStats(void);
696     KVStore *getOneROUnderlying(void);
697     KVStore *getOneRWUnderlying(void);
698
699     item_eviction_policy_t getItemEvictionPolicy(void) const {
700         return eviction_policy;
701     }
702
703     ENGINE_ERROR_CODE rollback(uint16_t vbid, uint64_t rollbackSeqno);
704
705     ExTask &fetchItemPagerTask() {
706         return itmpTask;
707     }
708
709     void wakeUpCheckpointRemover() {
710         if (chkTask->getState() == TASK_SNOOZED) {
711             ExecutorPool::get()->wake(chkTask->getId());
712         }
713     }
714
715     void setCompactionWriteQueueCap(size_t to) {
716         compactionWriteQueueCap = to;
717     }
718
719     void setCompactionExpMemThreshold(size_t to) {
720         compactionExpMemThreshold = static_cast<double>(to) / 100.0;
721     }
722
723     bool compactionCanExpireItems() {
724         // Process expired items only if memory usage is lesser than
725         // compaction_exp_mem_threshold and disk queue is small
726         // enough (marked by tap_throttle_queue_cap)
727
728         bool isMemoryUsageOk = (stats.getTotalMemoryUsed() <
729                           (stats.getMaxDataSize() * compactionExpMemThreshold));
730
731         size_t queueSize = stats.diskQueueSize.load();
732         bool isQueueSizeOk = ((stats.tapThrottleWriteQueueCap == -1) ||
733              (queueSize < static_cast<size_t>(stats.tapThrottleWriteQueueCap)));
734
735         return (isMemoryUsageOk && isQueueSizeOk);
736     }
737
738 protected:
739     // During the warmup phase we might want to enable external traffic
740     // at a given point in time.. The LoadStorageKvPairCallback will be
741     // triggered whenever we want to check if we could enable traffic..
742     friend class LoadStorageKVPairCallback;
743
744     // Methods called during warmup
745     std::vector<vbucket_state *> loadVBucketState();
746
747     void warmupCompleted();
748     void stopWarmup(void);
749
750 private:
751
752     void scheduleVBDeletion(RCPtr<VBucket> &vb,
753                             const void* cookie,
754                             double delay = 0);
755
756     RCPtr<VBucket> getVBucket(uint16_t vbid, vbucket_state_t wanted_state);
757
758     /* Queue an item for persistence and replication
759      *
760      * The caller of this function must hold the lock of the hash table
761      * partition that contains the StoredValue being Queued.
762      *
763      * @param vb the vbucket that contains the dirty item
764      * @param v the dirty item
765      * @param plh the pointer to the hash table partition lock for the dirty item.
766      *        Note that the lock is released inside this function.
767      * @param tapBackfill if the item is from backfill replication
768      * @param notifyReplicator whether or not to notify the replicator
769      */
770     void queueDirty(RCPtr<VBucket> &vb,
771                     StoredValue* v,
772                     LockHolder *plh,
773                     bool tapBackfill = false,
774                     bool notifyReplicator = true,
775                     bool genBySeqno = true);
776
777     /**
778      * Retrieve a StoredValue and invoke a method on it.
779      *
780      * Note that because of complications with void/non-void methods
781      * and potentially missing StoredValues along with the way I
782      * actually intend to use this, I don't return any values from
783      * this.
784      *
785      * @param key the item's key to retrieve
786      * @param vbid the vbucket containing the item
787      * @param f the method to invoke on the item
788      *
789      * @return true if the object was found and method was invoked
790      */
791     bool invokeOnLockedStoredValue(const std::string &key, uint16_t vbid,
792                                    void (StoredValue::* f)()) {
793         RCPtr<VBucket> vb = getVBucket(vbid);
794         if (!vb) {
795             return false;
796         }
797
798         int bucket_num(0);
799         LockHolder lh = vb->ht.getLockedBucket(key, &bucket_num);
800         StoredValue *v = vb->ht.unlocked_find(key, bucket_num, true);
801
802         if (v) {
803             std::mem_fun(f)(v);
804         }
805         return v != NULL;
806     }
807
808     void flushOneDeleteAll(void);
809     PersistenceCallback* flushOneDelOrSet(const queued_item &qi,
810                                           RCPtr<VBucket> &vb);
811
812     StoredValue *fetchValidValue(RCPtr<VBucket> &vb, const std::string &key,
813                                  int bucket_num, bool wantsDeleted=false,
814                                  bool trackReference=true, bool queueExpired=true);
815
816     GetValue getInternal(const std::string &key, uint16_t vbucket,
817                          const void *cookie, bool queueBG,
818                          bool honorStates,
819                          vbucket_state_t allowedState,
820                          bool trackReference=true);
821
822     ENGINE_ERROR_CODE addTempItemForBgFetch(LockHolder &lock, int bucket_num,
823                                             const std::string &key, RCPtr<VBucket> &vb,
824                                             const void *cookie, bool metadataOnly,
825                                             bool isReplication = false);
826
827     friend class Warmup;
828     friend class Flusher;
829     friend class BGFetchCallback;
830     friend class VKeyStatBGFetchCallback;
831     friend class PersistenceCallback;
832     friend class Deleter;
833     friend class VBCBAdaptor;
834     friend class VBucketVisitorTask;
835     friend class ItemPager;
836     friend class PagingVisitor;
837
838     EventuallyPersistentEngine     &engine;
839     EPStats                        &stats;
840     StorageProperties              *storageProperties;
841     Warmup                         *warmupTask;
842     ConflictResolution             *conflictResolver;
843     VBucketMap                      vbMap;
844     ExTask                          itmpTask;
845     ExTask                          chkTask;
846
847     size_t                          compactionWriteQueueCap;
848     float                           compactionExpMemThreshold;
849
850     /* Array of mutexes for each vbucket
851      * Used by flush operations: flushVB, deleteVB, compactVB, snapshotVB */
852     Mutex                          *vb_mutexes;
853     AtomicValue<bool>              *schedule_vbstate_persist;
854     std::vector<MutationLog*>       accessLog;
855
856     AtomicValue<size_t> bgFetchQueue;
857     AtomicValue<bool> diskFlushAll;
858     Mutex vbsetMutex;
859     uint32_t bgFetchDelay;
860     double backfillMemoryThreshold;
861     struct ExpiryPagerDelta {
862         ExpiryPagerDelta() : sleeptime(0), task(0) {}
863         Mutex mutex;
864         size_t sleeptime;
865         size_t task;
866     } expiryPager;
867     struct ALogTask {
868         ALogTask() : sleeptime(0), task(0), lastTaskRuntime(gethrtime()),
869                      enabled(true) {}
870         Mutex mutex;
871         size_t sleeptime;
872         size_t task;
873         hrtime_t lastTaskRuntime;
874         bool enabled;
875     } accessScanner;
876     struct ResidentRatio {
877         AtomicValue<size_t> activeRatio;
878         AtomicValue<size_t> replicaRatio;
879     } cachedResidentRatio;
880     size_t statsSnapshotTaskId;
881     AtomicValue<size_t> lastTransTimePerItem;
882     item_eviction_policy_t eviction_policy;
883
884     Mutex compactionLock;
885     std::list<CompTaskEntry> compactionTasks;
886
887     DISALLOW_COPY_AND_ASSIGN(EventuallyPersistentStore);
888 };
889
890 #endif  // SRC_EP_H_