1f3bf0d1498fb7da925b001a2b5abc6d4bad411b
[ep-engine.git] / src / vbucketmap.cc
1 /* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2 /*
3  *     Copyright 2015 Couchbase, Inc
4  *
5  *   Licensed under the Apache License, Version 2.0 (the "License");
6  *   you may not use this file except in compliance with the License.
7  *   You may obtain a copy of the License at
8  *
9  *       http://www.apache.org/licenses/LICENSE-2.0
10  *
11  *   Unless required by applicable law or agreed to in writing, software
12  *   distributed under the License is distributed on an "AS IS" BASIS,
13  *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  *   See the License for the specific language governing permissions and
15  *   limitations under the License.
16  */
17
18 #include "config.h"
19
20 #include <vector>
21
22 #include "ep.h"
23 #include "ep_engine.h"
24 #include "vbucketmap.h"
25
26 VBucketMap::VBucketMap(Configuration &config,
27                        EventuallyPersistentStore &store) :
28     bucketDeletion(new AtomicValue<bool>[config.getMaxVbuckets()]),
29     bucketCreation(new AtomicValue<bool>[config.getMaxVbuckets()]),
30     persistenceSeqnos(new AtomicValue<uint64_t>[config.getMaxVbuckets()]),
31     size(config.getMaxVbuckets())
32 {
33     WorkLoadPolicy &workload = store.getEPEngine().getWorkLoadPolicy();
34     for (size_t shardId = 0; shardId < workload.getNumShards(); shardId++) {
35         KVShard *shard = new KVShard(shardId, store);
36         shards.push_back(shard);
37     }
38
39     for (size_t i = 0; i < size; ++i) {
40         bucketDeletion[i].store(false);
41         bucketCreation[i].store(false);
42         persistenceSeqnos[i].store(0);
43     }
44 }
45
46 VBucketMap::~VBucketMap() {
47     delete[] bucketDeletion;
48     delete[] bucketCreation;
49     delete[] persistenceSeqnos;
50     while (!shards.empty()) {
51         delete shards.back();
52         shards.pop_back();
53     }
54 }
55
56 RCPtr<VBucket> VBucketMap::getBucket(id_type id) const {
57     static RCPtr<VBucket> emptyVBucket;
58     if (id < size) {
59         return getShardByVbId(id)->getBucket(id);
60     } else {
61         return emptyVBucket;
62     }
63 }
64
65 ENGINE_ERROR_CODE VBucketMap::addBucket(const RCPtr<VBucket> &b) {
66     if (b->getId() < size) {
67         getShardByVbId(b->getId())->setBucket(b);
68         LOG(EXTENSION_LOG_INFO, "Mapped new vbucket %d in state %s",
69             b->getId(), VBucket::toString(b->getState()));
70         return ENGINE_SUCCESS;
71     }
72     LOG(EXTENSION_LOG_WARNING,
73         "Cannot create vb %" PRIu16", max vbuckets is %" PRIu16, b->getId(),
74         size);
75     return ENGINE_ERANGE;
76 }
77
78 void VBucketMap::removeBucket(id_type id) {
79     if (id < size) {
80         // Theoretically, this could be off slightly.  In
81         // practice, this happens only on dead vbuckets.
82         getShardByVbId(id)->resetBucket(id);
83     }
84 }
85
86 std::vector<VBucketMap::id_type> VBucketMap::getBuckets(void) const {
87     std::vector<id_type> rv;
88     for (id_type i = 0; i < size; ++i) {
89         RCPtr<VBucket> b(getShardByVbId(i)->getBucket(i));
90         if (b) {
91             rv.push_back(b->getId());
92         }
93     }
94     return rv;
95 }
96
97 std::vector<VBucketMap::id_type> VBucketMap::getBucketsSortedByState(void) const {
98     std::vector<id_type> rv;
99     for (int state = vbucket_state_active;
100          state <= vbucket_state_dead; ++state) {
101         for (size_t i = 0; i < size; ++i) {
102             RCPtr<VBucket> b = getShardByVbId(i)->getBucket(i);
103             if (b && b->getState() == state) {
104                 rv.push_back(b->getId());
105             }
106         }
107     }
108     return rv;
109 }
110
111 std::vector<std::pair<VBucketMap::id_type, size_t> >
112 VBucketMap::getActiveVBucketsSortedByChkMgrMem(void) const {
113     std::vector<std::pair<id_type, size_t> > rv;
114     for (id_type i = 0; i < size; ++i) {
115         RCPtr<VBucket> b = getShardByVbId(i)->getBucket(i);
116         if (b && b->getState() == vbucket_state_active) {
117             rv.push_back(std::make_pair(b->getId(), b->getChkMgrMemUsage()));
118         }
119     }
120
121     struct SortCtx {
122         static bool compareSecond(std::pair<id_type, size_t> a,
123                                   std::pair<id_type, size_t> b) {
124             return (a.second < b.second);
125         }
126     };
127
128     std::sort(rv.begin(), rv.end(), SortCtx::compareSecond);
129
130     return rv;
131 }
132
133
134 VBucketMap::id_type VBucketMap::getSize(void) const {
135     return size;
136 }
137
138 bool VBucketMap::isBucketDeletion(id_type id) const {
139     return bucketDeletion[id].load();
140 }
141
142 bool VBucketMap::setBucketDeletion(id_type id, bool delBucket) {
143     bool inverse = !delBucket;
144     return bucketDeletion[id].compare_exchange_strong(inverse, delBucket);
145 }
146
147 bool VBucketMap::isBucketCreation(id_type id) const {
148     return bucketCreation[id].load();
149 }
150
151 bool VBucketMap::setBucketCreation(id_type id, bool rv) {
152     bool inverse = !rv;
153     return bucketCreation[id].compare_exchange_strong(inverse, rv);
154 }
155
156 uint64_t VBucketMap::getPersistenceCheckpointId(id_type id) const {
157     if (id < size) {
158         auto vb = getBucket(id);
159         if (vb) {
160             return vb->getPersistenceCheckpointId();
161         }
162     }
163     return {};
164 }
165
166 void VBucketMap::setPersistenceCheckpointId(id_type id,
167                                             uint64_t checkpointId) {
168     if (id < size) {
169         auto vb = getBucket(id);
170         if (vb) {
171             getBucket(id)->setPersistenceCheckpointId(checkpointId);
172         }
173     }
174 }
175
176 uint64_t VBucketMap::getPersistenceSeqno(id_type id) const {
177     return persistenceSeqnos[id].load();
178 }
179
180 void VBucketMap::setPersistenceSeqno(id_type id, uint64_t seqno) {
181     persistenceSeqnos[id].store(seqno);
182 }
183
184 void VBucketMap::addBuckets(const std::vector<VBucket*> &newBuckets) {
185     std::vector<VBucket*>::const_iterator it;
186     for (it = newBuckets.begin(); it != newBuckets.end(); ++it) {
187         RCPtr<VBucket> v(*it);
188         addBucket(v);
189     }
190 }
191
192 KVShard* VBucketMap::getShardByVbId(id_type id) const {
193     return shards[id % shards.size()];
194 }
195
196 KVShard* VBucketMap::getShard(KVShard::id_type shardId) const {
197     return shards[shardId];
198 }
199
200 size_t VBucketMap::getNumShards() const {
201     return shards.size();
202 }
203
204 void VBucketMap::setHLCDriftAheadThreshold(uint64_t threshold) {
205     for (id_type id = 0; id < size; id++) {
206         auto vb = getBucket(id);
207         if (vb) {
208             vb->setHLCDriftAheadThreshold(threshold);
209         }
210     }
211 }
212
213 void VBucketMap::setHLCDriftBehindThreshold(uint64_t threshold) {
214     for (id_type id = 0; id < size; id++) {
215         auto vb = getBucket(id);
216         if (vb) {
217             vb->setHLCDriftBehindThreshold(threshold);
218         }
219     }
220 }
221
222 void VBucketMap::VBucketConfigChangeListener::sizeValueChanged(const std::string &key,
223                                                    size_t value) {
224     if (key == "hlc_drift_ahead_threshold_us") {
225         map.setHLCDriftAheadThreshold(value);
226     } else if (key == "hlc_drift_behind_threshold_us") {
227         map.setHLCDriftBehindThreshold(value);
228     }
229 }