MB-19251: Fix race in updating Vbucket.file{SpaceUsed,Size}
[ep-engine.git] / src / vbucket.h
1 /* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2 /*
3  *     Copyright 2010 Couchbase, Inc
4  *
5  *   Licensed under the Apache License, Version 2.0 (the "License");
6  *   you may not use this file except in compliance with the License.
7  *   You may obtain a copy of the License at
8  *
9  *       http://www.apache.org/licenses/LICENSE-2.0
10  *
11  *   Unless required by applicable law or agreed to in writing, software
12  *   distributed under the License is distributed on an "AS IS" BASIS,
13  *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  *   See the License for the specific language governing permissions and
15  *   limitations under the License.
16  */
17
18 #ifndef SRC_VBUCKET_H_
19 #define SRC_VBUCKET_H_ 1
20
21 #include "config.h"
22
23 #include <list>
24 #include <queue>
25 #include <set>
26 #include <sstream>
27 #include <string>
28 #include <utility>
29 #include <vector>
30
31 #include "atomic.h"
32 #include "bgfetcher.h"
33 #include "checkpoint.h"
34 #include "common.h"
35 #include "stored-value.h"
36
37 const size_t MIN_CHK_FLUSH_TIMEOUT = 10; // 10 sec.
38 const size_t MAX_CHK_FLUSH_TIMEOUT = 30; // 30 sec.
39
40 struct HighPriorityVBEntry {
41     HighPriorityVBEntry() :
42         cookie(NULL), id(0), start(gethrtime()), isBySeqno_(false) { }
43     HighPriorityVBEntry(const void *c, uint64_t idNum, bool isBySeqno) :
44         cookie(c), id(idNum), start(gethrtime()), isBySeqno_(isBySeqno) { }
45
46     const void *cookie;
47     uint64_t id;
48     hrtime_t start;
49     bool isBySeqno_;
50 };
51
52 /**
53  * Function object that returns true if the given vbucket is acceptable.
54  */
55 class VBucketFilter {
56 public:
57
58     /**
59      * Instiatiate a VBucketFilter that always returns true.
60      */
61     explicit VBucketFilter() : acceptable() {}
62
63     /**
64      * Instantiate a VBucketFilter that returns true for any of the
65      * given vbucket IDs.
66      */
67     explicit VBucketFilter(const std::vector<uint16_t> &a) :
68         acceptable(a.begin(), a.end()) {}
69
70     explicit VBucketFilter(const std::set<uint16_t> &s) : acceptable(s) {}
71
72     void assign(const std::set<uint16_t> &a) {
73         acceptable = a;
74     }
75
76     bool operator ()(uint16_t v) const {
77         return acceptable.empty() || acceptable.find(v) != acceptable.end();
78     }
79
80     size_t size() const { return acceptable.size(); }
81
82     bool empty() const { return acceptable.empty(); }
83
84     void reset() {
85         acceptable.clear();
86     }
87
88     /**
89      * Calculate the difference between this and another filter.
90      * If "this" contains elements, [1,2,3,4] and other contains [3,4,5,6]
91      * the returned filter contains: [1,2,5,6]
92      * @param other the other filter to compare with
93      * @return a new filter with the elements present in only one of the two
94      *         filters.
95      */
96     VBucketFilter filter_diff(const VBucketFilter &other) const;
97
98     /**
99      * Calculate the intersection between this and another filter.
100      * If "this" contains elements, [1,2,3,4] and other contains [3,4,5,6]
101      * the returned filter contains: [3,4]
102      * @param other the other filter to compare with
103      * @return a new filter with the elements present in both of the two
104      *         filters.
105      */
106     VBucketFilter filter_intersection(const VBucketFilter &other) const;
107
108     const std::set<uint16_t> &getVBSet() const { return acceptable; }
109
110     bool addVBucket(uint16_t vbucket) {
111         std::pair<std::set<uint16_t>::iterator, bool> rv = acceptable.insert(vbucket);
112         return rv.second;
113     }
114
115     void removeVBucket(uint16_t vbucket) {
116         acceptable.erase(vbucket);
117     }
118
119     /**
120      * Dump the filter in a human readable form ( "{ bucket, bucket, bucket }"
121      * to the specified output stream.
122      */
123     friend std::ostream& operator<< (std::ostream& out,
124                                      const VBucketFilter &filter);
125
126 private:
127
128     std::set<uint16_t> acceptable;
129 };
130
131 class EventuallyPersistentEngine;
132 class FailoverTable;
133 class KVShard;
134
135 // First bool is true if an item exists in VB DB file.
136 // second bool is true if the operation is SET (i.e., insert or update).
137 typedef std::pair<bool, bool> kstat_entry_t;
138
139 struct KVStatsCtx{
140     KVStatsCtx() : vbucket(std::numeric_limits<uint16_t>::max()),
141                    fileSpaceUsed(0), fileSize(0) {}
142
143     uint16_t vbucket;
144     size_t fileSpaceUsed;
145     size_t fileSize;
146     unordered_map<std::string, kstat_entry_t> keyStats;
147 };
148
149 typedef struct KVStatsCtx kvstats_ctx;
150
151 /**
152  * An individual vbucket.
153  */
154 class VBucket : public RCValue {
155 public:
156
157     VBucket(int i, vbucket_state_t newState, EPStats &st,
158             CheckpointConfig &chkConfig, KVShard *kvshard,
159             int64_t lastSeqno, uint64_t lastSnapStart,
160             uint64_t lastSnapEnd, FailoverTable *table,
161             vbucket_state_t initState = vbucket_state_dead,
162             uint64_t chkId = 1, uint64_t purgeSeqno = 0) :
163         ht(st),
164         checkpointManager(st, i, chkConfig, lastSeqno, chkId),
165         failovers(table),
166         opsCreate(0),
167         opsUpdate(0),
168         opsDelete(0),
169         opsReject(0),
170         dirtyQueueSize(0),
171         dirtyQueueMem(0),
172         dirtyQueueFill(0),
173         dirtyQueueDrain(0),
174         dirtyQueueAge(0),
175         dirtyQueuePendingWrites(0),
176         metaDataDisk(0),
177         numExpiredItems(0),
178         fileSpaceUsed(0),
179         fileSize(0),
180         id(i),
181         state(newState),
182         initialState(initState),
183         stats(st),
184         purge_seqno(purgeSeqno),
185         cur_snapshot_start(lastSnapStart),
186         cur_snapshot_end(lastSnapEnd),
187         numHpChks(0),
188         shard(kvshard)
189     {
190         backfill.isBackfillPhase = false;
191         pendingOpsStart = 0;
192         stats.memOverhead.fetch_add(sizeof(VBucket)
193                                + ht.memorySize() + sizeof(CheckpointManager));
194         cb_assert(stats.memOverhead.load() < GIGANTOR);
195     }
196
197     ~VBucket();
198
199     int64_t getHighSeqno() {
200         return checkpointManager.getHighSeqno();
201     }
202
203     uint64_t getPurgeSeqno() {
204         return purge_seqno;
205     }
206
207     void setPurgeSeqno(uint64_t to) {
208         purge_seqno = to;
209     }
210
211     LockHolder getSnapshotLock() {
212         LockHolder lh(snapshotMutex);
213         return lh;
214     }
215
216     void setCurrentSnapshot(uint64_t start, uint64_t end) {
217         LockHolder lh(snapshotMutex);
218         setCurrentSnapshot_UNLOCKED(start, end);
219     }
220
221     void setCurrentSnapshot_UNLOCKED(uint64_t start, uint64_t end) {
222         cb_assert(start <= end);
223
224         if (state == vbucket_state_replica) {
225             cb_assert(end >= (uint64_t)checkpointManager.getHighSeqno());
226         }
227
228         cur_snapshot_start = start;
229         cur_snapshot_end = end;
230     }
231
232     void getCurrentSnapshot(uint64_t& start, uint64_t& end) {
233         LockHolder lh(snapshotMutex);
234         getCurrentSnapshot_UNLOCKED(start, end);
235     }
236
237     void getCurrentSnapshot_UNLOCKED(uint64_t& start, uint64_t& end) {
238         start = cur_snapshot_start;
239         end = cur_snapshot_end;
240     }
241
242     int getId(void) const { return id; }
243     vbucket_state_t getState(void) const { return state; }
244     void setState(vbucket_state_t to, SERVER_HANDLE_V1 *sapi);
245     RWLock& getStateLock() {return stateLock;}
246
247     vbucket_state_t getInitialState(void) { return initialState; }
248     void setInitialState(vbucket_state_t initState) {
249         initialState = initState;
250     }
251
252     bool addPendingOp(const void *cookie) {
253         LockHolder lh(pendingOpLock);
254         if (state != vbucket_state_pending) {
255             // State transitioned while we were waiting.
256             return false;
257         }
258         // Start a timer when enqueuing the first client.
259         if (pendingOps.empty()) {
260             pendingOpsStart = gethrtime();
261         }
262         pendingOps.push_back(cookie);
263         ++stats.pendingOps;
264         ++stats.pendingOpsTotal;
265         return true;
266     }
267
268     void doStatsForQueueing(Item& item, size_t itemBytes);
269     void doStatsForFlushing(Item& item, size_t itemBytes);
270     void incrMetaDataDisk(Item& qi);
271     void decrMetaDataDisk(Item& qi);
272
273     void resetStats();
274
275     // Get age sum in millisecond
276     uint64_t getQueueAge() {
277         rel_time_t currentAge = ep_current_time() * dirtyQueueSize;
278         if (currentAge < dirtyQueueAge) {
279             return 0;
280         }
281         return (currentAge - dirtyQueueAge) * 1000;
282     }
283
284     void fireAllOps(EventuallyPersistentEngine &engine);
285
286     size_t size(void) {
287         HashTableDepthStatVisitor v;
288         ht.visitDepth(v);
289         return v.size;
290     }
291
292     size_t getBackfillSize() {
293         LockHolder lh(backfill.mutex);
294         return backfill.items.size();
295     }
296     bool queueBackfillItem(queued_item& qi, bool genSeqno) {
297         LockHolder lh(backfill.mutex);
298         if (genSeqno) {
299             qi->setBySeqno(checkpointManager.nextBySeqno());
300         } else {
301             checkpointManager.setBySeqno(qi->getBySeqno());
302         }
303         backfill.items.push(qi);
304         ++stats.diskQueueSize;
305         ++stats.totalEnqueued;
306         doStatsForQueueing(*qi, qi->size());
307         stats.memOverhead.fetch_add(sizeof(queued_item));
308         return true;
309     }
310     void getBackfillItems(std::vector<queued_item> &items) {
311         LockHolder lh(backfill.mutex);
312         size_t num_items = backfill.items.size();
313         while (!backfill.items.empty()) {
314             items.push_back(backfill.items.front());
315             backfill.items.pop();
316         }
317         stats.memOverhead.fetch_sub(num_items * sizeof(queued_item));
318     }
319     bool isBackfillPhase() {
320         LockHolder lh(backfill.mutex);
321         return backfill.isBackfillPhase;
322     }
323     void setBackfillPhase(bool backfillPhase) {
324         LockHolder lh(backfill.mutex);
325         backfill.isBackfillPhase = backfillPhase;
326     }
327
328     bool getBGFetchItems(vb_bgfetch_queue_t &fetches);
329     void queueBGFetchItem(const std::string &key, VBucketBGFetchItem *fetch,
330                           BgFetcher *bgFetcher);
331     size_t numPendingBGFetchItems(void) {
332         // do a dirty read of number of fetch items
333         return pendingBGFetches.size();
334     }
335     bool hasPendingBGFetchItems(void) {
336         LockHolder lh(pendingBGFetchesLock);
337         return !pendingBGFetches.empty();
338     }
339
340     static const char* toString(vbucket_state_t s) {
341         switch(s) {
342         case vbucket_state_active: return "active"; break;
343         case vbucket_state_replica: return "replica"; break;
344         case vbucket_state_pending: return "pending"; break;
345         case vbucket_state_dead: return "dead"; break;
346         }
347         return "unknown";
348     }
349
350     static vbucket_state_t fromString(const char* state) {
351         if (strcmp(state, "active") == 0) {
352             return vbucket_state_active;
353         } else if (strcmp(state, "replica") == 0) {
354             return vbucket_state_replica;
355         } else if (strcmp(state, "pending") == 0) {
356             return vbucket_state_pending;
357         } else {
358             return vbucket_state_dead;
359         }
360     }
361
362     void addHighPriorityVBEntry(uint64_t id, const void *cookie,
363                                 bool isBySeqno);
364     void notifyOnPersistence(EventuallyPersistentEngine &e,
365                              uint64_t id, bool isBySeqno);
366     void notifyAllPendingConnsFailed(EventuallyPersistentEngine &e);
367     size_t getHighPriorityChkSize();
368     static size_t getCheckpointFlushTimeout();
369
370     void addStats(bool details, ADD_STAT add_stat, const void *c,
371                   item_eviction_policy_t policy);
372
373     size_t getNumItems(item_eviction_policy_t policy);
374
375     size_t getNumNonResidentItems(item_eviction_policy_t policy);
376
377     size_t getNumTempItems(void) {
378         return ht.getNumTempItems();
379     }
380
381     bool decrDirtyQueueSize(size_t decrementBy) {
382         size_t oldVal;
383         do {
384             oldVal = dirtyQueueSize.load();
385             if (oldVal < decrementBy) {
386                 LOG(EXTENSION_LOG_DEBUG,
387                     "Cannot decrement dirty queue size of vbucket %d by %lld, "
388                     "the current value is %lld\n", id, decrementBy, oldVal);
389                 return false;
390             }
391         } while (!dirtyQueueSize.compare_exchange_strong(oldVal, oldVal - decrementBy));
392         return true;
393     }
394
395     static const vbucket_state_t ACTIVE;
396     static const vbucket_state_t REPLICA;
397     static const vbucket_state_t PENDING;
398     static const vbucket_state_t DEAD;
399
400     HashTable         ht;
401     CheckpointManager checkpointManager;
402     struct {
403         Mutex mutex;
404         std::queue<queued_item> items;
405         bool isBackfillPhase;
406     } backfill;
407
408     KVShard *getShard(void) {
409         return shard;
410     }
411
412     std::queue<queued_item> rejectQueue;
413     FailoverTable *failovers;
414
415     AtomicValue<size_t>  opsCreate;
416     AtomicValue<size_t>  opsUpdate;
417     AtomicValue<size_t>  opsDelete;
418     AtomicValue<size_t>  opsReject;
419
420     AtomicValue<size_t>  dirtyQueueSize;
421     AtomicValue<size_t>  dirtyQueueMem;
422     AtomicValue<size_t>  dirtyQueueFill;
423     AtomicValue<size_t>  dirtyQueueDrain;
424     AtomicValue<uint64_t> dirtyQueueAge;
425     AtomicValue<size_t>  dirtyQueuePendingWrites;
426     AtomicValue<size_t>  metaDataDisk;
427
428     AtomicValue<size_t>  numExpiredItems;
429     AtomicValue<size_t>  fileSpaceUsed;
430     AtomicValue<size_t>  fileSize;
431
432 private:
433     template <typename T>
434     void addStat(const char *nm, const T &val, ADD_STAT add_stat, const void *c);
435
436     void fireAllOps(EventuallyPersistentEngine &engine, ENGINE_ERROR_CODE code);
437
438     void adjustCheckpointFlushTimeout(size_t wall_time);
439
440     int                      id;
441     AtomicValue<vbucket_state_t>  state;
442     RWLock                   stateLock;
443     vbucket_state_t          initialState;
444     Mutex                    pendingOpLock;
445     std::vector<const void*> pendingOps;
446     hrtime_t                 pendingOpsStart;
447     EPStats                 &stats;
448     uint64_t                 purge_seqno;
449
450     Mutex pendingBGFetchesLock;
451     vb_bgfetch_queue_t pendingBGFetches;
452
453     Mutex snapshotMutex;
454     uint64_t cur_snapshot_start;
455     uint64_t cur_snapshot_end;
456
457     Mutex hpChksMutex;
458     std::list<HighPriorityVBEntry> hpChks;
459     AtomicValue<size_t> numHpChks; // size of list hpChks (to avoid MB-9434)
460     KVShard *shard;
461
462     static size_t chkFlushTimeout;
463
464     DISALLOW_COPY_AND_ASSIGN(VBucket);
465 };
466
467 #endif  // SRC_VBUCKET_H_