bec3b5febae7c2de74ee1238061060fe33788f14
[ep-engine.git] / src / vbucket.h
1 /* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2 /*
3  *     Copyright 2015 Couchbase, Inc
4  *
5  *   Licensed under the Apache License, Version 2.0 (the "License");
6  *   you may not use this file except in compliance with the License.
7  *   You may obtain a copy of the License at
8  *
9  *       http://www.apache.org/licenses/LICENSE-2.0
10  *
11  *   Unless required by applicable law or agreed to in writing, software
12  *   distributed under the License is distributed on an "AS IS" BASIS,
13  *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  *   See the License for the specific language governing permissions and
15  *   limitations under the License.
16  */
17
18 #ifndef SRC_VBUCKET_H_
19 #define SRC_VBUCKET_H_ 1
20
21 #include "config.h"
22
23 #include "bloomfilter.h"
24 #include "checkpoint.h"
25 #include "ep_types.h"
26 #include "failover-table.h"
27 #include "hlc.h"
28 #include "kvstore.h"
29 #include "stored-value.h"
30 #include "utility.h"
31
32 #include <queue>
33
class BgFetcher;

// Lower/upper bounds (in seconds) for the adaptive checkpoint-flush timeout
// (presumably clamping VBucket::chkFlushTimeout via
// adjustCheckpointFlushTimeout() — confirm in vbucket.cc).
const size_t MIN_CHK_FLUSH_TIMEOUT = 10; // 10 sec.
const size_t MAX_CHK_FLUSH_TIMEOUT = 30; // 30 sec.
38
39 struct HighPriorityVBEntry {
40     HighPriorityVBEntry() :
41         cookie(NULL), id(0), start(gethrtime()), isBySeqno_(false) { }
42     HighPriorityVBEntry(const void *c, uint64_t idNum, bool isBySeqno) :
43         cookie(c), id(idNum), start(gethrtime()), isBySeqno_(isBySeqno) { }
44
45     const void *cookie;
46     uint64_t id;
47     hrtime_t start;
48     bool isBySeqno_;
49 };
50
/**
 * Function object that returns true if the given vbucket is acceptable.
 */
class VBucketFilter {
public:

    /**
     * Instantiate a VBucketFilter which accepts every vbucket
     * (an empty filter matches everything).
     */
    explicit VBucketFilter() : acceptable() {}

    /**
     * Instantiate a VBucketFilter which accepts exactly the given
     * vbucket IDs.
     */
    explicit VBucketFilter(const std::vector<uint16_t> &a) :
        acceptable(a.begin(), a.end()) {}

    explicit VBucketFilter(const std::set<uint16_t> &s) : acceptable(s) {}

    /// Replace the current set of acceptable vbuckets.
    void assign(const std::set<uint16_t> &a) {
        acceptable = a;
    }

    /// @return true if vbucket `v` passes the filter.
    bool operator ()(uint16_t v) const {
        if (acceptable.empty()) {
            // An empty filter accepts all vbuckets.
            return true;
        }
        return acceptable.count(v) > 0;
    }

    /// Number of explicitly accepted vbuckets (0 == accept-all).
    size_t size() const { return acceptable.size(); }

    /// @return true when no explicit vbuckets are configured.
    bool empty() const { return acceptable.empty(); }

    /// Clear the filter back to accept-all.
    void reset() {
        acceptable.clear();
    }

    /**
     * Calculate the difference between this and another filter.
     * If "this" contains elements, [1,2,3,4] and other contains [3,4,5,6]
     * the returned filter contains: [1,2,5,6]
     * @param other the other filter to compare with
     * @return a new filter with the elements present in only one of the two
     *         filters.
     */
    VBucketFilter filter_diff(const VBucketFilter &other) const;

    /**
     * Calculate the intersection between this and another filter.
     * If "this" contains elements, [1,2,3,4] and other contains [3,4,5,6]
     * the returned filter contains: [3,4]
     * @param other the other filter to compare with
     * @return a new filter with the elements present in both of the two
     *         filters.
     */
    VBucketFilter filter_intersection(const VBucketFilter &other) const;

    /// Access the underlying set of accepted vbucket IDs.
    const std::set<uint16_t> &getVBSet() const { return acceptable; }

    /**
     * Add a vbucket to the filter.
     * @return true if it was newly inserted, false if already present.
     */
    bool addVBucket(uint16_t vbucket) {
        return acceptable.insert(vbucket).second;
    }

    /// Remove a vbucket from the filter (no-op if absent).
    void removeVBucket(uint16_t vbucket) {
        acceptable.erase(vbucket);
    }

    /**
     * Dump the filter in a human readable form ( "{ bucket, bucket, bucket }"
     * to the specified output stream.
     */
    friend std::ostream& operator<< (std::ostream& out,
                                     const VBucketFilter &filter);

private:

    std::set<uint16_t> acceptable;
};
129
130 class EventuallyPersistentEngine;
131 class FailoverTable;
132 class KVShard;
133
134 /**
135  * An individual vbucket.
136  */
137 class VBucket : public RCValue {
138 public:
139
140     // Identifier for a vBucket
141     typedef uint16_t id_type;
142
143     VBucket(id_type i, vbucket_state_t newState, EPStats &st,
144             CheckpointConfig &chkConfig, KVShard *kvshard,
145             int64_t lastSeqno, uint64_t lastSnapStart,
146             uint64_t lastSnapEnd, FailoverTable *table,
147             std::shared_ptr<Callback<id_type> > cb,
148             Configuration& config,
149             vbucket_state_t initState = vbucket_state_dead,
150             uint64_t chkId = 1, uint64_t purgeSeqno = 0,
151             uint64_t maxCas = 0):
152         ht(st),
153         checkpointManager(st, i, chkConfig, lastSeqno, lastSnapStart,
154                           lastSnapEnd, cb, chkId),
155         failovers(table),
156         opsCreate(0),
157         opsUpdate(0),
158         opsDelete(0),
159         opsReject(0),
160         dirtyQueueSize(0),
161         dirtyQueueMem(0),
162         dirtyQueueFill(0),
163         dirtyQueueDrain(0),
164         dirtyQueueAge(0),
165         dirtyQueuePendingWrites(0),
166         metaDataDisk(0),
167         numExpiredItems(0),
168         fileSpaceUsed(0),
169         fileSize(0),
170         id(i),
171         state(newState),
172         initialState(initState),
173         stats(st),
174         purge_seqno(purgeSeqno),
175         takeover_backed_up(false),
176         persisted_snapshot_start(lastSnapStart),
177         persisted_snapshot_end(lastSnapEnd),
178         numHpChks(0),
179         shard(kvshard),
180         bFilter(NULL),
181         tempFilter(NULL),
182         rollbackItemCount(0),
183         hlc(maxCas,
184             config.getHlcAheadThresholdUs(),
185             config.getHlcBehindThresholdUs()),
186         statPrefix("vb_" + std::to_string(i))
187     {
188         backfill.isBackfillPhase = false;
189         pendingOpsStart = 0;
190         stats.memOverhead.fetch_add(sizeof(VBucket)
191                                + ht.memorySize() + sizeof(CheckpointManager));
192         LOG(EXTENSION_LOG_NOTICE,
193             "VBucket: created vbucket:%" PRIu16 " with state:%s "
194                     "initialState:%s "
195                     "lastSeqno:%" PRIu64 " "
196                     "lastSnapshot:{%" PRIu64 ",%" PRIu64 "} "
197                     "persisted_snapshot:{%" PRIu64 ",%" PRIu64 "} "
198                     "max_cas:%" PRIu64,
199             id, VBucket::toString(state), VBucket::toString(initialState),
200             lastSeqno, lastSnapStart, lastSnapEnd,
201             persisted_snapshot_start, persisted_snapshot_end,
202             getMaxCas());
203     }
204
205     ~VBucket();
206
207     int64_t getHighSeqno() const {
208         return checkpointManager.getHighSeqno();
209     }
210
211     size_t getChkMgrMemUsage() {
212         return checkpointManager.getMemoryUsage();
213     }
214
215     size_t getChkMgrMemUsageOfUnrefCheckpoints() {
216         return checkpointManager.getMemoryUsageOfUnrefCheckpoints();
217     }
218
219     uint64_t getPurgeSeqno() const {
220         return purge_seqno;
221     }
222
223     void setPurgeSeqno(uint64_t to) {
224         purge_seqno = to;
225     }
226
227     void setPersistedSnapshot(uint64_t start, uint64_t end) {
228         LockHolder lh(snapshotMutex);
229         persisted_snapshot_start = start;
230         persisted_snapshot_end = end;
231     }
232
233     snapshot_range_t getPersistedSnapshot() const {
234         LockHolder lh(snapshotMutex);
235         return {persisted_snapshot_start, persisted_snapshot_end};
236     }
237
238     uint64_t getMaxCas() const {
239         return hlc.getMaxHLC();
240     }
241
242     void setMaxCas(uint64_t cas) {
243         hlc.setMaxHLC(cas);
244     }
245
246     void setMaxCasAndTrackDrift(uint64_t cas) {
247         hlc.setMaxHLCAndTrackDrift(cas);
248     }
249
250     void forceMaxCas(uint64_t cas) {
251         hlc.forceMaxHLC(cas);
252     }
253
254     HLC::DriftStats getHLCDriftStats() const {
255         return hlc.getDriftStats();
256     }
257
258     HLC::DriftExceptions getHLCDriftExceptionCounters() const {
259         return hlc.getDriftExceptionCounters();
260     }
261
262     void setHLCDriftAheadThreshold(uint64_t threshold) {
263         hlc.setDriftAheadThreshold(threshold);
264     }
265
266     void setHLCDriftBehindThreshold(uint64_t threshold) {
267         hlc.setDriftBehindThreshold(threshold);
268     }
269
270     bool isTakeoverBackedUp() {
271         return takeover_backed_up.load();
272     }
273
274     void setTakeoverBackedUpState(bool to) {
275         bool inverse = !to;
276         takeover_backed_up.compare_exchange_strong(inverse, to);
277     }
278
279     id_type getId() const { return id; }
280     vbucket_state_t getState(void) const { return state.load(); }
281     void setState(vbucket_state_t to);
282     RWLock& getStateLock() {return stateLock;}
283
284     vbucket_state_t getInitialState(void) { return initialState; }
285     void setInitialState(vbucket_state_t initState) {
286         initialState = initState;
287     }
288
289     vbucket_state getVBucketState() const;
290
291     bool addPendingOp(const void *cookie) {
292         LockHolder lh(pendingOpLock);
293         if (state != vbucket_state_pending) {
294             // State transitioned while we were waiting.
295             return false;
296         }
297         // Start a timer when enqueuing the first client.
298         if (pendingOps.empty()) {
299             pendingOpsStart = gethrtime();
300         }
301         pendingOps.push_back(cookie);
302         ++stats.pendingOps;
303         ++stats.pendingOpsTotal;
304         return true;
305     }
306
307     void doStatsForQueueing(const Item& item, size_t itemBytes);
308     void doStatsForFlushing(Item& item, size_t itemBytes);
309     void incrMetaDataDisk(Item& qi);
310     void decrMetaDataDisk(Item& qi);
311
312     void resetStats();
313
314     // Get age sum in millisecond
315     uint64_t getQueueAge() {
316         uint64_t currDirtyQueueAge = dirtyQueueAge.load(
317                                                     std::memory_order_relaxed);
318         rel_time_t currentAge = ep_current_time() * dirtyQueueSize;
319         if (currentAge < currDirtyQueueAge) {
320             return 0;
321         }
322         return (currentAge - currDirtyQueueAge) * 1000;
323     }
324
325     void fireAllOps(EventuallyPersistentEngine &engine);
326
327     size_t size(void) {
328         HashTableDepthStatVisitor v;
329         ht.visitDepth(v);
330         return v.size;
331     }
332
333     size_t getBackfillSize() {
334         LockHolder lh(backfill.mutex);
335         return backfill.items.size();
336     }
337     bool queueBackfillItem(queued_item& qi,
338                            const GenerateBySeqno generateBySeqno) {
339         LockHolder lh(backfill.mutex);
340         if (GenerateBySeqno::Yes == generateBySeqno) {
341             qi->setBySeqno(checkpointManager.nextBySeqno());
342         } else {
343             checkpointManager.setBySeqno(qi->getBySeqno());
344         }
345         backfill.items.push(qi);
346         ++stats.diskQueueSize;
347         ++stats.totalEnqueued;
348         doStatsForQueueing(*qi, qi->size());
349         stats.memOverhead.fetch_add(sizeof(queued_item));
350         return true;
351     }
352     void getBackfillItems(std::vector<queued_item> &items) {
353         LockHolder lh(backfill.mutex);
354         size_t num_items = backfill.items.size();
355         while (!backfill.items.empty()) {
356             items.push_back(backfill.items.front());
357             backfill.items.pop();
358         }
359         stats.memOverhead.fetch_sub(num_items * sizeof(queued_item));
360     }
361     bool isBackfillPhase() {
362         LockHolder lh(backfill.mutex);
363         return backfill.isBackfillPhase;
364     }
365     void setBackfillPhase(bool backfillPhase) {
366         LockHolder lh(backfill.mutex);
367         backfill.isBackfillPhase = backfillPhase;
368     }
369
370     bool getBGFetchItems(vb_bgfetch_queue_t &fetches);
371
372     /* queue a background fetch of the specified item.
373      * Returns the number of pending background fetches after
374      * adding the specified item.
375      **/
376     size_t queueBGFetchItem(const std::string &key, VBucketBGFetchItem *fetch,
377                             BgFetcher *bgFetcher);
378
379     bool hasPendingBGFetchItems(void) {
380         LockHolder lh(pendingBGFetchesLock);
381         return !pendingBGFetches.empty();
382     }
383
384     static const char* toString(vbucket_state_t s) {
385         switch(s) {
386         case vbucket_state_active: return "active"; break;
387         case vbucket_state_replica: return "replica"; break;
388         case vbucket_state_pending: return "pending"; break;
389         case vbucket_state_dead: return "dead"; break;
390         }
391         return "unknown";
392     }
393
394     static vbucket_state_t fromString(const char* state) {
395         if (strcmp(state, "active") == 0) {
396             return vbucket_state_active;
397         } else if (strcmp(state, "replica") == 0) {
398             return vbucket_state_replica;
399         } else if (strcmp(state, "pending") == 0) {
400             return vbucket_state_pending;
401         } else {
402             return vbucket_state_dead;
403         }
404     }
405
406     void addHighPriorityVBEntry(uint64_t id, const void *cookie,
407                                 bool isBySeqno);
408     void notifyOnPersistence(EventuallyPersistentEngine &e,
409                              uint64_t id, bool isBySeqno);
410     void notifyAllPendingConnsFailed(EventuallyPersistentEngine &e);
411     size_t getHighPriorityChkSize();
412     static size_t getCheckpointFlushTimeout();
413
414     /**
415      * BloomFilter operations for vbucket
416      */
417     void createFilter(size_t key_count, double probability);
418     void initTempFilter(size_t key_count, double probability);
419     void addToFilter(const std::string &key);
420     bool maybeKeyExistsInFilter(const std::string &key);
421     bool isTempFilterAvailable();
422     void addToTempFilter(const std::string &key);
423     void swapFilter();
424     void clearFilter();
425     void setFilterStatus(bfilter_status_t to);
426     std::string getFilterStatusString();
427     size_t getFilterSize();
428     size_t getNumOfKeysInFilter();
429
430     uint64_t nextHLCCas() {
431         return hlc.nextHLC();
432     }
433
434     // Applicable only for FULL EVICTION POLICY
435     bool isResidentRatioUnderThreshold(float threshold,
436                                        item_eviction_policy_t policy);
437
438     void addStats(bool details, ADD_STAT add_stat, const void *c,
439                   item_eviction_policy_t policy);
440
441     size_t getNumItems(item_eviction_policy_t policy);
442
443     size_t getNumNonResidentItems(item_eviction_policy_t policy);
444
445     size_t getNumTempItems(void) {
446         return ht.getNumTempItems();
447     }
448
449     bool decrDirtyQueueSize(size_t decrementBy) {
450         size_t oldVal;
451         do {
452             oldVal = dirtyQueueSize.load();
453             if (oldVal < decrementBy) {
454                 LOG(EXTENSION_LOG_DEBUG,
455                     "Cannot decrement dirty queue size of vbucket %" PRIu16
456                     "by %" PRIu64 ", the current value is %" PRIu64 "\n", id,
457                     uint64_t(decrementBy), uint64_t(oldVal));
458                 return false;
459             }
460         } while (!dirtyQueueSize.compare_exchange_strong(oldVal, oldVal - decrementBy));
461         return true;
462     }
463
464     void incrRollbackItemCount(uint64_t val) {
465         rollbackItemCount.fetch_add(val, std::memory_order_relaxed);
466     }
467
468     uint64_t getRollbackItemCount(void) {
469         return rollbackItemCount.load(std::memory_order_relaxed);
470     }
471
472     // Return the persistence checkpoint ID
473     uint64_t getPersistenceCheckpointId() const;
474
475     // Set the persistence checkpoint ID to the given value.
476     void setPersistenceCheckpointId(uint64_t checkpointId);
477
478     static const vbucket_state_t ACTIVE;
479     static const vbucket_state_t REPLICA;
480     static const vbucket_state_t PENDING;
481     static const vbucket_state_t DEAD;
482
483     HashTable         ht;
484     CheckpointManager checkpointManager;
485
486     // Struct for managing 'backfill' items - Items which have been added by
487     // an incoming TAP stream and need to be persisted to disk.
488     struct {
489         Mutex mutex;
490         std::queue<queued_item> items;
491         bool isBackfillPhase;
492     } backfill;
493
494     KVShard *getShard(void) {
495         return shard;
496     }
497
498     std::queue<queued_item> rejectQueue;
499     FailoverTable *failovers;
500
501     AtomicValue<size_t>  opsCreate;
502     AtomicValue<size_t>  opsUpdate;
503     AtomicValue<size_t>  opsDelete;
504     AtomicValue<size_t>  opsReject;
505
506     AtomicValue<size_t>  dirtyQueueSize;
507     AtomicValue<size_t>  dirtyQueueMem;
508     AtomicValue<size_t>  dirtyQueueFill;
509     AtomicValue<size_t>  dirtyQueueDrain;
510     AtomicValue<uint64_t> dirtyQueueAge;
511     AtomicValue<size_t>  dirtyQueuePendingWrites;
512     AtomicValue<size_t>  metaDataDisk;
513
514     AtomicValue<size_t>  numExpiredItems;
515     AtomicValue<size_t>  fileSpaceUsed;
516     AtomicValue<size_t>  fileSize;
517
518 private:
519     template <typename T>
520     void addStat(const char *nm, const T &val, ADD_STAT add_stat, const void *c);
521
522     void fireAllOps(EventuallyPersistentEngine &engine, ENGINE_ERROR_CODE code);
523
524     void adjustCheckpointFlushTimeout(size_t wall_time);
525
526     void decrDirtyQueueMem(size_t decrementBy);
527
528     void decrDirtyQueueAge(uint32_t decrementBy);
529
530     void decrDirtyQueuePendingWrites(size_t decrementBy);
531
532     id_type                         id;
533     AtomicValue<vbucket_state_t>    state;
534     RWLock                          stateLock;
535     vbucket_state_t                 initialState;
536     Mutex                           pendingOpLock;
537     std::vector<const void*>        pendingOps;
538     hrtime_t                        pendingOpsStart;
539     EPStats                        &stats;
540     uint64_t                        purge_seqno;
541
542     AtomicValue<bool>               takeover_backed_up;
543
544     Mutex pendingBGFetchesLock;
545     vb_bgfetch_queue_t pendingBGFetches;
546
547     /* snapshotMutex is used to update/read the pair {start, end} atomically,
548        but not if reading a single field. */
549     mutable Mutex snapshotMutex;
550     uint64_t persisted_snapshot_start;
551     uint64_t persisted_snapshot_end;
552
553     Mutex hpChksMutex;
554     std::list<HighPriorityVBEntry> hpChks;
555     AtomicValue<size_t> numHpChks; // size of list hpChks (to avoid MB-9434)
556     KVShard *shard;
557
558     Mutex bfMutex;
559     BloomFilter *bFilter;
560     BloomFilter *tempFilter;    // Used during compaction.
561
562     AtomicValue<uint64_t> rollbackItemCount;
563
564     HLC hlc;
565     std::string statPrefix;
566     // The persistence checkpoint ID for this vbucket.
567     AtomicValue<uint64_t> persistenceCheckpointId;
568
569     static std::atomic<size_t> chkFlushTimeout;
570
571     DISALLOW_COPY_AND_ASSIGN(VBucket);
572 };
573
574 #endif  // SRC_VBUCKET_H_