1 /* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
3 * Copyright 2015 Couchbase, Inc
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
9 * http://www.apache.org/licenses/LICENSE-2.0
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
23 #include "ep_engine.h"
24 #include "vbucketmap.h"
26 VBucketMap::VBucketMap(Configuration &config,
27 EventuallyPersistentStore &store) :
28 bucketDeletion(new AtomicValue<bool>[config.getMaxVbuckets()]),
29 bucketCreation(new AtomicValue<bool>[config.getMaxVbuckets()]),
30 persistenceSeqnos(new AtomicValue<uint64_t>[config.getMaxVbuckets()]),
31 size(config.getMaxVbuckets())
33 WorkLoadPolicy &workload = store.getEPEngine().getWorkLoadPolicy();
34 for (size_t shardId = 0; shardId < workload.getNumShards(); shardId++) {
35 KVShard *shard = new KVShard(shardId, store);
36 shards.push_back(shard);
39 for (size_t i = 0; i < size; ++i) {
40 bucketDeletion[i].store(false);
41 bucketCreation[i].store(false);
42 persistenceSeqnos[i].store(0);
// Destructor. Releases the three per-vbucket arrays with delete[] (they are
// allocated with new[] in the constructor) and then loops until `shards` is
// empty.
// NOTE(review): the body of the while loop and the closing braces are not
// present in this listing; presumably each KVShard is deleted and popped
// from `shards` there -- confirm against the full file.
46 VBucketMap::~VBucketMap() {
47 delete[] bucketDeletion;
48 delete[] bucketCreation;
49 delete[] persistenceSeqnos;
50 while (!shards.empty()) {
// Look up the vbucket with the given id by delegating to the shard that
// getShardByVbId() selects for it.
// NOTE(review): lines are missing from this listing around the return
// statement. `emptyVBucket` is not used by any visible line, so it is
// presumably what gets returned when a check that is not shown here
// fails -- confirm against the full file.
56 RCPtr<VBucket> VBucketMap::getBucket(id_type id) const {
// Default-constructed handle with static storage duration.
57 static RCPtr<VBucket> emptyVBucket;
59 return getShardByVbId(id)->getBucket(id);
// Register vbucket `b` with the shard selected by its id.
// When b->getId() is below `size`, the bucket is handed to that shard via
// setBucket(), an INFO line is logged and ENGINE_SUCCESS is returned.
// Otherwise a WARNING naming the vbucket id is logged.
// NOTE(review): the end of the warning LOG call and the value returned on
// failure are cut off in this listing -- confirm against the full file.
65 ENGINE_ERROR_CODE VBucketMap::addBucket(const RCPtr<VBucket> &b) {
66 if (b->getId() < size) {
67 getShardByVbId(b->getId())->setBucket(b);
68 LOG(EXTENSION_LOG_INFO, "Mapped new vbucket %d in state %s",
69 b->getId(), VBucket::toString(b->getState()));
70 return ENGINE_SUCCESS;
72 LOG(EXTENSION_LOG_WARNING,
73 "Cannot create vb %" PRIu16", max vbuckets is %" PRIu16, b->getId(),
// Drop the vbucket with the given id by calling resetBucket() on the shard
// that getShardByVbId() selects for it.
// NOTE(review): one line is missing from this listing just before the
// original comment below (probably a guard on `id`), as are the closing
// braces -- confirm against the full file.
78 void VBucketMap::removeBucket(id_type id) {
80 // Theoretically, this could be off slightly. In
81 // practice, this happens only on dead vbuckets.
82 getShardByVbId(id)->resetBucket(id);
// Collect vbucket ids into `rv` while walking every id in [0, size) and
// asking the owning shard for the corresponding vbucket.
// NOTE(review): one line is missing between the lookup and the push_back
// (presumably a null check on `b`), and the final return plus closing
// braces are not shown -- confirm against the full file.
86 std::vector<VBucketMap::id_type> VBucketMap::getBuckets(void) const {
87 std::vector<id_type> rv;
88 for (id_type i = 0; i < size; ++i) {
89 RCPtr<VBucket> b(getShardByVbId(i)->getBucket(i));
// The id is read back from the vbucket itself rather than taken from `i`.
91 rv.push_back(b->getId());
97 std::vector<VBucketMap::id_type> VBucketMap::getBucketsSortedByState(void) const {
98 std::vector<id_type> rv;
99 for (int state = vbucket_state_active;
100 state <= vbucket_state_dead; ++state) {
101 for (size_t i = 0; i < size; ++i) {
102 RCPtr<VBucket> b = getShardByVbId(i)->getBucket(i);
103 if (b && b->getState() == state) {
104 rv.push_back(b->getId());
111 std::vector<std::pair<VBucketMap::id_type, size_t> >
112 VBucketMap::getActiveVBucketsSortedByChkMgrMem(void) const {
113 std::vector<std::pair<id_type, size_t> > rv;
114 for (id_type i = 0; i < size; ++i) {
115 RCPtr<VBucket> b = getShardByVbId(i)->getBucket(i);
116 if (b && b->getState() == vbucket_state_active) {
117 rv.push_back(std::make_pair(b->getId(), b->getChkMgrMemUsage()));
122 static bool compareSecond(std::pair<id_type, size_t> a,
123 std::pair<id_type, size_t> b) {
124 return (a.second < b.second);
128 std::sort(rv.begin(), rv.end(), SortCtx::compareSecond);
// Accessor for the size of the map.
// NOTE(review): the body is not present in this listing; presumably it
// returns the `size` member initialised in the constructor -- confirm
// against the full file.
134 VBucketMap::id_type VBucketMap::getSize(void) const {
138 bool VBucketMap::isBucketDeletion(id_type id) const {
139 return bucketDeletion[id].load();
142 bool VBucketMap::setBucketDeletion(id_type id, bool delBucket) {
143 bool inverse = !delBucket;
144 return bucketDeletion[id].compare_exchange_strong(inverse, delBucket);
147 bool VBucketMap::isBucketCreation(id_type id) const {
148 return bucketCreation[id].load();
// Change the creation flag of vbucket `id` with a compare-and-exchange:
// the flag is set to `rv` only if it currently equals `inverse`, and the
// result of compare_exchange_strong is returned to the caller.
// NOTE(review): the declaration of `inverse` is missing from this listing;
// by analogy with setBucketDeletion it is presumably `!rv` -- confirm
// against the full file.
151 bool VBucketMap::setBucketCreation(id_type id, bool rv) {
153 return bucketCreation[id].compare_exchange_strong(inverse, rv);
// Return the persistence checkpoint id reported by vbucket `id`, which is
// fetched through getBucket().
// NOTE(review): lines are missing from this listing before and after the
// lookup (likely guards on `id` and on `vb`), and the value returned when
// no vbucket is available is not visible -- confirm against the full file.
156 uint64_t VBucketMap::getPersistenceCheckpointId(id_type id) const {
158 auto vb = getBucket(id);
160 return vb->getPersistenceCheckpointId();
166 void VBucketMap::setPersistenceCheckpointId(id_type id,
167 uint64_t checkpointId) {
169 auto vb = getBucket(id);
171 getBucket(id)->setPersistenceCheckpointId(checkpointId);
176 uint64_t VBucketMap::getPersistenceSeqno(id_type id) const {
177 return persistenceSeqnos[id].load();
180 void VBucketMap::setPersistenceSeqno(id_type id, uint64_t seqno) {
181 persistenceSeqnos[id].store(seqno);
// Walk `newBuckets` and wrap each raw VBucket pointer in an RCPtr.
// NOTE(review): the statement that consumes `v` (presumably a call to
// addBucket) and the closing braces are missing from this listing --
// confirm against the full file.
184 void VBucketMap::addBuckets(const std::vector<VBucket*> &newBuckets) {
185 std::vector<VBucket*>::const_iterator it;
186 for (it = newBuckets.begin(); it != newBuckets.end(); ++it) {
187 RCPtr<VBucket> v(*it);
192 KVShard* VBucketMap::getShardByVbId(id_type id) const {
193 return shards[id % shards.size()];
196 KVShard* VBucketMap::getShard(KVShard::id_type shardId) const {
197 return shards[shardId];
200 size_t VBucketMap::getNumShards() const {
201 return shards.size();
// Apply `threshold` as the HLC drift-ahead threshold of the vbuckets in the
// map: every id in [0, size) is fetched with getBucket() and updated via
// VBucket::setHLCDriftAheadThreshold.
// NOTE(review): one line is missing between the lookup and the call
// (presumably a null check on `vb`), as are the closing braces -- confirm
// against the full file.
204 void VBucketMap::setHLCDriftAheadThreshold(std::chrono::microseconds threshold) {
205 for (id_type id = 0; id < size; id++) {
206 auto vb = getBucket(id);
208 vb->setHLCDriftAheadThreshold(threshold);
// Apply `threshold` as the HLC drift-behind threshold of the vbuckets in
// the map: every id in [0, size) is fetched with getBucket() and updated
// via VBucket::setHLCDriftBehindThreshold.
// NOTE(review): one line is missing between the lookup and the call
// (presumably a null check on `vb`), as are the closing braces -- confirm
// against the full file.
213 void VBucketMap::setHLCDriftBehindThreshold(std::chrono::microseconds threshold) {
214 for (id_type id = 0; id < size; id++) {
215 auto vb = getBucket(id);
217 vb->setHLCDriftBehindThreshold(threshold);
// Configuration-change callback. Dispatches on `key` and forwards `value`,
// converted to std::chrono::microseconds, to the matching HLC drift setter
// on `map`. No handling is visible for keys other than the two tested here.
// NOTE(review): the second line of the parameter list (which declares
// `value`) and the closing braces are missing from this listing -- confirm
// against the full file.
222 void VBucketMap::VBucketConfigChangeListener::sizeValueChanged(const std::string &key,
224 if (key == "hlc_drift_ahead_threshold_us") {
225 map.setHLCDriftAheadThreshold(std::chrono::microseconds(value));
226 } else if (key == "hlc_drift_behind_threshold_us") {
227 map.setHLCDriftBehindThreshold(std::chrono::microseconds(value));