Merge remote-tracking branch 'couchbase/3.0.x' into sherlock
[ep-engine.git] / src / ep.h
1 /* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2 /*
3  *     Copyright 2010 Couchbase, Inc
4  *
5  *   Licensed under the Apache License, Version 2.0 (the "License");
6  *   you may not use this file except in compliance with the License.
7  *   You may obtain a copy of the License at
8  *
9  *       http://www.apache.org/licenses/LICENSE-2.0
10  *
11  *   Unless required by applicable law or agreed to in writing, software
12  *   distributed under the License is distributed on an "AS IS" BASIS,
13  *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  *   See the License for the specific language governing permissions and
15  *   limitations under the License.
16  */
17
18 #ifndef SRC_EP_H_
19 #define SRC_EP_H_ 1
20
21 #include "config.h"
22
23 #include <memcached/engine.h>
24 #include <stdlib.h>
25 #include <string.h>
26 #include <time.h>
27
28 #include <algorithm>
29 #include <iostream>
30 #include <limits>
31 #include <list>
32 #include <map>
33 #include <queue>
34 #include <set>
35 #include <stdexcept>
36 #include <string>
37 #include <utility>
38 #include <vector>
39
40 #include "atomic.h"
41 #include "bgfetcher.h"
42 #include "item_pager.h"
43 #include "kvstore.h"
44 #include "locks.h"
45 #include "executorpool.h"
46 #include "ext_meta_parser.h"
47 #include "stats.h"
48 #include "stored-value.h"
49 #include "vbucket.h"
50 #include "vbucketmap.h"
51
52 /**
53  * vbucket-aware hashtable visitor.
54  */
55 class VBucketVisitor : public HashTableVisitor {
56 public:
57
58     VBucketVisitor() : HashTableVisitor() { }
59
60     VBucketVisitor(const VBucketFilter &filter) :
61         HashTableVisitor(), vBucketFilter(filter) { }
62
63     /**
64      * Begin visiting a bucket.
65      *
66      * @param vb the vbucket we are beginning to visit
67      *
68      * @return true iff we want to walk the hashtable in this vbucket
69      */
70     virtual bool visitBucket(RCPtr<VBucket> &vb) {
71         if (vBucketFilter(vb->getId())) {
72             currentBucket = vb;
73             return true;
74         }
75         return false;
76     }
77
78     // This is unused in all implementations so far.
79     void visit(StoredValue* v) {
80         (void)v;
81         abort();
82     }
83
84     const VBucketFilter &getVBucketFilter() {
85         return vBucketFilter;
86     }
87
88     /**
89      * Called after all vbuckets have been visited.
90      */
91     virtual void complete() { }
92
93     /**
94      * Return true if visiting vbuckets should be paused temporarily.
95      */
96     virtual bool pauseVisitor() {
97         return false;
98     }
99
100 protected:
101     VBucketFilter vBucketFilter;
102     RCPtr<VBucket> currentBucket;
103 };
104
105 // Forward declaration
106 class BGFetchCallback;
107 class ConflictResolution;
108 class DefragmenterTask;
109 class EventuallyPersistentStore;
110 class Flusher;
111 class MutationLog;
112 class PauseResumeEPStoreVisitor;
113 class PersistenceCallback;
114 class Warmup;
115
116 /**
117  * VBucket visitor callback adaptor.
118  */
119 class VBCBAdaptor : public GlobalTask {
120 public:
121
122     VBCBAdaptor(EventuallyPersistentStore *s,
123                 shared_ptr<VBucketVisitor> v, const char *l, const Priority &p,
124                 double sleep=0);
125
126     std::string getDescription() {
127         std::stringstream rv;
128         rv << label << " on vb " << currentvb.load();
129         return rv.str();
130     }
131
132     bool run(void);
133
134 private:
135     std::queue<uint16_t>        vbList;
136     EventuallyPersistentStore  *store;
137     shared_ptr<VBucketVisitor>  visitor;
138     const char                 *label;
139     double                      sleepTime;
140     AtomicValue<uint16_t>       currentvb;
141
142     DISALLOW_COPY_AND_ASSIGN(VBCBAdaptor);
143 };
144
145
146 /**
147  * Vbucket visitor task for a generic scheduler.
148  */
149 class VBucketVisitorTask : public GlobalTask {
150 public:
151
152     VBucketVisitorTask(EventuallyPersistentStore *s,
153                        shared_ptr<VBucketVisitor> v, uint16_t sh,
154                        const char *l, double sleep=0, bool shutdown=true);
155
156     std::string getDescription() {
157         std::stringstream rv;
158         rv << label << " on vb " << currentvb;
159         return rv.str();
160     }
161
162     bool run();
163
164 private:
165     std::queue<uint16_t>         vbList;
166     EventuallyPersistentStore   *store;
167     shared_ptr<VBucketVisitor>   visitor;
168     const char                  *label;
169     double                       sleepTime;
170     uint16_t                     currentvb;
171     uint16_t                     shardID;
172 };
173
174 const uint16_t EP_PRIMARY_SHARD = 0;
175 class KVShard;
176
177 typedef std::pair<uint16_t, ExTask> CompTaskEntry;
178
179 /**
180  * Manager of all interaction with the persistence.
181  */
182 class EventuallyPersistentStore {
183 public:
184
185     /**
186      * Represents a position within the epStore, used when visiting items.
187      *
188      * Currently opaque (and constant), clients can pass them around but
189      * cannot reposition the iterator.
190      */
191     class Position {
192     public:
193         bool operator==(const Position& other) const {
194             return (vbucket_id == other.vbucket_id);
195         }
196
197     private:
198         Position(uint16_t vbucket_id_) : vbucket_id(vbucket_id_) {}
199
200         uint16_t vbucket_id;
201
202         friend class EventuallyPersistentStore;
203         friend std::ostream& operator<<(std::ostream& os, const Position& pos);
204     };
205
206     EventuallyPersistentStore(EventuallyPersistentEngine &theEngine);
207     ~EventuallyPersistentStore();
208
209     bool initialize();
210
211     /**
212      * Set an item in the store.
213      * @param item the item to set
214      * @param cookie the cookie representing the client to store the item
215      * @param force override access to the vbucket even if the state of the
216      *              vbucket would deny mutations.
217      * @param nru the nru bit value for the item
218      * @return the result of the store operation
219      */
220     ENGINE_ERROR_CODE set(const Item &item,
221                           const void *cookie,
222                           bool force = false,
223                           uint8_t nru = 0xff);
224
225     /**
226      * Add an item in the store.
227      * @param item the item to add
228      * @param cookie the cookie representing the client to store the item
229      * @return the result of the operation
230      */
231     ENGINE_ERROR_CODE add(const Item &item, const void *cookie);
232
233     /**
234      * Replace an item in the store.
235      * @param item the item to replace
236      * @param cookie the cookie representing the client to store the item
237      * @return the result of the operation
238      */
239     ENGINE_ERROR_CODE replace(const Item &item, const void *cookie);
240
241     /**
242      * Add an TAP backfill item into its corresponding vbucket
243      * @param item the item to be added
244      * @param nru the nru bit for the item
245      * @param genBySeqno whether or not to generate sequence number
246      * @return the result of the operation
247      */
248     ENGINE_ERROR_CODE addTAPBackfillItem(const Item &item, uint8_t nru = 0xff,
249                                          bool genBySeqno = true,
250                                          ExtendedMetaData *emd = NULL);
251
252     /**
253      * Retrieve a value.
254      *
255      * @param key the key to fetch
256      * @param vbucket the vbucket from which to retrieve the key
257      * @param cookie the connection cookie
258      * @param queueBG if true, automatically queue a background fetch if necessary
259      * @param honorStates if false, fetch a result regardless of state
260      * @param trackReference true if we want to set the nru bit for the item
261      * @param deleteTempItem true if temporary items need to be deleted
262      * @param hideLockedCAS if true, hide (report as -1) the CAS of locked items.
263      *
264      * @return a GetValue representing the result of the request
265      */
266     GetValue get(const std::string &key, uint16_t vbucket,
267                  const void *cookie, bool queueBG=true,
268                  bool honorStates=true, bool trackReference=true,
269                  bool deleteTempItem=true,
270                  bool hideLockedCAS=true) {
271         return getInternal(key, vbucket, cookie, queueBG, honorStates,
272                            vbucket_state_active, trackReference,
273                            deleteTempItem, hideLockedCAS);
274     }
275
276     GetValue getRandomKey(void);
277
278     /**
279      * Retrieve a value from a vbucket in replica state.
280      *
281      * @param key the key to fetch
282      * @param vbucket the vbucket from which to retrieve the key
283      * @param cookie the connection cookie
284      * @param queueBG if true, automatically queue a background fetch if necessary
285      *
286      * @return a GetValue representing the result of the request
287      */
288     GetValue getReplica(const std::string &key, uint16_t vbucket,
289                         const void *cookie, bool queueBG=true) {
290         return getInternal(key, vbucket, cookie, queueBG, true,
291                            vbucket_state_replica);
292     }
293
294
295     /**
296      * Retrieve the meta data for an item
297      *
298      * @parapm key the key to get the meta data for
299      * @param vbucket the vbucket from which to retrieve the key
300      * @param cookie the connection cookie
301      * @param metadata where to store the meta informaion
302      * @param deleted specifies whether or not the key is deleted
303      * @param confResMode specifies the Conflict Resolution mode for the item
304      * @param trackReference true if we want to set the nru bit for the item
305      */
306     ENGINE_ERROR_CODE getMetaData(const std::string &key,
307                                   uint16_t vbucket,
308                                   const void *cookie,
309                                   ItemMetaData &metadata,
310                                   uint32_t &deleted,
311                                   uint8_t &confResMode,
312                                   bool trackReference = false);
313
314     /**
315      * Set an item in the store.
316      * @param item the item to set
317      * @param cas value to match
318      * @param seqno sequence number of mutation
319      * @param cookie the cookie representing the client to store the item
320      * @param force override vbucket states
321      * @param allowExisting set to false if you want set to fail if the
322      *                      item exists already
323      * @param nru the nru bit for the item
324      * @param genBySeqno whether or not to generate sequence number
325      * @param emd ExtendedMetaData class object that contains any ext meta
326      * @param isReplication set to true if we are to use replication
327      *                      throttle threshold
328      *
329      * @return the result of the store operation
330      */
331     ENGINE_ERROR_CODE setWithMeta(const Item &item,
332                                   uint64_t cas,
333                                   uint64_t *seqno,
334                                   const void *cookie,
335                                   bool force,
336                                   bool allowExisting,
337                                   uint8_t nru = 0xff,
338                                   bool genBySeqno = true,
339                                   ExtendedMetaData *emd = NULL,
340                                   bool isReplication = false);
341
342     /**
343      * Retrieve a value, but update its TTL first
344      *
345      * @param key the key to fetch
346      * @param vbucket the vbucket from which to retrieve the key
347      * @param cookie the connection cookie
348      * @param exptime the new expiry time for the object
349      *
350      * @return a GetValue representing the result of the request
351      */
352     GetValue getAndUpdateTtl(const std::string &key, uint16_t vbucket,
353                              const void *cookie, time_t exptime);
354
355     /**
356      * Retrieve an item from the disk for vkey stats
357      *
358      * @param key the key to fetch
359      * @param vbucket the vbucket from which to retrieve the key
360      * @param cookie the connection cookie
361      * @param cb callback to return an item fetched from the disk
362      *
363      * @return a status resulting form executing the method
364      */
365     ENGINE_ERROR_CODE statsVKey(const std::string &key,
366                                 uint16_t vbucket,
367                                 const void *cookie);
368
369     void completeStatsVKey(const void* cookie, std::string &key, uint16_t vbid,
370                            uint64_t bySeqNum);
371
372     protocol_binary_response_status evictKey(const std::string &key,
373                                              uint16_t vbucket,
374                                              const char **msg,
375                                              size_t *msg_size,
376                                              bool force=false);
377
378     /**
379      * delete an item in the store.
380      * @param key the key of the item
381      * @param cas the CAS ID for a CASed delete (0 to override)
382      * @param vbucket the vbucket for the key
383      * @param cookie the cookie representing the client
384      * @param force override access to the vbucket even if the state of the
385      *              vbucket would deny mutations.
386      * @param itemMeta the pointer to the metadata memory.
387      * @param tapBackfill true if an item deletion is from TAP backfill stream
388      *
389      * (deleteWithMeta)
390      * @param genBySeqno whether or not to generate sequence number
391      * @param emd ExtendedMetaData class object that contains any ext meta
392      * @param isReplication set to true if we are to use replication
393      *                      throttle threshold
394      *
395      * @return the result of the delete operation
396      */
397     ENGINE_ERROR_CODE deleteItem(const std::string &key,
398                                  uint64_t* cas,
399                                  uint16_t vbucket,
400                                  const void *cookie,
401                                  bool force,
402                                  ItemMetaData *itemMeta,
403                                  mutation_descr_t *mutInfo,
404                                  bool tapBackfill=false);
405
406     ENGINE_ERROR_CODE deleteWithMeta(const std::string &key,
407                                      uint64_t* cas,
408                                      uint64_t* seqno,
409                                      uint16_t vbucket,
410                                      const void *cookie,
411                                      bool force,
412                                      ItemMetaData *itemMeta,
413                                      bool tapBackfill=false,
414                                      bool genBySeqno=true,
415                                      uint64_t bySeqno=0,
416                                      ExtendedMetaData *emd = NULL,
417                                      bool isReplication=false);
418
419     void reset();
420
421     /**
422      * Set the background fetch delay.
423      *
424      * This exists for debugging and testing purposes.  It
425      * artificially injects delays into background fetches that are
426      * performed when the user requests an item whose value is not
427      * currently resident.
428      *
429      * @param to how long to delay before performing a bg fetch
430      */
431     void setBGFetchDelay(uint32_t to) {
432         bgFetchDelay = to;
433     }
434
435     double getBGFetchDelay(void) { return (double)bgFetchDelay; }
436
437     void stopFlusher(void);
438
439     bool startFlusher(void);
440
441     bool pauseFlusher(void);
442     bool resumeFlusher(void);
443     void wakeUpFlusher(void);
444
445     bool startBgFetcher(void);
446     void stopBgFetcher(void);
447
448     /**
449      * Takes a snapshot of the current stats and persists them to disk.
450      */
451     void snapshotStats(void);
452
453     /**
454      * Enqueue a background fetch for a key.
455      *
456      * @param key the key to be bg fetched
457      * @param vbucket the vbucket in which the key lives
458      * @param cookie the cookie of the requestor
459      * @param type whether the fetch is for a non-resident value or metadata of
460      *             a (possibly) deleted item
461      */
462     void bgFetch(const std::string &key,
463                  uint16_t vbucket,
464                  const void *cookie,
465                  bool isMeta = false);
466
467     /**
468      * Complete a background fetch of a non resident value or metadata.
469      *
470      * @param key the key that was fetched
471      * @param vbucket the vbucket in which the key lived
472      * @param cookie the cookie of the requestor
473      * @param init the timestamp of when the request came in
474      * @param type whether the fetch is for a non-resident value or metadata of
475      *             a (possibly) deleted item
476      */
477     void completeBGFetch(const std::string &key,
478                          uint16_t vbucket,
479                          const void *cookie,
480                          hrtime_t init,
481                          bool isMeta);
482     /**
483      * Complete a batch of background fetch of a non resident value or metadata.
484      *
485      * @param vbId the vbucket in which the requested key lived
486      * @param fetchedItems vector of completed background feches containing key,
487      *                     value, client cookies
488      * @param start the time when the background fetch was started
489      *
490      */
491     void completeBGFetchMulti(uint16_t vbId,
492                               std::vector<bgfetched_item_t> &fetchedItems,
493                               hrtime_t start);
494
495     /**
496      * Helper function to update stats after completion of a background fetch
497      * for either the value of metadata of a key.
498      *
499      * @param init the time of epstore's initialization
500      * @param start the time when the background fetch was started
501      * @param stop the time when the background fetch completed
502      */
503     void updateBGStats(const hrtime_t init,
504                        const hrtime_t start,
505                        const hrtime_t stop);
506
507     RCPtr<VBucket> getVBucket(uint16_t vbid) {
508         return vbMap.getBucket(vbid);
509     }
510
511     uint64_t getLastPersistedCheckpointId(uint16_t vb) {
512         return vbMap.getPersistenceCheckpointId(vb);
513     }
514
515     uint64_t getLastPersistedSeqno(uint16_t vb) {
516         return vbMap.getPersistenceSeqno(vb);
517     }
518
519     void snapshotVBuckets(const Priority &priority, uint16_t shardId);
520
521     /* transfer should be set to true *only* if this vbucket is becoming master
522      * as the result of the previous master cleanly handing off contorol. */
523     ENGINE_ERROR_CODE setVBucketState(uint16_t vbid, vbucket_state_t state,
524                                       bool transfer, bool notify_dcp = true);
525
526     /**
527      * Physically deletes a VBucket from disk. This function should only
528      * be called on a VBucket that has already been logically deleted.
529      *
530      * @param vbid vbucket id
531      * @param cookie The connection that requested the deletion
532      */
533     bool completeVBucketDeletion(uint16_t vbid, const void* cookie);
534
535     /**
536      * Deletes a vbucket
537      *
538      * @param vbid The vbucket to delete.
539      * @param c The cookie for this connection.
540      *          Used in synchronous bucket deletes
541      *          to notify the connection of operation completion.
542      */
543     ENGINE_ERROR_CODE deleteVBucket(uint16_t vbid, const void* c = NULL);
544
545     /**
546      * Triggers compaction of a vbucket
547      *
548      * @param vbid The vbucket to compact.
549      * @param c The context for compaction of a DB file
550      * @param ck cookie used to notify connection of operation completion
551      */
552     ENGINE_ERROR_CODE compactDB(uint16_t vbid, compaction_ctx c,
553                                 const void *ck);
554
555     /**
556      * Callback to do the compaction of a vbucket
557      *
558      * @param vbid The Id of the VBucket which needs to be compacted
559      * @param ctx Context for couchstore compaction hooks
560      * @param ck cookie used to notify connection of operation completion
561      */
562     bool compactVBucket(const uint16_t vbid, compaction_ctx *ctx,
563                         const void *ck);
564
565     /**
566      * Reset a given vbucket from memory and disk. This differs from vbucket deletion in that
567      * it does not delete the vbucket instance from memory hash table.
568      */
569     bool resetVBucket(uint16_t vbid);
570
571     /**
572      * Run a vBucket visitor, visiting all items. Synchronous.
573      */
574     void visit(VBucketVisitor &visitor);
575
576     /**
577      * Run a vbucket visitor with separate jobs per vbucket.
578      *
579      * Note that this is asynchronous.
580      */
581     size_t visit(shared_ptr<VBucketVisitor> visitor, const char *lbl,
582                task_type_t taskGroup, const Priority &prio,
583                double sleepTime=0) {
584         return ExecutorPool::get()->schedule(new VBCBAdaptor(this, visitor,
585                                              lbl, prio, sleepTime), taskGroup);
586     }
587
588     /**
589      * Visit the items in this epStore, starting the iteration from the
590      * given startPosition and allowing the visit to be paused at any point.
591      *
592      * During visitation, the visitor object can request that the visit
593      * is stopped after the current item. The position passed to the
594      * visitor can then be used to restart visiting at the *APPROXIMATE*
595      * same position as it paused.
596      * This is approximate as various locks are released when the
597      * function returns, so any changes to the underlying epStore may cause
598      * the visiting to restart at the slightly different place.
599      *
600      * As a consequence, *DO NOT USE THIS METHOD* if you need to guarantee
601      * that all items are visited!
602      *
603      * @param visitor The visitor object.
604      * @return The final epStore position visited; equal to
605      *         EventuallyPersistentStore::end() if all items were visited
606      *         otherwise the position to resume from.
607      */
608     Position pauseResumeVisit(PauseResumeEPStoreVisitor& visitor,
609                               Position& start_pos);
610
611
612     /**
613      * Return a position at the start of the epStore.
614      */
615     Position startPosition() const;
616
617     /**
618      * Return a position at the end of the epStore. Has similar semantics
619      * as STL end() (i.e. one past the last element).
620      */
621     Position endPosition() const;
622
623     const Flusher* getFlusher(uint16_t shardId);
624     Warmup* getWarmup(void) const;
625
626     ENGINE_ERROR_CODE getKeyStats(const std::string &key, uint16_t vbucket,
627                                   const void* cookie, key_stats &kstats,
628                                   bool bgfetch, bool wantsDeleted=false);
629
630     std::string validateKey(const std::string &key,  uint16_t vbucket,
631                             Item &diskItem);
632
633     bool getLocked(const std::string &key, uint16_t vbucket,
634                    Callback<GetValue> &cb,
635                    rel_time_t currentTime, uint32_t lockTimeout,
636                    const void *cookie);
637
638     ENGINE_ERROR_CODE unlockKey(const std::string &key,
639                                 uint16_t vbucket,
640                                 uint64_t cas,
641                                 rel_time_t currentTime);
642
643
644     KVStore* getRWUnderlying(uint16_t vbId) {
645         return vbMap.getShard(vbId)->getRWUnderlying();
646     }
647
648     KVStore* getRWUnderlyingByShard(size_t shardId) {
649         return vbMap.shards[shardId]->getRWUnderlying();
650     }
651
652     KVStore* getROUnderlyingByShard(size_t shardId) {
653         return vbMap.shards[shardId]->getROUnderlying();
654     }
655
656     KVStore* getROUnderlying(uint16_t vbId) {
657         return vbMap.getShard(vbId)->getROUnderlying();
658     }
659
660     void deleteExpiredItem(uint16_t, std::string &, time_t, uint64_t );
661     void deleteExpiredItems(std::list<std::pair<uint16_t, std::string> > &);
662
663
664     /**
665      * Get the memoized storage properties from the DB.kv
666      */
667     const StorageProperties getStorageProperties() const {
668         return *storageProperties;
669     }
670
671     /**
672      * schedule a vb_state snapshot task for all the shards.
673      */
674     bool scheduleVBSnapshot(const Priority &priority);
675
676     /**
677      * schedule a vb_state snapshot task for a given shard.
678      */
679     void scheduleVBSnapshot(const Priority &priority, uint16_t shardId,
680                             bool force = false);
681
682     /**
683      * Schedule a vbstate persistence task for a given vbucket.
684      */
685     void scheduleVBStatePersist(const Priority &priority, uint16_t vbid,
686                                 bool force = false);
687
688     /**
689      * Persist a vbucket's state.
690      */
691     bool persistVBState(const Priority &priority, uint16_t vbid);
692
693     const VBucketMap &getVBuckets() {
694         return vbMap;
695     }
696
697     EventuallyPersistentEngine& getEPEngine() {
698         return engine;
699     }
700
701     size_t getExpiryPagerSleeptime(void) {
702         LockHolder lh(expiryPager.mutex);
703         return expiryPager.sleeptime;
704     }
705
706     size_t getTransactionTimePerItem() {
707         return lastTransTimePerItem.load();
708     }
709
710     bool isFlushAllScheduled() {
711         return diskFlushAll.load();
712     }
713
714     bool scheduleFlushAllTask(const void* cookie, time_t when);
715
716     void setFlushAllComplete();
717
718     void setBackfillMemoryThreshold(double threshold);
719
720     void setExpiryPagerSleeptime(size_t val);
721
722     void enableAccessScannerTask();
723     void disableAccessScannerTask();
724     void setAccessScannerSleeptime(size_t val);
725     void resetAccessScannerStartTime();
726
727     void resetAccessScannerTasktime() {
728         accessScanner.lastTaskRuntime = gethrtime();
729     }
730
731     void setAllBloomFilters(bool to);
732
733     float getBfiltersResidencyThreshold() {
734         return bfilterResidencyThreshold;
735     }
736
737     void setBfiltersResidencyThreshold(float to) {
738         bfilterResidencyThreshold = to;
739     }
740
741     bool isMetaDataResident(RCPtr<VBucket> &vb, const std::string &key);
742
743     void incExpirationStat(RCPtr<VBucket> &vb, bool byPager = true) {
744         if (byPager) {
745             ++stats.expired_pager;
746         } else {
747             ++stats.expired_access;
748         }
749         ++vb->numExpiredItems;
750     }
751
752     void logQTime(type_id_t taskType, hrtime_t enqTime) {
753         stats.schedulingHisto[taskType].add(enqTime);
754     }
755
756     void logRunTime(type_id_t taskType, hrtime_t runTime) {
757         stats.taskRuntimeHisto[taskType].add(runTime);
758     }
759
760     bool multiBGFetchEnabled() {
761         return storageProperties->hasEfficientGet();
762     }
763
764     void updateCachedResidentRatio(size_t activePerc, size_t replicaPerc) {
765         cachedResidentRatio.activeRatio.store(activePerc);
766         cachedResidentRatio.replicaRatio.store(replicaPerc);
767     }
768
769     bool isWarmingUp();
770
771     bool maybeEnableTraffic(void);
772
773     /**
774      * Checks the memory consumption.
775      * To be used by backfill tasks (tap & dcp).
776      */
777     bool isMemoryUsageTooHigh();
778
779     /**
780      * Flushes all items waiting for persistence in a given vbucket
781      * @param vbid The id of the vbucket to flush
782      * @return The amount of items flushed
783      */
784     int flushVBucket(uint16_t vbid);
785
786     void addKVStoreStats(ADD_STAT add_stat, const void* cookie);
787
788     void addKVStoreTimingStats(ADD_STAT add_stat, const void* cookie);
789
790     void resetUnderlyingStats(void);
791     KVStore *getOneROUnderlying(void);
792     KVStore *getOneRWUnderlying(void);
793
794     item_eviction_policy_t getItemEvictionPolicy(void) const {
795         return eviction_policy;
796     }
797
798     ENGINE_ERROR_CODE rollback(uint16_t vbid, uint64_t rollbackSeqno);
799
800     ExTask &fetchItemPagerTask() {
801         return itmpTask;
802     }
803
804     void wakeUpCheckpointRemover() {
805         if (chkTask->getState() == TASK_SNOOZED) {
806             ExecutorPool::get()->wake(chkTask->getId());
807         }
808     }
809
810     void runDefragmenterTask();
811
812     void setCompactionWriteQueueCap(size_t to) {
813         compactionWriteQueueCap = to;
814     }
815
816     void setCompactionExpMemThreshold(size_t to) {
817         compactionExpMemThreshold = static_cast<double>(to) / 100.0;
818     }
819
820     bool compactionCanExpireItems() {
821         // Process expired items only if memory usage is lesser than
822         // compaction_exp_mem_threshold and disk queue is small
823         // enough (marked by tap_throttle_queue_cap)
824
825         bool isMemoryUsageOk = (stats.getTotalMemoryUsed() <
826                           (stats.getMaxDataSize() * compactionExpMemThreshold));
827
828         size_t queueSize = stats.diskQueueSize.load();
829         bool isQueueSizeOk = ((stats.tapThrottleWriteQueueCap == -1) ||
830              (queueSize < static_cast<size_t>(stats.tapThrottleWriteQueueCap)));
831
832         return (isMemoryUsageOk && isQueueSizeOk);
833     }
834
835 protected:
836     // During the warmup phase we might want to enable external traffic
837     // at a given point in time.. The LoadStorageKvPairCallback will be
838     // triggered whenever we want to check if we could enable traffic..
839     friend class LoadStorageKVPairCallback;
840
841     // Methods called during warmup
842     std::vector<vbucket_state *> loadVBucketState();
843
844     void warmupCompleted();
845     void stopWarmup(void);
846
847 private:
848
849     void scheduleVBDeletion(RCPtr<VBucket> &vb,
850                             const void* cookie,
851                             double delay = 0);
852
853     /* Queue an item for persistence and replication
854      *
855      * The caller of this function must hold the lock of the hash table
856      * partition that contains the StoredValue being Queued.
857      *
858      * @param vb the vbucket that contains the dirty item
859      * @param v the dirty item
860      * @param plh the pointer to the hash table partition lock for the dirty item
861      *        Note that the lock is released inside this function
862      * @param seqno sequence number of the mutation
863      * @param tapBackfill if the item is from backfill replication
864      * @param notifyReplicator whether or not to notify the replicator
865      * @param genBySeqno whether or not to generate sequence number
866      * @param setConflictMode set the conflict resolution mode
867      */
868     void queueDirty(RCPtr<VBucket> &vb,
869                     StoredValue* v,
870                     LockHolder *plh,
871                     uint64_t *seqno,
872                     bool tapBackfill = false,
873                     bool notifyReplicator = true,
874                     bool genBySeqno = true,
875                     bool setConflictMode = true);
876
877     /**
878      * Retrieve a StoredValue and invoke a method on it.
879      *
880      * Note that because of complications with void/non-void methods
881      * and potentially missing StoredValues along with the way I
882      * actually intend to use this, I don't return any values from
883      * this.
884      *
885      * @param key the item's key to retrieve
886      * @param vbid the vbucket containing the item
887      * @param f the method to invoke on the item
888      *
889      * @return true if the object was found and method was invoked
890      */
891     bool invokeOnLockedStoredValue(const std::string &key, uint16_t vbid,
892                                    void (StoredValue::* f)()) {
893         RCPtr<VBucket> vb = getVBucket(vbid);
894         if (!vb) {
895             return false;
896         }
897
898         int bucket_num(0);
899         LockHolder lh = vb->ht.getLockedBucket(key, &bucket_num);
900         StoredValue *v = vb->ht.unlocked_find(key, bucket_num, true);
901
902         if (v) {
903             std::mem_fun(f)(v);
904         }
905         return v != NULL;
906     }
907
908     void flushOneDeleteAll(void);
909     PersistenceCallback* flushOneDelOrSet(const queued_item &qi,
910                                           RCPtr<VBucket> &vb);
911
912     StoredValue *fetchValidValue(RCPtr<VBucket> &vb, const std::string &key,
913                                  int bucket_num, bool wantsDeleted=false,
914                                  bool trackReference=true, bool queueExpired=true);
915
916     GetValue getInternal(const std::string &key, uint16_t vbucket,
917                          const void *cookie, bool queueBG,
918                          bool honorStates,
919                          vbucket_state_t allowedState,
920                          bool trackReference=true,
921                          bool deleteTempItem=true,
922                          bool hideLockedCAS=true);
923
924     ENGINE_ERROR_CODE addTempItemForBgFetch(LockHolder &lock, int bucket_num,
925                                             const std::string &key, RCPtr<VBucket> &vb,
926                                             const void *cookie, bool metadataOnly,
927                                             bool isReplication = false);
928
929     friend class Warmup;
930     friend class Flusher;
931     friend class BGFetchCallback;
932     friend class VKeyStatBGFetchCallback;
933     friend class PersistenceCallback;
934     friend class Deleter;
935     friend class VBCBAdaptor;
936     friend class VBucketVisitorTask;
937     friend class ItemPager;
938     friend class PagingVisitor;
939
940     EventuallyPersistentEngine     &engine;
941     EPStats                        &stats;
942     StorageProperties              *storageProperties;
943     Warmup                         *warmupTask;
944     ConflictResolution             *conflictResolver;
945     VBucketMap                      vbMap;
946     ExTask                          itmpTask;
947     ExTask                          chkTask;
948     float                           bfilterResidencyThreshold;
949     ExTask                          defragmenterTask;
950
951     size_t                          compactionWriteQueueCap;
952     float                           compactionExpMemThreshold;
953
954     /* Array of mutexes for each vbucket
955      * Used by flush operations: flushVB, deleteVB, compactVB, snapshotVB */
956     Mutex                          *vb_mutexes;
957     AtomicValue<bool>              *schedule_vbstate_persist;
958     std::vector<MutationLog*>       accessLog;
959
960     AtomicValue<size_t> bgFetchQueue;
961
962     AtomicValue<bool> diskFlushAll;
963     struct FlushAllTaskCtx {
964         FlushAllTaskCtx(): delayFlushAll(true), cookie(NULL) {}
965         AtomicValue<bool> delayFlushAll;
966         const void* cookie;
967     } flushAllTaskCtx;
968
969     Mutex vbsetMutex;
970     uint32_t bgFetchDelay;
971     double backfillMemoryThreshold;
972     struct ExpiryPagerDelta {
973         ExpiryPagerDelta() : sleeptime(0), task(0) {}
974         Mutex mutex;
975         size_t sleeptime;
976         size_t task;
977     } expiryPager;
978     struct ALogTask {
979         ALogTask() : sleeptime(0), task(0), lastTaskRuntime(gethrtime()),
980                      enabled(true) {}
981         Mutex mutex;
982         size_t sleeptime;
983         size_t task;
984         hrtime_t lastTaskRuntime;
985         bool enabled;
986     } accessScanner;
987     struct ResidentRatio {
988         AtomicValue<size_t> activeRatio;
989         AtomicValue<size_t> replicaRatio;
990     } cachedResidentRatio;
991     size_t statsSnapshotTaskId;
992     AtomicValue<size_t> lastTransTimePerItem;
993     item_eviction_policy_t eviction_policy;
994
995     Mutex compactionLock;
996     std::list<CompTaskEntry> compactionTasks;
997
998     DISALLOW_COPY_AND_ASSIGN(EventuallyPersistentStore);
999 };
1000
1001 /**
1002  * Base class for visiting an epStore with pause/resume support.
1003  */
1004 class PauseResumeEPStoreVisitor {
1005 public:
1006     virtual ~PauseResumeEPStoreVisitor() {}
1007
1008     /**
1009      * Visit a hashtable within an epStore.
1010      *
1011      * @param vbucket_id ID of the vbucket being visited.
1012      * @param ht a reference to the hashtable.
1013      * @return True if visiting should continue, otherwise false.
1014      */
1015     virtual bool visit(uint16_t vbucket_id, HashTable& ht) = 0;
1016 };
1017
1018 #endif  // SRC_EP_H_