Merge remote-tracking branch 'couchbase/3.0.x' into sherlock
[ep-engine.git] / src / vbucket.h
1 /* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2 /*
3  *     Copyright 2010 Couchbase, Inc
4  *
5  *   Licensed under the Apache License, Version 2.0 (the "License");
6  *   you may not use this file except in compliance with the License.
7  *   You may obtain a copy of the License at
8  *
9  *       http://www.apache.org/licenses/LICENSE-2.0
10  *
11  *   Unless required by applicable law or agreed to in writing, software
12  *   distributed under the License is distributed on an "AS IS" BASIS,
13  *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  *   See the License for the specific language governing permissions and
15  *   limitations under the License.
16  */
17
18 #ifndef SRC_VBUCKET_H_
19 #define SRC_VBUCKET_H_ 1
20
21 #include "config.h"
22
23 #include <list>
24 #include <queue>
25 #include <set>
26 #include <sstream>
27 #include <string>
28 #include <utility>
29 #include <vector>
30
31 #include "atomic.h"
32 #include "bgfetcher.h"
33 #include "bloomfilter.h"
34 #include "checkpoint.h"
35 #include "common.h"
36 #include "stored-value.h"
37
38 const size_t MIN_CHK_FLUSH_TIMEOUT = 10; // 10 sec.
39 const size_t MAX_CHK_FLUSH_TIMEOUT = 30; // 30 sec.
40 static const int64_t INITIAL_DRIFT = -140737488355328; //lowest possible 48-bit integer
41
42 struct HighPriorityVBEntry {
43     HighPriorityVBEntry() :
44         cookie(NULL), id(0), start(gethrtime()), isBySeqno_(false) { }
45     HighPriorityVBEntry(const void *c, uint64_t idNum, bool isBySeqno) :
46         cookie(c), id(idNum), start(gethrtime()), isBySeqno_(isBySeqno) { }
47
48     const void *cookie;
49     uint64_t id;
50     hrtime_t start;
51     bool isBySeqno_;
52 };
53
54 /**
55  * Function object that returns true if the given vbucket is acceptable.
56  */
57 class VBucketFilter {
58 public:
59
60     /**
61      * Instiatiate a VBucketFilter that always returns true.
62      */
63     explicit VBucketFilter() : acceptable() {}
64
65     /**
66      * Instantiate a VBucketFilter that returns true for any of the
67      * given vbucket IDs.
68      */
69     explicit VBucketFilter(const std::vector<uint16_t> &a) :
70         acceptable(a.begin(), a.end()) {}
71
72     explicit VBucketFilter(const std::set<uint16_t> &s) : acceptable(s) {}
73
74     void assign(const std::set<uint16_t> &a) {
75         acceptable = a;
76     }
77
78     bool operator ()(uint16_t v) const {
79         return acceptable.empty() || acceptable.find(v) != acceptable.end();
80     }
81
82     size_t size() const { return acceptable.size(); }
83
84     bool empty() const { return acceptable.empty(); }
85
86     void reset() {
87         acceptable.clear();
88     }
89
90     /**
91      * Calculate the difference between this and another filter.
92      * If "this" contains elements, [1,2,3,4] and other contains [3,4,5,6]
93      * the returned filter contains: [1,2,5,6]
94      * @param other the other filter to compare with
95      * @return a new filter with the elements present in only one of the two
96      *         filters.
97      */
98     VBucketFilter filter_diff(const VBucketFilter &other) const;
99
100     /**
101      * Calculate the intersection between this and another filter.
102      * If "this" contains elements, [1,2,3,4] and other contains [3,4,5,6]
103      * the returned filter contains: [3,4]
104      * @param other the other filter to compare with
105      * @return a new filter with the elements present in both of the two
106      *         filters.
107      */
108     VBucketFilter filter_intersection(const VBucketFilter &other) const;
109
110     const std::set<uint16_t> &getVBSet() const { return acceptable; }
111
112     bool addVBucket(uint16_t vbucket) {
113         std::pair<std::set<uint16_t>::iterator, bool> rv = acceptable.insert(vbucket);
114         return rv.second;
115     }
116
117     void removeVBucket(uint16_t vbucket) {
118         acceptable.erase(vbucket);
119     }
120
121     /**
122      * Dump the filter in a human readable form ( "{ bucket, bucket, bucket }"
123      * to the specified output stream.
124      */
125     friend std::ostream& operator<< (std::ostream& out,
126                                      const VBucketFilter &filter);
127
128 private:
129
130     std::set<uint16_t> acceptable;
131 };
132
133 class EventuallyPersistentEngine;
134 class FailoverTable;
135 class KVShard;
136
137 // First bool is true if an item exists in VB DB file.
138 // second bool is true if the operation is SET (i.e., insert or update).
139 typedef std::pair<bool, bool> kstat_entry_t;
140
141 struct KVStatsCtx{
142     KVStatsCtx() : vbucket(std::numeric_limits<uint16_t>::max()),
143                    fileSpaceUsed(0), fileSize(0) {}
144
145     uint16_t vbucket;
146     size_t fileSpaceUsed;
147     size_t fileSize;
148     unordered_map<std::string, kstat_entry_t> keyStats;
149 };
150
151 typedef struct KVStatsCtx kvstats_ctx;
152
153 /**
154  * An individual vbucket.
155  */
156 class VBucket : public RCValue {
157 public:
158
159     VBucket(int i, vbucket_state_t newState, EPStats &st,
160             CheckpointConfig &chkConfig, KVShard *kvshard,
161             int64_t lastSeqno, uint64_t lastSnapStart,
162             uint64_t lastSnapEnd, FailoverTable *table,
163             shared_ptr<Callback<uint16_t> > cb,
164             vbucket_state_t initState = vbucket_state_dead,
165             uint64_t chkId = 1, uint64_t purgeSeqno = 0,
166             uint64_t maxCas = 0, int64_t driftCounter = INITIAL_DRIFT):
167         ht(st),
168         checkpointManager(st, i, chkConfig, lastSeqno, lastSnapStart,
169                           lastSnapEnd, cb, chkId),
170         failovers(table),
171         opsCreate(0),
172         opsUpdate(0),
173         opsDelete(0),
174         opsReject(0),
175         dirtyQueueSize(0),
176         dirtyQueueMem(0),
177         dirtyQueueFill(0),
178         dirtyQueueDrain(0),
179         dirtyQueueAge(0),
180         dirtyQueuePendingWrites(0),
181         metaDataDisk(0),
182         numExpiredItems(0),
183         fileSpaceUsed(0),
184         fileSize(0),
185         id(i),
186         state(newState),
187         initialState(initState),
188         stats(st),
189         purge_seqno(purgeSeqno),
190         max_cas(maxCas),
191         drift_counter(driftCounter),
192         time_sync_enabled(false),
193         persisted_snapshot_start(lastSnapStart),
194         persisted_snapshot_end(lastSnapEnd),
195         numHpChks(0),
196         shard(kvshard),
197         bFilter(NULL),
198         tempFilter(NULL)
199     {
200         backfill.isBackfillPhase = false;
201         pendingOpsStart = 0;
202         stats.memOverhead.fetch_add(sizeof(VBucket)
203                                + ht.memorySize() + sizeof(CheckpointManager));
204         cb_assert(stats.memOverhead.load() < GIGANTOR);
205     }
206
207     ~VBucket();
208
209     int64_t getHighSeqno() {
210         return checkpointManager.getHighSeqno();
211     }
212
213     uint64_t getPurgeSeqno() {
214         return purge_seqno;
215     }
216
217     void setPurgeSeqno(uint64_t to) {
218         purge_seqno = to;
219     }
220
221     void setPersistedSnapshot(uint64_t start, uint64_t end) {
222         LockHolder lh(snapshotMutex);
223         persisted_snapshot_start = start;
224         persisted_snapshot_end = end;
225     }
226
227     void getPersistedSnapshot(snapshot_range_t& range) {
228         LockHolder lh(snapshotMutex);
229         range.start = persisted_snapshot_start;
230         range.end = persisted_snapshot_end;
231     }
232
233     uint64_t getMaxCas() {
234         return max_cas;
235     }
236
237     bool isTimeSyncEnabled() {
238         return time_sync_enabled.load();
239     }
240
241     void setMaxCas(uint64_t cas) {
242         atomic_setIfBigger(max_cas, cas);
243     }
244
245     /**
246      * To set drift counter's initial value
247      * and to toggle the timeSync between ON/OFF.
248      */
249     void setDriftCounterState(int64_t initial_drift, uint8_t time_sync) {
250         drift_counter = initial_drift;
251         time_sync_enabled = time_sync;
252     }
253
254     int64_t getDriftCounter() {
255         return drift_counter;
256     }
257
258     void setDriftCounter(int64_t adjustedTime) {
259         // Update drift counter only if timeSync is enabled for
260         // the vbucket.
261         if (time_sync_enabled) {
262             int64_t wallTime = gethrtime();
263             if ((wallTime + getDriftCounter()) < adjustedTime) {
264                 drift_counter = (adjustedTime - wallTime);
265             }
266         }
267     }
268
269     int getId(void) const { return id; }
270     vbucket_state_t getState(void) const { return state.load(); }
271     void setState(vbucket_state_t to, SERVER_HANDLE_V1 *sapi);
272     RWLock& getStateLock() {return stateLock;}
273
274     vbucket_state_t getInitialState(void) { return initialState; }
275     void setInitialState(vbucket_state_t initState) {
276         initialState = initState;
277     }
278
279     bool addPendingOp(const void *cookie) {
280         LockHolder lh(pendingOpLock);
281         if (state != vbucket_state_pending) {
282             // State transitioned while we were waiting.
283             return false;
284         }
285         // Start a timer when enqueuing the first client.
286         if (pendingOps.empty()) {
287             pendingOpsStart = gethrtime();
288         }
289         pendingOps.push_back(cookie);
290         ++stats.pendingOps;
291         ++stats.pendingOpsTotal;
292         return true;
293     }
294
295     void doStatsForQueueing(Item& item, size_t itemBytes);
296     void doStatsForFlushing(Item& item, size_t itemBytes);
297     void incrMetaDataDisk(Item& qi);
298     void decrMetaDataDisk(Item& qi);
299
300     void resetStats();
301
302     // Get age sum in millisecond
303     uint64_t getQueueAge() {
304         uint64_t currDirtyQueueAge = dirtyQueueAge.load(
305                                                     std::memory_order_relaxed);
306         rel_time_t currentAge = ep_current_time() * dirtyQueueSize;
307         if (currentAge < currDirtyQueueAge) {
308             return 0;
309         }
310         return (currentAge - currDirtyQueueAge) * 1000;
311     }
312
313     void fireAllOps(EventuallyPersistentEngine &engine);
314
315     size_t size(void) {
316         HashTableDepthStatVisitor v;
317         ht.visitDepth(v);
318         return v.size;
319     }
320
321     size_t getBackfillSize() {
322         LockHolder lh(backfill.mutex);
323         return backfill.items.size();
324     }
325     bool queueBackfillItem(queued_item& qi, bool genSeqno) {
326         LockHolder lh(backfill.mutex);
327         if (genSeqno) {
328             qi->setBySeqno(checkpointManager.nextBySeqno());
329         } else {
330             checkpointManager.setBySeqno(qi->getBySeqno());
331         }
332         backfill.items.push(qi);
333         ++stats.diskQueueSize;
334         ++stats.totalEnqueued;
335         doStatsForQueueing(*qi, qi->size());
336         stats.memOverhead.fetch_add(sizeof(queued_item));
337         return true;
338     }
339     void getBackfillItems(std::vector<queued_item> &items) {
340         LockHolder lh(backfill.mutex);
341         size_t num_items = backfill.items.size();
342         while (!backfill.items.empty()) {
343             items.push_back(backfill.items.front());
344             backfill.items.pop();
345         }
346         stats.memOverhead.fetch_sub(num_items * sizeof(queued_item));
347     }
348     bool isBackfillPhase() {
349         LockHolder lh(backfill.mutex);
350         return backfill.isBackfillPhase;
351     }
352     void setBackfillPhase(bool backfillPhase) {
353         LockHolder lh(backfill.mutex);
354         backfill.isBackfillPhase = backfillPhase;
355     }
356
357     bool getBGFetchItems(vb_bgfetch_queue_t &fetches);
358     void queueBGFetchItem(const std::string &key, VBucketBGFetchItem *fetch,
359                           BgFetcher *bgFetcher);
360     size_t numPendingBGFetchItems(void) {
361         LockHolder lh(pendingBGFetchesLock);
362         // do a dirty read of number of fetch items
363         return pendingBGFetches.size();
364     }
365     bool hasPendingBGFetchItems(void) {
366         LockHolder lh(pendingBGFetchesLock);
367         return !pendingBGFetches.empty();
368     }
369
370     static const char* toString(vbucket_state_t s) {
371         switch(s) {
372         case vbucket_state_active: return "active"; break;
373         case vbucket_state_replica: return "replica"; break;
374         case vbucket_state_pending: return "pending"; break;
375         case vbucket_state_dead: return "dead"; break;
376         }
377         return "unknown";
378     }
379
380     static vbucket_state_t fromString(const char* state) {
381         if (strcmp(state, "active") == 0) {
382             return vbucket_state_active;
383         } else if (strcmp(state, "replica") == 0) {
384             return vbucket_state_replica;
385         } else if (strcmp(state, "pending") == 0) {
386             return vbucket_state_pending;
387         } else {
388             return vbucket_state_dead;
389         }
390     }
391
392     void addHighPriorityVBEntry(uint64_t id, const void *cookie,
393                                 bool isBySeqno);
394     void notifyOnPersistence(EventuallyPersistentEngine &e,
395                              uint64_t id, bool isBySeqno);
396     void notifyAllPendingConnsFailed(EventuallyPersistentEngine &e);
397     size_t getHighPriorityChkSize();
398     static size_t getCheckpointFlushTimeout();
399
400     /**
401      * BloomFilter operations for vbucket
402      */
403     void initTempFilter(size_t key_count, double probability);
404     void addToFilter(const std::string &key);
405     bool maybeKeyExistsInFilter(const std::string &key);
406     bool isTempFilterAvailable();
407     void addToTempFilter(const std::string &key);
408     void swapFilter();
409     void clearFilter();
410     void setFilterStatus(bfilter_status_t to);
411     std::string getFilterStatusString();
412
413     uint64_t nextHLCCas();
414
415     // Applicable only for FULL EVICTION POLICY
416     bool isResidentRatioUnderThreshold(float threshold,
417                                        item_eviction_policy_t policy);
418
419     void addStats(bool details, ADD_STAT add_stat, const void *c,
420                   item_eviction_policy_t policy);
421
422     size_t getNumItems(item_eviction_policy_t policy);
423
424     size_t getNumNonResidentItems(item_eviction_policy_t policy);
425
426     size_t getNumTempItems(void) {
427         return ht.getNumTempItems();
428     }
429
430     bool decrDirtyQueueSize(size_t decrementBy) {
431         size_t oldVal;
432         do {
433             oldVal = dirtyQueueSize.load();
434             if (oldVal < decrementBy) {
435                 LOG(EXTENSION_LOG_DEBUG,
436                     "Cannot decrement dirty queue size of vbucket %d by %lld, "
437                     "the current value is %lld\n", id, decrementBy, oldVal);
438                 return false;
439             }
440         } while (!dirtyQueueSize.compare_exchange_strong(oldVal, oldVal - decrementBy));
441         return true;
442     }
443
444     void addPersistenceNotification(shared_ptr<Callback<uint64_t> > cb);
445     void notifySeqnoPersisted(uint64_t highseqno);
446
447     static const vbucket_state_t ACTIVE;
448     static const vbucket_state_t REPLICA;
449     static const vbucket_state_t PENDING;
450     static const vbucket_state_t DEAD;
451
452     HashTable         ht;
453     CheckpointManager checkpointManager;
454     struct {
455         Mutex mutex;
456         std::queue<queued_item> items;
457         bool isBackfillPhase;
458     } backfill;
459
460     KVShard *getShard(void) {
461         return shard;
462     }
463
464     std::queue<queued_item> rejectQueue;
465     FailoverTable *failovers;
466
467     AtomicValue<size_t>  opsCreate;
468     AtomicValue<size_t>  opsUpdate;
469     AtomicValue<size_t>  opsDelete;
470     AtomicValue<size_t>  opsReject;
471
472     AtomicValue<size_t>  dirtyQueueSize;
473     AtomicValue<size_t>  dirtyQueueMem;
474     AtomicValue<size_t>  dirtyQueueFill;
475     AtomicValue<size_t>  dirtyQueueDrain;
476     AtomicValue<uint64_t> dirtyQueueAge;
477     AtomicValue<size_t>  dirtyQueuePendingWrites;
478     AtomicValue<size_t>  metaDataDisk;
479
480     AtomicValue<size_t>  numExpiredItems;
481     AtomicValue<size_t>  fileSpaceUsed;
482     AtomicValue<size_t>  fileSize;
483
484 private:
485     template <typename T>
486     void addStat(const char *nm, const T &val, ADD_STAT add_stat, const void *c);
487
488     void fireAllOps(EventuallyPersistentEngine &engine, ENGINE_ERROR_CODE code);
489
490     void adjustCheckpointFlushTimeout(size_t wall_time);
491
492     void decrDirtyQueueMem(size_t decrementBy);
493
494     void decrDirtyQueueAge(uint32_t decrementBy);
495
496     void decrDirtyQueuePendingWrites(size_t decrementBy);
497
498     int                      id;
499     AtomicValue<vbucket_state_t>  state;
500     RWLock                   stateLock;
501     vbucket_state_t          initialState;
502     Mutex                    pendingOpLock;
503     std::vector<const void*> pendingOps;
504     hrtime_t                 pendingOpsStart;
505     EPStats                 &stats;
506     uint64_t                 purge_seqno;
507     AtomicValue<uint64_t>    max_cas;
508     AtomicValue<int64_t>     drift_counter;
509     AtomicValue<bool>        time_sync_enabled;
510
511     Mutex pendingBGFetchesLock;
512     vb_bgfetch_queue_t pendingBGFetches;
513
514     /* snapshotMutex is used to update/read the pair {start, end} atomically,
515        but not if reading a single field. */
516     Mutex snapshotMutex;
517     uint64_t persisted_snapshot_start;
518     uint64_t persisted_snapshot_end;
519
520     Mutex hpChksMutex;
521     std::list<HighPriorityVBEntry> hpChks;
522     AtomicValue<size_t> numHpChks; // size of list hpChks (to avoid MB-9434)
523     KVShard *shard;
524
525     Mutex bfMutex;
526     BloomFilter *bFilter;
527     BloomFilter *tempFilter;    // Used during compaction.
528
529     /**
530      * The following list is to contain pending notifications
531      * that need to be alerted whenever the desired sequence
532      * numbers have been persisted.
533      */
534     Mutex persistedNotificationsMutex;
535     std::list<shared_ptr<Callback<uint64_t>> > persistedNotifications;
536
537     static size_t chkFlushTimeout;
538
539     DISALLOW_COPY_AND_ASSIGN(VBucket);
540 };
541
542 #endif  // SRC_VBUCKET_H_