Merge remote-tracking branch 'couchbase/3.0.x' into sherlock
[ep-engine.git] / src / ep.h
index 1fa757b..af1b9c1 100644 (file)
--- a/src/ep.h
+++ b/src/ep.h
@@ -43,6 +43,7 @@
 #include "kvstore.h"
 #include "locks.h"
 #include "executorpool.h"
+#include "ext_meta_parser.h"
 #include "stats.h"
 #include "stored-value.h"
 #include "vbucket.h"
@@ -104,9 +105,11 @@ protected:
 // Forward declaration
 class BGFetchCallback;
 class ConflictResolution;
+class DefragmenterTask;
 class EventuallyPersistentStore;
 class Flusher;
 class MutationLog;
+class PauseResumeEPStoreVisitor;
 class PersistenceCallback;
 class Warmup;
 
@@ -179,6 +182,27 @@ typedef std::pair<uint16_t, ExTask> CompTaskEntry;
 class EventuallyPersistentStore {
 public:
 
+    /**
+     * Represents a position within the epStore, used when visiting items.
+     *
+     * Currently opaque (and constant), clients can pass them around but
+     * cannot reposition the iterator.
+     */
+    class Position {
+    public:
+        bool operator==(const Position& other) const {
+            return (vbucket_id == other.vbucket_id);
+        }
+
+    private:
+        Position(uint16_t vbucket_id_) : vbucket_id(vbucket_id_) {}
+
+        uint16_t vbucket_id;
+
+        friend class EventuallyPersistentStore;
+        friend std::ostream& operator<<(std::ostream& os, const Position& pos);
+    };
+
     EventuallyPersistentStore(EventuallyPersistentEngine &theEngine);
     ~EventuallyPersistentStore();
 
@@ -217,12 +241,13 @@ public:
     /**
      * Add an TAP backfill item into its corresponding vbucket
      * @param item the item to be added
-     * @param meta contains meta info or not
      * @param nru the nru bit for the item
+     * @param genBySeqno whether or not to generate sequence number
      * @return the result of the operation
      */
     ENGINE_ERROR_CODE addTAPBackfillItem(const Item &item, uint8_t nru = 0xff,
-                                         bool genBySeqno = true);
+                                         bool genBySeqno = true,
+                                         ExtendedMetaData *emd = NULL);
 
     /**
      * Retrieve a value.
@@ -233,14 +258,19 @@ public:
      * @param queueBG if true, automatically queue a background fetch if necessary
      * @param honorStates if false, fetch a result regardless of state
      * @param trackReference true if we want to set the nru bit for the item
+     * @param deleteTempItem true if temporary items need to be deleted
+     * @param hideLockedCAS if true, hide (report as -1) the CAS of locked items.
      *
      * @return a GetValue representing the result of the request
      */
     GetValue get(const std::string &key, uint16_t vbucket,
                  const void *cookie, bool queueBG=true,
-                 bool honorStates=true, bool trackReference=true) {
+                 bool honorStates=true, bool trackReference=true,
+                 bool deleteTempItem=true,
+                 bool hideLockedCAS=true) {
         return getInternal(key, vbucket, cookie, queueBG, honorStates,
-                           vbucket_state_active, trackReference);
+                           vbucket_state_active, trackReference,
+                           deleteTempItem, hideLockedCAS);
     }
 
     GetValue getRandomKey(void);
@@ -269,36 +299,44 @@ public:
      * @param vbucket the vbucket from which to retrieve the key
      * @param cookie the connection cookie
      * @param metadata where to store the meta informaion
-     * @param true if we want to set the nru bit for the item
      * @param deleted specifies whether or not the key is deleted
+     * @param confResMode specifies the Conflict Resolution mode for the item
+     * @param trackReference true if we want to set the nru bit for the item
      */
     ENGINE_ERROR_CODE getMetaData(const std::string &key,
                                   uint16_t vbucket,
                                   const void *cookie,
                                   ItemMetaData &metadata,
                                   uint32_t &deleted,
+                                  uint8_t &confResMode,
                                   bool trackReference = false);
 
     /**
      * Set an item in the store.
      * @param item the item to set
      * @param cas value to match
+     * @param seqno sequence number of mutation
      * @param cookie the cookie representing the client to store the item
      * @param force override vbucket states
      * @param allowExisting set to false if you want set to fail if the
      *                      item exists already
      * @param nru the nru bit for the item
+     * @param genBySeqno whether or not to generate sequence number
+     * @param emd ExtendedMetaData class object that contains any ext meta
      * @param isReplication set to true if we are to use replication
      *                      throttle threshold
+     *
      * @return the result of the store operation
      */
     ENGINE_ERROR_CODE setWithMeta(const Item &item,
                                   uint64_t cas,
+                                  uint64_t *seqno,
                                   const void *cookie,
                                   bool force,
-                                  bool allowReplace,
+                                  bool allowExisting,
                                   uint8_t nru = 0xff,
                                   bool genBySeqno = true,
+                                  ExtendedMetaData *emd = NULL,
                                   bool isReplication = false);
 
     /**
@@ -348,8 +386,12 @@ public:
      * @param itemMeta the pointer to the metadata memory.
      * @param tapBackfill true if an item deletion is from TAP backfill stream
      *
+     * (deleteWithMeta)
+     * @param genBySeqno whether or not to generate sequence number
+     * @param emd ExtendedMetaData class object that contains any ext meta
      * @param isReplication set to true if we are to use replication
      *                      throttle threshold
+     *
      * @return the result of the delete operation
      */
     ENGINE_ERROR_CODE deleteItem(const std::string &key,
@@ -358,10 +400,12 @@ public:
                                  const void *cookie,
                                  bool force,
                                  ItemMetaData *itemMeta,
+                                 mutation_descr_t *mutInfo,
                                  bool tapBackfill=false);
 
     ENGINE_ERROR_CODE deleteWithMeta(const std::string &key,
                                      uint64_t* cas,
+                                     uint64_t* seqno,
                                      uint16_t vbucket,
                                      const void *cookie,
                                      bool force,
@@ -369,6 +413,7 @@ public:
                                      bool tapBackfill=false,
                                      bool genBySeqno=true,
                                      uint64_t bySeqno=0,
+                                     ExtendedMetaData *emd = NULL,
                                      bool isReplication=false);
 
     void reset();
@@ -410,14 +455,12 @@ public:
      *
      * @param key the key to be bg fetched
      * @param vbucket the vbucket in which the key lives
-     * @param rowid the rowid of the record within its shard
      * @param cookie the cookie of the requestor
      * @param type whether the fetch is for a non-resident value or metadata of
      *             a (possibly) deleted item
      */
     void bgFetch(const std::string &key,
                  uint16_t vbucket,
-                 uint64_t rowid,
                  const void *cookie,
                  bool isMeta = false);
 
@@ -426,7 +469,6 @@ public:
      *
      * @param key the key that was fetched
      * @param vbucket the vbucket in which the key lived
-     * @param rowid the rowid of the record within its shard
      * @param cookie the cookie of the requestor
      * @param init the timestamp of when the request came in
      * @param type whether the fetch is for a non-resident value or metadata of
@@ -434,7 +476,6 @@ public:
      */
     void completeBGFetch(const std::string &key,
                          uint16_t vbucket,
-                         uint64_t rowid,
                          const void *cookie,
                          hrtime_t init,
                          bool isMeta);
@@ -527,6 +568,9 @@ public:
      */
     bool resetVBucket(uint16_t vbid);
 
+    /**
+     * Run a vBucket visitor, visiting all items. Synchronous.
+     */
     void visit(VBucketVisitor &visitor);
 
     /**
@@ -541,6 +585,41 @@ public:
                                              lbl, prio, sleepTime), taskGroup);
     }
 
+    /**
+     * Visit the items in this epStore, starting the iteration from the
+     * given startPosition and allowing the visit to be paused at any point.
+     *
+     * During visitation, the visitor object can request that the visit
+     * is stopped after the current item. The position passed to the
+     * visitor can then be used to restart visiting at the *APPROXIMATE*
+     * same position as it paused.
+     * This is approximate as various locks are released when the
+     * function returns, so any changes to the underlying epStore may cause
+     * the visiting to restart at the slightly different place.
+     *
+     * As a consequence, *DO NOT USE THIS METHOD* if you need to guarantee
+     * that all items are visited!
+     *
+     * @param visitor The visitor object.
+     * @return The final epStore position visited; equal to
+     *         EventuallyPersistentStore::end() if all items were visited
+     *         otherwise the position to resume from.
+     */
+    Position pauseResumeVisit(PauseResumeEPStoreVisitor& visitor,
+                              Position& start_pos);
+
+
+    /**
+     * Return a position at the start of the epStore.
+     */
+    Position startPosition() const;
+
+    /**
+     * Return a position at the end of the epStore. Has similar semantics
+     * as STL end() (i.e. one past the last element).
+     */
+    Position endPosition() const;
+
     const Flusher* getFlusher(uint16_t shardId);
     Warmup* getWarmup(void) const;
 
@@ -632,6 +711,10 @@ public:
         return diskFlushAll.load();
     }
 
+    bool scheduleFlushAllTask(const void* cookie, time_t when);
+
+    void setFlushAllComplete();
+
     void setBackfillMemoryThreshold(double threshold);
 
     void setExpiryPagerSleeptime(size_t val);
@@ -645,6 +728,18 @@ public:
         accessScanner.lastTaskRuntime = gethrtime();
     }
 
+    void setAllBloomFilters(bool to);
+
+    float getBfiltersResidencyThreshold() {
+        return bfilterResidencyThreshold;
+    }
+
+    void setBfiltersResidencyThreshold(float to) {
+        bfilterResidencyThreshold = to;
+    }
+
+    bool isMetaDataResident(RCPtr<VBucket> &vb, const std::string &key);
+
     void incExpirationStat(RCPtr<VBucket> &vb, bool byPager = true) {
         if (byPager) {
             ++stats.expired_pager;
@@ -712,6 +807,8 @@ public:
         }
     }
 
+    void runDefragmenterTask();
+
     void setCompactionWriteQueueCap(size_t to) {
         compactionWriteQueueCap = to;
     }
@@ -760,17 +857,22 @@ private:
      *
      * @param vb the vbucket that contains the dirty item
      * @param v the dirty item
-     * @param plh the pointer to the hash table partition lock for the dirty item.
-     *        Note that the lock is released inside this function.
+     * @param plh the pointer to the hash table partition lock for the dirty item
+     *        Note that the lock is released inside this function
+     * @param seqno sequence number of the mutation
      * @param tapBackfill if the item is from backfill replication
      * @param notifyReplicator whether or not to notify the replicator
+     * @param genBySeqno whether or not to generate sequence number
+     * @param setConflictMode set the conflict resolution mode
      */
     void queueDirty(RCPtr<VBucket> &vb,
                     StoredValue* v,
                     LockHolder *plh,
+                    uint64_t *seqno,
                     bool tapBackfill = false,
                     bool notifyReplicator = true,
-                    bool genBySeqno = true);
+                    bool genBySeqno = true,
+                    bool setConflictMode = true);
 
     /**
      * Retrieve a StoredValue and invoke a method on it.
@@ -815,7 +917,9 @@ private:
                          const void *cookie, bool queueBG,
                          bool honorStates,
                          vbucket_state_t allowedState,
-                         bool trackReference=true);
+                         bool trackReference=true,
+                         bool deleteTempItem=true,
+                         bool hideLockedCAS=true);
 
     ENGINE_ERROR_CODE addTempItemForBgFetch(LockHolder &lock, int bucket_num,
                                             const std::string &key, RCPtr<VBucket> &vb,
@@ -841,6 +945,8 @@ private:
     VBucketMap                      vbMap;
     ExTask                          itmpTask;
     ExTask                          chkTask;
+    float                           bfilterResidencyThreshold;
+    ExTask                          defragmenterTask;
 
     size_t                          compactionWriteQueueCap;
     float                           compactionExpMemThreshold;
@@ -852,7 +958,14 @@ private:
     std::vector<MutationLog*>       accessLog;
 
     AtomicValue<size_t> bgFetchQueue;
+
     AtomicValue<bool> diskFlushAll;
+    struct FlushAllTaskCtx {
+        FlushAllTaskCtx(): delayFlushAll(true), cookie(NULL) {}
+        AtomicValue<bool> delayFlushAll;
+        const void* cookie;
+    } flushAllTaskCtx;
+
     Mutex vbsetMutex;
     uint32_t bgFetchDelay;
     double backfillMemoryThreshold;
@@ -885,4 +998,21 @@ private:
     DISALLOW_COPY_AND_ASSIGN(EventuallyPersistentStore);
 };
 
+/**
+ * Base class for visiting an epStore with pause/resume support.
+ */
+class PauseResumeEPStoreVisitor {
+public:
+    virtual ~PauseResumeEPStoreVisitor() {}
+
+    /**
+     * Visit a hashtable within an epStore.
+     *
+     * @param vbucket_id ID of the vbucket being visited.
+     * @param ht a reference to the hashtable.
+     * @return True if visiting should continue, otherwise false.
+     */
+    virtual bool visit(uint16_t vbucket_id, HashTable& ht) = 0;
+};
+
 #endif  // SRC_EP_H_