Merge remote-tracking branch 'couchbase/3.0.x' into sherlock
[ep-engine.git] / src / vbucket.h
index fc2f951..fb685d6 100644 (file)
 
 #include "atomic.h"
 #include "bgfetcher.h"
+#include "bloomfilter.h"
 #include "checkpoint.h"
 #include "common.h"
 #include "stored-value.h"
 
 const size_t MIN_CHK_FLUSH_TIMEOUT = 10; // 10 sec.
 const size_t MAX_CHK_FLUSH_TIMEOUT = 30; // 30 sec.
+static const int64_t INITIAL_DRIFT = -140737488355328; // -2^47: lowest possible 48-bit signed integer
 
 struct HighPriorityVBEntry {
     HighPriorityVBEntry() :
@@ -158,10 +160,13 @@ public:
             CheckpointConfig &chkConfig, KVShard *kvshard,
             int64_t lastSeqno, uint64_t lastSnapStart,
             uint64_t lastSnapEnd, FailoverTable *table,
+            shared_ptr<Callback<uint16_t> > cb,
             vbucket_state_t initState = vbucket_state_dead,
-            uint64_t chkId = 1, uint64_t purgeSeqno = 0) :
+            uint64_t chkId = 1, uint64_t purgeSeqno = 0,
+            uint64_t maxCas = 0, int64_t driftCounter = INITIAL_DRIFT):
         ht(st),
-        checkpointManager(st, i, chkConfig, lastSeqno, chkId),
+        checkpointManager(st, i, chkConfig, lastSeqno, lastSnapStart,
+                          lastSnapEnd, cb, chkId),
         failovers(table),
         opsCreate(0),
         opsUpdate(0),
@@ -182,10 +187,15 @@ public:
         initialState(initState),
         stats(st),
         purge_seqno(purgeSeqno),
-        cur_snapshot_start(lastSnapStart),
-        cur_snapshot_end(lastSnapEnd),
+        max_cas(maxCas),
+        drift_counter(driftCounter),
+        time_sync_enabled(false),
+        persisted_snapshot_start(lastSnapStart),
+        persisted_snapshot_end(lastSnapEnd),
         numHpChks(0),
-        shard(kvshard)
+        shard(kvshard),
+        bFilter(NULL),
+        tempFilter(NULL)
     {
         backfill.isBackfillPhase = false;
         pendingOpsStart = 0;
@@ -208,43 +218,56 @@ public:
         purge_seqno = to;
     }
 
-    LockHolder getSnapshotLock() {
+    void setPersistedSnapshot(uint64_t start, uint64_t end) { // record {start, end} as the persisted snapshot range
         LockHolder lh(snapshotMutex); // both fields are written under snapshotMutex so the pair changes together
-        return lh;
+        persisted_snapshot_start = start;
+        persisted_snapshot_end = end;
     }
 
-    void setCurrentSnapshot(uint64_t start, uint64_t end) {
+    void getPersistedSnapshot(snapshot_range_t& range) { // copy the persisted snapshot pair into the caller's range
         LockHolder lh(snapshotMutex); // both fields are read under snapshotMutex so the pair is consistent
-        setCurrentSnapshot_UNLOCKED(start, end);
+        range.start = persisted_snapshot_start;
+        range.end = persisted_snapshot_end;
     }
 
-    void setCurrentSnapshot_UNLOCKED(uint64_t start, uint64_t end) {
-        cb_assert(start <= end);
+    uint64_t getMaxCas() { // current value of the max_cas atomic
+        return max_cas; // implicit read of AtomicValue<uint64_t>
+    }
 
-        if (state == vbucket_state_replica) {
-            cb_assert(end >= (uint64_t)checkpointManager.getHighSeqno());
-        }
+    bool isTimeSyncEnabled() { // whether time sync is currently switched on for this vbucket
+        return time_sync_enabled.load(); // explicit load of the atomic flag
+    }
 
-        cur_snapshot_start.store(start);
-        cur_snapshot_end.store(end);
+    void setMaxCas(uint64_t cas) { // offer cas as a new value for max_cas
+        atomic_setIfBigger(max_cas, cas); // helper defined elsewhere; presumably stores only when cas exceeds the current value - confirm
     }
 
-    void getCurrentSnapshot(uint64_t& start, uint64_t& end) {
-        LockHolder lh(snapshotMutex);
-        getCurrentSnapshot_UNLOCKED(start, end);
+    /**
+     * Set the drift counter's initial value and switch time
+     * synchronization ON or OFF for this vbucket.
+     *
+     * @param initial_drift value stored into drift_counter
+     * @param time_sync     value stored into time_sync_enabled (an atomic bool)
+     *
+     * The two members are written by two separate stores and no lock is
+     * taken in this function, so they are not updated as a single unit.
+     */
+    void setDriftCounterState(int64_t initial_drift, uint8_t time_sync) {
+        drift_counter = initial_drift;
+        time_sync_enabled = time_sync;
     }
 
-    void getCurrentSnapshot_UNLOCKED(uint64_t& start, uint64_t& end) {
-        start = cur_snapshot_start.load();
-        end = cur_snapshot_end.load();
+    int64_t getDriftCounter() { // current value of the drift_counter atomic
+        return drift_counter; // implicit read of AtomicValue<int64_t>
     }
 
-    void getCurrentSnapshotEnd(uint64_t& end) {
-        end = cur_snapshot_end.load();
+    /**
+     * Raise the drift counter so that (wall clock + drift) is not behind
+     * adjustedTime. Does nothing unless time sync is enabled for the
+     * vbucket; the counter is only ever increased here, never lowered.
+     *
+     * @param adjustedTime adjusted time to catch up with; it is compared
+     *                     against gethrtime() + drift_counter
+     */
+    void setDriftCounter(int64_t adjustedTime) {
+        if (time_sync_enabled) {
+            int64_t wallTime = gethrtime();
+            // Same condition as (wallTime + drift) < adjustedTime, i.e.
+            // drift < (adjustedTime - wallTime), but applied through
+            // atomic_setIfBigger (as setMaxCas does) instead of a separate
+            // check and store: with the two-step form, two concurrent
+            // callers could both pass the check and the one carrying the
+            // smaller value could store last, moving the drift backwards.
+            atomic_setIfBigger(drift_counter, adjustedTime - wallTime);
+        }
     }
 
     int getId(void) const { return id; } // value of the id member
-    vbucket_state_t getState(void) const { return state; }
+    vbucket_state_t getState(void) const { return state.load(); } // explicit load of the atomic state member
     void setState(vbucket_state_t to, SERVER_HANDLE_V1 *sapi);
     RWLock& getStateLock() {return stateLock;} // reference to the stateLock member
 
@@ -278,11 +301,13 @@ public:
 
     // Get the dirty queue age sum in milliseconds: (now * queue size - accumulated dirtyQueueAge) * 1000
     uint64_t getQueueAge() {
+        uint64_t currDirtyQueueAge = dirtyQueueAge.load( // one relaxed snapshot, reused by the comparison and the subtraction
+                                                    std::memory_order_relaxed);
         rel_time_t currentAge = ep_current_time() * dirtyQueueSize; // NOTE(review): product is stored in rel_time_t; if that type is narrower than the product it truncates - confirm
-        if (currentAge < dirtyQueueAge) {
+        if (currentAge < currDirtyQueueAge) { // the subtraction below is done in uint64_t and would wrap in this case
             return 0;
         }
-        return (currentAge - dirtyQueueAge) * 1000;
+        return (currentAge - currDirtyQueueAge) * 1000; // presumably seconds -> milliseconds; confirm the unit of ep_current_time()
     }
 
     void fireAllOps(EventuallyPersistentEngine &engine);
@@ -372,6 +397,25 @@ public:
     size_t getHighPriorityChkSize();
     static size_t getCheckpointFlushTimeout();
 
+    /**
+     * BloomFilter operations for vbucket
+     */
+    void initTempFilter(size_t key_count, double probability);
+    void addToFilter(const std::string &key);
+    bool maybeKeyExistsInFilter(const std::string &key);
+    bool isTempFilterAvailable();
+    void addToTempFilter(const std::string &key);
+    void swapFilter();
+    void clearFilter();
+    void setFilterStatus(bfilter_status_t to);
+    std::string getFilterStatusString();
+
+    uint64_t nextHLCCas();
+
+    // Applicable only for FULL EVICTION POLICY
+    bool isResidentRatioUnderThreshold(float threshold,
+                                       item_eviction_policy_t policy);
+
     void addStats(bool details, ADD_STAT add_stat, const void *c,
                   item_eviction_policy_t policy);
 
@@ -397,6 +441,9 @@ public:
         return true;
     }
 
+    void addPersistenceNotification(shared_ptr<Callback<uint64_t> > cb);
+    void notifySeqnoPersisted(uint64_t highseqno);
+
     static const vbucket_state_t ACTIVE;
     static const vbucket_state_t REPLICA;
     static const vbucket_state_t PENDING;
@@ -442,6 +489,12 @@ private:
 
     void adjustCheckpointFlushTimeout(size_t wall_time);
 
+    void decrDirtyQueueMem(size_t decrementBy);
+
+    void decrDirtyQueueAge(uint32_t decrementBy);
+
+    void decrDirtyQueuePendingWrites(size_t decrementBy);
+
     int                      id;
     AtomicValue<vbucket_state_t>  state;
     RWLock                   stateLock;
@@ -451,6 +504,9 @@ private:
     hrtime_t                 pendingOpsStart;
     EPStats                 &stats;
     uint64_t                 purge_seqno;
+    AtomicValue<uint64_t>    max_cas;
+    AtomicValue<int64_t>     drift_counter;
+    AtomicValue<bool>        time_sync_enabled;
 
     Mutex pendingBGFetchesLock;
     vb_bgfetch_queue_t pendingBGFetches;
@@ -458,14 +514,26 @@ private:
     /* snapshotMutex guards the pair {persisted_snapshot_start,
        persisted_snapshot_end}. Both fields are plain (non-atomic) integers,
        so every read or write, even of a single field, must hold the mutex. */
     Mutex snapshotMutex;
-    AtomicValue<uint64_t> cur_snapshot_start;
-    AtomicValue<uint64_t> cur_snapshot_end;
+    uint64_t persisted_snapshot_start;
+    uint64_t persisted_snapshot_end;
 
     Mutex hpChksMutex;
     std::list<HighPriorityVBEntry> hpChks;
     AtomicValue<size_t> numHpChks; // size of list hpChks (to avoid MB-9434)
     KVShard *shard;
 
+    Mutex bfMutex;
+    BloomFilter *bFilter;
+    BloomFilter *tempFilter;    // Used during compaction.
+
+    /**
+     * Pending callbacks that must be notified once the sequence
+     * numbers they are waiting for have been persisted.
+     */
+    Mutex persistedNotificationsMutex;
+    std::list<shared_ptr<Callback<uint64_t>> > persistedNotifications;
+
     static size_t chkFlushTimeout;
 
     DISALLOW_COPY_AND_ASSIGN(VBucket);