MB-20819: Use std::chrono::system_clock for HLC
[ep-engine.git] / src / vbucket.cc
1 /* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2 /*
3  *     Copyright 2015 Couchbase, Inc
4  *
5  *   Licensed under the Apache License, Version 2.0 (the "License");
6  *   you may not use this file except in compliance with the License.
7  *   You may obtain a copy of the License at
8  *
9  *       http://www.apache.org/licenses/LICENSE-2.0
10  *
11  *   Unless required by applicable law or agreed to in writing, software
12  *   distributed under the License is distributed on an "AS IS" BASIS,
13  *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  *   See the License for the specific language governing permissions and
15  *   limitations under the License.
16  */
17
18 #include "config.h"
19
20 #include <functional>
21 #include <list>
22 #include <set>
23 #include <string>
24 #include <vector>
25
26 #include "atomic.h"
27 #include "bgfetcher.h"
28 #include "ep_engine.h"
29
30 #define STATWRITER_NAMESPACE vbucket
31 #include "statwriter.h"
32 #undef STATWRITER_NAMESPACE
33
34 #include "vbucket.h"
35
36 VBucketFilter VBucketFilter::filter_diff(const VBucketFilter &other) const {
37     std::vector<uint16_t> tmp(acceptable.size() + other.size());
38     std::vector<uint16_t>::iterator end;
39     end = std::set_symmetric_difference(acceptable.begin(),
40                                         acceptable.end(),
41                                         other.acceptable.begin(),
42                                         other.acceptable.end(),
43                                         tmp.begin());
44     return VBucketFilter(std::vector<uint16_t>(tmp.begin(), end));
45 }
46
47 VBucketFilter VBucketFilter::filter_intersection(const VBucketFilter &other)
48                                                                         const {
49     std::vector<uint16_t> tmp(acceptable.size() + other.size());
50     std::vector<uint16_t>::iterator end;
51
52     end = std::set_intersection(acceptable.begin(), acceptable.end(),
53                                 other.acceptable.begin(),
54                                 other.acceptable.end(),
55                                 tmp.begin());
56     return VBucketFilter(std::vector<uint16_t>(tmp.begin(), end));
57 }
58
59 static bool isRange(std::set<uint16_t>::const_iterator it,
60                     const std::set<uint16_t>::const_iterator &end,
61                     size_t &length)
62 {
63     length = 0;
64     for (uint16_t val = *it;
65          it != end && (val + length) == *it;
66          ++it, ++length) {
67         // empty
68     }
69
70     --length;
71
72     return length > 1;
73 }
74
75 std::ostream& operator <<(std::ostream &out, const VBucketFilter &filter)
76 {
77     std::set<uint16_t>::const_iterator it;
78
79     if (filter.acceptable.empty()) {
80         out << "{ empty }";
81     } else {
82         bool needcomma = false;
83         out << "{ ";
84         for (it = filter.acceptable.begin();
85              it != filter.acceptable.end();
86              ++it) {
87             if (needcomma) {
88                 out << ", ";
89             }
90
91             size_t length;
92             if (isRange(it, filter.acceptable.end(), length)) {
93                 std::set<uint16_t>::iterator last = it;
94                 for (size_t i = 0; i < length; ++i) {
95                     ++last;
96                 }
97                 out << "[" << *it << "," << *last << "]";
98                 it = last;
99             } else {
100                 out << *it;
101             }
102             needcomma = true;
103         }
104         out << " }";
105     }
106
107     return out;
108 }
109
110 size_t VBucket::chkFlushTimeout = MIN_CHK_FLUSH_TIMEOUT;
111
112 const vbucket_state_t VBucket::ACTIVE =
113                      static_cast<vbucket_state_t>(htonl(vbucket_state_active));
114 const vbucket_state_t VBucket::REPLICA =
115                     static_cast<vbucket_state_t>(htonl(vbucket_state_replica));
116 const vbucket_state_t VBucket::PENDING =
117                     static_cast<vbucket_state_t>(htonl(vbucket_state_pending));
118 const vbucket_state_t VBucket::DEAD =
119                     static_cast<vbucket_state_t>(htonl(vbucket_state_dead));
120
121 VBucket::~VBucket() {
122     if (!pendingOps.empty() || !pendingBGFetches.empty()) {
123         LOG(EXTENSION_LOG_WARNING,
124             "Have %ld pending ops and %ld pending reads "
125             "while destroying vbucket\n",
126             pendingOps.size(), pendingBGFetches.size());
127     }
128
129     stats.decrDiskQueueSize(dirtyQueueSize.load());
130
131     size_t num_pending_fetches = 0;
132     vb_bgfetch_queue_t::iterator itr = pendingBGFetches.begin();
133     for (; itr != pendingBGFetches.end(); ++itr) {
134         vb_bgfetch_item_ctx_t &bg_itm_ctx = itr->second;
135         std::list<VBucketBGFetchItem *> &bgitems = bg_itm_ctx.bgfetched_list;
136         std::list<VBucketBGFetchItem *>::iterator vit = bgitems.begin();
137         for (; vit != bgitems.end(); ++vit) {
138             delete (*vit);
139             ++num_pending_fetches;
140         }
141     }
142     stats.numRemainingBgJobs.fetch_sub(num_pending_fetches);
143     pendingBGFetches.clear();
144     delete failovers;
145
146     // Clear out the bloomfilter(s)
147     clearFilter();
148
149     stats.memOverhead.fetch_sub(sizeof(VBucket) + ht.memorySize() +
150                                 sizeof(CheckpointManager));
151
152     LOG(EXTENSION_LOG_INFO, "Destroying vbucket %d\n", id);
153 }
154
155 void VBucket::fireAllOps(EventuallyPersistentEngine &engine,
156                          ENGINE_ERROR_CODE code) {
157     LockHolder lh(pendingOpLock);
158
159     if (pendingOpsStart > 0) {
160         hrtime_t now = gethrtime();
161         if (now > pendingOpsStart) {
162             hrtime_t d = (now - pendingOpsStart) / 1000;
163             stats.pendingOpsHisto.add(d);
164             atomic_setIfBigger(stats.pendingOpsMaxDuration, d);
165         }
166     } else {
167         return;
168     }
169
170     pendingOpsStart = 0;
171     stats.pendingOps.fetch_sub(pendingOps.size());
172     atomic_setIfBigger(stats.pendingOpsMax, pendingOps.size());
173
174     while (!pendingOps.empty()) {
175         const void *pendingOperation = pendingOps.back();
176         pendingOps.pop_back();
177         // We don't want to hold the pendingOpLock when
178         // calling notifyIOComplete.
179         lh.unlock();
180         engine.notifyIOComplete(pendingOperation, code);
181         lh.lock();
182     }
183
184     LOG(EXTENSION_LOG_INFO,
185         "Fired pendings ops for vbucket %" PRIu16 " in state %s\n",
186         id, VBucket::toString(state));
187 }
188
189 void VBucket::fireAllOps(EventuallyPersistentEngine &engine) {
190
191     if (state == vbucket_state_active) {
192         fireAllOps(engine, ENGINE_SUCCESS);
193     } else if (state == vbucket_state_pending) {
194         // Nothing
195     } else {
196         fireAllOps(engine, ENGINE_NOT_MY_VBUCKET);
197     }
198 }
199
200 void VBucket::setState(vbucket_state_t to) {
201     vbucket_state_t oldstate;
202     {
203         WriterLockHolder wlh(stateLock);
204         oldstate = state;
205
206         if (to == vbucket_state_active &&
207             checkpointManager.getOpenCheckpointId() < 2) {
208             checkpointManager.setOpenCheckpointId(2);
209         }
210
211         LOG(EXTENSION_LOG_NOTICE,
212             "VBucket::setState: transitioning vbucket:%" PRIu16 " from:%s to:%s",
213             id, VBucket::toString(oldstate), VBucket::toString(to));
214
215         state = to;
216     }
217 }
218
219 void VBucket::doStatsForQueueing(Item& qi, size_t itemBytes)
220 {
221     ++dirtyQueueSize;
222     dirtyQueueMem.fetch_add(sizeof(Item));
223     ++dirtyQueueFill;
224     dirtyQueueAge.fetch_add(qi.getQueuedTime());
225     dirtyQueuePendingWrites.fetch_add(itemBytes);
226 }
227
228 void VBucket::doStatsForFlushing(Item& qi, size_t itemBytes)
229 {
230     decrDirtyQueueSize(1);
231     decrDirtyQueueMem(sizeof(Item));
232     ++dirtyQueueDrain;
233     decrDirtyQueueAge(qi.getQueuedTime());
234     decrDirtyQueuePendingWrites(itemBytes);
235 }
236
237 void VBucket::incrMetaDataDisk(Item& qi)
238 {
239     metaDataDisk.fetch_add(qi.getNKey() + sizeof(ItemMetaData));
240 }
241
242 void VBucket::decrMetaDataDisk(Item& qi)
243 {
244     // assume couchstore remove approx this much data from disk
245     metaDataDisk.fetch_sub((qi.getNKey() + sizeof(ItemMetaData)));
246 }
247
248 void VBucket::resetStats() {
249     opsCreate.store(0);
250     opsUpdate.store(0);
251     opsDelete.store(0);
252     opsReject.store(0);
253
254     stats.decrDiskQueueSize(dirtyQueueSize.load());
255     dirtyQueueSize.store(0);
256     dirtyQueueMem.store(0);
257     dirtyQueueFill.store(0);
258     dirtyQueueAge.store(0);
259     dirtyQueuePendingWrites.store(0);
260     dirtyQueueDrain.store(0);
261     fileSpaceUsed = 0;
262     fileSize = 0;
263 }
264
265 template <typename T>
266 void VBucket::addStat(const char *nm, const T &val, ADD_STAT add_stat,
267                       const void *c) {
268     std::stringstream name;
269     name << "vb_" << id;
270     if (nm != NULL) {
271         name << ":" << nm;
272     }
273     std::stringstream value;
274     value << val;
275     std::string n = name.str();
276     add_casted_stat(n.data(), value.str().data(), add_stat, c);
277 }
278
279 size_t VBucket::queueBGFetchItem(const std::string &key,
280                                  VBucketBGFetchItem *fetch,
281                                  BgFetcher *bgFetcher) {
282     LockHolder lh(pendingBGFetchesLock);
283     vb_bgfetch_item_ctx_t& bgfetch_itm_ctx = pendingBGFetches[key];
284
285     if (bgfetch_itm_ctx.bgfetched_list.empty()) {
286         bgfetch_itm_ctx.isMetaOnly = true;
287     }
288
289     bgfetch_itm_ctx.bgfetched_list.push_back(fetch);
290
291     if (!fetch->metaDataOnly) {
292         bgfetch_itm_ctx.isMetaOnly = false;
293     }
294     bgFetcher->addPendingVB(id);
295     return pendingBGFetches.size();
296 }
297
298 bool VBucket::getBGFetchItems(vb_bgfetch_queue_t &fetches) {
299     LockHolder lh(pendingBGFetchesLock);
300     fetches.insert(pendingBGFetches.begin(), pendingBGFetches.end());
301     pendingBGFetches.clear();
302     return fetches.size() > 0;
303 }
304
305 void VBucket::addHighPriorityVBEntry(uint64_t id, const void *cookie,
306                                      bool isBySeqno) {
307     LockHolder lh(hpChksMutex);
308     if (shard) {
309         ++shard->highPriorityCount;
310     }
311     hpChks.push_back(HighPriorityVBEntry(cookie, id, isBySeqno));
312     numHpChks.store(hpChks.size());
313 }
314
315 void VBucket::notifyOnPersistence(EventuallyPersistentEngine &e,
316                                   uint64_t idNum,
317                                   bool isBySeqno) {
318     LockHolder lh(hpChksMutex);
319     std::map<const void*, ENGINE_ERROR_CODE> toNotify;
320     std::list<HighPriorityVBEntry>::iterator entry = hpChks.begin();
321
322     std::string logStr(isBySeqno
323                        ? "seqno persistence"
324                        : "checkpoint persistence");
325
326     while (entry != hpChks.end()) {
327         if (isBySeqno != entry->isBySeqno_) {
328             ++entry;
329             continue;
330         }
331
332         std::string logStr(isBySeqno ?
333                            "seqno persistence" :
334                            "checkpoint persistence");
335
336         hrtime_t wall_time(gethrtime() - entry->start);
337         size_t spent = wall_time / 1000000000;
338         if (entry->id <= idNum) {
339             toNotify[entry->cookie] = ENGINE_SUCCESS;
340             stats.chkPersistenceHisto.add(wall_time / 1000);
341             adjustCheckpointFlushTimeout(wall_time / 1000000000);
342             LOG(EXTENSION_LOG_NOTICE, "Notified the completion of %s "
343                 "for vbucket %" PRIu16 ", Check for: %" PRIu64 ", "
344                 "Persisted upto: %" PRIu64 ", cookie %p",
345                 logStr.c_str(), id, entry->id, idNum, entry->cookie);
346             entry = hpChks.erase(entry);
347             if (shard) {
348                 --shard->highPriorityCount;
349             }
350         } else if (spent > getCheckpointFlushTimeout()) {
351             adjustCheckpointFlushTimeout(spent);
352             e.storeEngineSpecific(entry->cookie, NULL);
353             toNotify[entry->cookie] = ENGINE_TMPFAIL;
354             LOG(EXTENSION_LOG_WARNING, "Notified the timeout on %s "
355                 "for vbucket %" PRIu16 ", Check for: %" PRIu64 ", "
356                 "Persisted upto: %" PRIu64 ", cookie %p",
357                 logStr.c_str(), id, entry->id, idNum, entry->cookie);
358             entry = hpChks.erase(entry);
359             if (shard) {
360                 --shard->highPriorityCount;
361             }
362         } else {
363             ++entry;
364         }
365     }
366     numHpChks.store(hpChks.size());
367     lh.unlock();
368
369     std::map<const void*, ENGINE_ERROR_CODE>::iterator itr = toNotify.begin();
370     for (; itr != toNotify.end(); ++itr) {
371         e.notifyIOComplete(itr->first, itr->second);
372     }
373
374 }
375
376 void VBucket::notifyAllPendingConnsFailed(EventuallyPersistentEngine &e) {
377     LockHolder lh(hpChksMutex);
378     std::map<const void*, ENGINE_ERROR_CODE> toNotify;
379     std::list<HighPriorityVBEntry>::iterator entry = hpChks.begin();
380     while (entry != hpChks.end()) {
381         toNotify[entry->cookie] = ENGINE_TMPFAIL;
382         e.storeEngineSpecific(entry->cookie, NULL);
383         entry = hpChks.erase(entry);
384         if (shard) {
385             --shard->highPriorityCount;
386         }
387     }
388     lh.unlock();
389
390     std::map<const void*, ENGINE_ERROR_CODE>::iterator itr = toNotify.begin();
391     for (; itr != toNotify.end(); ++itr) {
392         e.notifyIOComplete(itr->first, itr->second);
393     }
394
395     fireAllOps(e);
396 }
397
398 void VBucket::adjustCheckpointFlushTimeout(size_t wall_time) {
399     size_t middle = (MIN_CHK_FLUSH_TIMEOUT + MAX_CHK_FLUSH_TIMEOUT) / 2;
400
401     if (wall_time <= MIN_CHK_FLUSH_TIMEOUT) {
402         chkFlushTimeout = MIN_CHK_FLUSH_TIMEOUT;
403     } else if (wall_time <= middle) {
404         chkFlushTimeout = middle;
405     } else {
406         chkFlushTimeout = MAX_CHK_FLUSH_TIMEOUT;
407     }
408 }
409
410 size_t VBucket::getHighPriorityChkSize() {
411     return numHpChks.load();
412 }
413
414 size_t VBucket::getCheckpointFlushTimeout() {
415     return chkFlushTimeout;
416 }
417
418 size_t VBucket::getNumItems(item_eviction_policy_t policy) {
419     if (policy == VALUE_ONLY) {
420         return ht.getNumInMemoryItems();
421     } else {
422         return ht.getNumItems();
423     }
424 }
425
426 size_t VBucket::getNumNonResidentItems(item_eviction_policy_t policy) {
427     if (policy == VALUE_ONLY) {
428         return ht.getNumInMemoryNonResItems();
429     } else {
430         size_t num_items = ht.getNumItems();
431         size_t num_res_items = ht.getNumInMemoryItems() -
432                                ht.getNumInMemoryNonResItems();
433         return num_items > num_res_items ? (num_items - num_res_items) : 0;
434     }
435 }
436
437 bool VBucket::isResidentRatioUnderThreshold(float threshold,
438                                             item_eviction_policy_t policy) {
439     if (policy != FULL_EVICTION) {
440         throw std::invalid_argument("VBucket::isResidentRatioUnderThreshold: "
441                 "policy (which is " + std::to_string(policy) +
442                 ") must be FULL_EVICTION");
443     }
444     size_t num_items = getNumItems(policy);
445     size_t num_non_resident_items = getNumNonResidentItems(policy);
446     if (threshold >= ((float)(num_items - num_non_resident_items) /
447                                                                 num_items)) {
448         return true;
449     } else {
450         return false;
451     }
452 }
453
454 void VBucket::createFilter(size_t key_count, double probability) {
455     // Create the actual bloom filter upon vbucket creation during
456     // scenarios:
457     //      - Bucket creation
458     //      - Rebalance
459     LockHolder lh(bfMutex);
460     if (bFilter == nullptr && tempFilter == nullptr) {
461         bFilter = new BloomFilter(key_count, probability, BFILTER_ENABLED);
462     } else {
463         LOG(EXTENSION_LOG_WARNING, "(vb %" PRIu16 ") Bloom filter / Temp filter"
464             " already exist!", id);
465     }
466 }
467
468 void VBucket::initTempFilter(size_t key_count, double probability) {
469     // Create a temp bloom filter with status as COMPACTING,
470     // if the main filter is found to exist, set its state to
471     // COMPACTING as well.
472     LockHolder lh(bfMutex);
473     if (tempFilter) {
474         delete tempFilter;
475     }
476     tempFilter = new BloomFilter(key_count, probability, BFILTER_COMPACTING);
477     if (bFilter) {
478         bFilter->setStatus(BFILTER_COMPACTING);
479     }
480 }
481
482 void VBucket::addToFilter(const std::string &key) {
483     LockHolder lh(bfMutex);
484     if (bFilter) {
485         bFilter->addKey(key.c_str(), key.length());
486     }
487
488     // If the temp bloom filter is not found to be NULL,
489     // it means that compaction is running on the particular
490     // vbucket. Therefore add the key to the temp filter as
491     // well, as once compaction completes the temp filter
492     // will replace the main bloom filter.
493     if (tempFilter) {
494         tempFilter->addKey(key.c_str(), key.length());
495     }
496 }
497
498 bool VBucket::maybeKeyExistsInFilter(const std::string &key) {
499     LockHolder lh(bfMutex);
500     if (bFilter) {
501         return bFilter->maybeKeyExists(key.c_str(), key.length());
502     } else {
503         // If filter doesn't exist, allow the BgFetch to go through.
504         return true;
505     }
506 }
507
508 bool VBucket::isTempFilterAvailable() {
509     LockHolder lh(bfMutex);
510     if (tempFilter &&
511         (tempFilter->getStatus() == BFILTER_COMPACTING ||
512          tempFilter->getStatus() == BFILTER_ENABLED)) {
513         return true;
514     } else {
515         return false;
516     }
517 }
518
519 void VBucket::addToTempFilter(const std::string &key) {
520     // Keys will be added to only the temp filter during
521     // compaction.
522     LockHolder lh(bfMutex);
523     if (tempFilter) {
524         tempFilter->addKey(key.c_str(), key.length());
525     }
526 }
527
528 void VBucket::swapFilter() {
529     // Delete the main bloom filter and replace it with
530     // the temp filter that was populated during compaction,
531     // only if the temp filter's state is found to be either at
532     // COMPACTING or ENABLED (if in the case the user enables
533     // bloomfilters for some reason while compaction was running).
534     // Otherwise, it indicates that the filter's state was
535     // possibly disabled during compaction, therefore clear out
536     // the temp filter. If it gets enabled at some point, a new
537     // bloom filter will be made available after the next
538     // compaction.
539
540     LockHolder lh(bfMutex);
541     if (bFilter) {
542         delete bFilter;
543         bFilter = NULL;
544     }
545     if (tempFilter &&
546         (tempFilter->getStatus() == BFILTER_COMPACTING ||
547          tempFilter->getStatus() == BFILTER_ENABLED)) {
548         bFilter = tempFilter;
549         tempFilter = NULL;
550         bFilter->setStatus(BFILTER_ENABLED);
551     } else if (tempFilter) {
552         delete tempFilter;
553         tempFilter = NULL;
554     }
555 }
556
557 void VBucket::clearFilter() {
558     LockHolder lh(bfMutex);
559     if (bFilter) {
560         delete bFilter;
561         bFilter = NULL;
562     }
563     if (tempFilter) {
564         delete tempFilter;
565         tempFilter = NULL;
566     }
567 }
568
569 void VBucket::setFilterStatus(bfilter_status_t to) {
570     LockHolder lh(bfMutex);
571     if (bFilter) {
572         bFilter->setStatus(to);
573     }
574     if (tempFilter) {
575         tempFilter->setStatus(to);
576     }
577 }
578
579 std::string VBucket::getFilterStatusString() {
580     LockHolder lh(bfMutex);
581     if (bFilter) {
582         return bFilter->getStatusString();
583     } else if (tempFilter) {
584         return tempFilter->getStatusString();
585     } else {
586         return "DOESN'T EXIST";
587     }
588 }
589
590 size_t VBucket::getFilterSize() {
591     if (bFilter) {
592         return bFilter->getFilterSize();
593     } else {
594         return 0;
595     }
596 }
597
598 size_t VBucket::getNumOfKeysInFilter() {
599     if (bFilter) {
600         return bFilter->getNumOfKeysInFilter();
601     } else {
602         return 0;
603     }
604 }
605
606 uint64_t VBucket::nextHLCCas() {
607     // Create a monotonic timestamp using part of the HLC algorithm by.
608     // a) Reading system time (now)
609     // b) dropping 16-bits (masking 65355 ┬Ás)
610     // c) comparing it with the last known time (max_cas)
611     // d) returning either now or max_cas + 1
612     auto now = std::chrono::duration_cast<std::chrono::microseconds>
613             (std::chrono::system_clock::now().time_since_epoch()).count();
614
615     uint64_t now48bits = ((uint64_t)now) & ~((1 << 16) - 1);
616     uint64_t local_max_cas = max_cas.load();
617
618     if (now48bits > local_max_cas) {
619         atomic_setIfBigger(max_cas, now48bits);
620         return now48bits;
621     }
622
623     atomic_setIfBigger(max_cas, local_max_cas + 1);
624     return local_max_cas + 1;
625 }
626
627 void VBucket::addStats(bool details, ADD_STAT add_stat, const void *c,
628                        item_eviction_policy_t policy) {
629     addStat(NULL, toString(state), add_stat, c);
630     if (details) {
631         size_t numItems = getNumItems(policy);
632         size_t tempItems = getNumTempItems();
633         addStat("num_items", numItems, add_stat, c);
634         addStat("num_temp_items", tempItems, add_stat, c);
635         addStat("num_non_resident", getNumNonResidentItems(policy),
636                 add_stat, c);
637         addStat("ht_memory", ht.memorySize(), add_stat, c);
638         addStat("ht_item_memory", ht.getItemMemory(), add_stat, c);
639         addStat("ht_cache_size", ht.cacheSize, add_stat, c);
640         addStat("num_ejects", ht.getNumEjects(), add_stat, c);
641         addStat("ops_create", opsCreate, add_stat, c);
642         addStat("ops_update", opsUpdate, add_stat, c);
643         addStat("ops_delete", opsDelete, add_stat, c);
644         addStat("ops_reject", opsReject, add_stat, c);
645         addStat("queue_size", dirtyQueueSize, add_stat, c);
646         addStat("queue_memory", dirtyQueueMem, add_stat, c);
647         addStat("queue_fill", dirtyQueueFill, add_stat, c);
648         addStat("queue_drain", dirtyQueueDrain, add_stat, c);
649         addStat("queue_age", getQueueAge(), add_stat, c);
650         addStat("pending_writes", dirtyQueuePendingWrites, add_stat, c);
651         addStat("db_data_size", fileSpaceUsed, add_stat, c);
652         addStat("db_file_size", fileSize, add_stat, c);
653         addStat("high_seqno", getHighSeqno(), add_stat, c);
654         addStat("uuid", failovers->getLatestUUID(), add_stat, c);
655         addStat("purge_seqno", getPurgeSeqno(), add_stat, c);
656         addStat("bloom_filter", getFilterStatusString().data(),
657                 add_stat, c);
658         addStat("bloom_filter_size", getFilterSize(), add_stat, c);
659         addStat("bloom_filter_key_count", getNumOfKeysInFilter(), add_stat, c);
660         addStat("max_cas", getMaxCas(), add_stat, c);
661         addStat("rollback_item_count", getRollbackItemCount(), add_stat, c);
662     }
663 }
664
665 void VBucket::decrDirtyQueueMem(size_t decrementBy)
666 {
667     size_t oldVal, newVal;
668     do {
669         oldVal = dirtyQueueMem.load(std::memory_order_relaxed);
670         if (oldVal < decrementBy) {
671             newVal = 0;
672         } else {
673             newVal = oldVal - decrementBy;
674         }
675     } while (!dirtyQueueMem.compare_exchange_strong(oldVal, newVal));
676 }
677
678 void VBucket::decrDirtyQueueAge(uint32_t decrementBy)
679 {
680     uint64_t oldVal, newVal;
681     do {
682         oldVal = dirtyQueueAge.load(std::memory_order_relaxed);
683         if (oldVal < decrementBy) {
684             newVal = 0;
685         } else {
686             newVal = oldVal - decrementBy;
687         }
688     } while (!dirtyQueueAge.compare_exchange_strong(oldVal, newVal));
689 }
690
691 void VBucket::decrDirtyQueuePendingWrites(size_t decrementBy)
692 {
693     size_t oldVal, newVal;
694     do {
695         oldVal = dirtyQueuePendingWrites.load(std::memory_order_relaxed);
696         if (oldVal < decrementBy) {
697             newVal = 0;
698         } else {
699             newVal = oldVal - decrementBy;
700         }
701     } while (!dirtyQueuePendingWrites.compare_exchange_strong(oldVal, newVal));
702 }