39aa804a58bd0ab9a9c0229e9b8f7618cb249833
[ep-engine.git] / src / vbucket.cc
1 /* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2 /*
3  *     Copyright 2010 Couchbase, Inc
4  *
5  *   Licensed under the Apache License, Version 2.0 (the "License");
6  *   you may not use this file except in compliance with the License.
7  *   You may obtain a copy of the License at
8  *
9  *       http://www.apache.org/licenses/LICENSE-2.0
10  *
11  *   Unless required by applicable law or agreed to in writing, software
12  *   distributed under the License is distributed on an "AS IS" BASIS,
13  *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  *   See the License for the specific language governing permissions and
15  *   limitations under the License.
16  */
17
18 #include "config.h"
19
20 #include <functional>
21 #include <list>
22 #include <set>
23 #include <string>
24
25 #include "ep_engine.h"
26 #include "failover-table.h"
27 #define STATWRITER_NAMESPACE vbucket
28 #include "statwriter.h"
29 #undef STATWRITER_NAMESPACE
30 #include "vbucket.h"
31
32 VBucketFilter VBucketFilter::filter_diff(const VBucketFilter &other) const {
33     std::vector<uint16_t> tmp(acceptable.size() + other.size());
34     std::vector<uint16_t>::iterator end;
35     end = std::set_symmetric_difference(acceptable.begin(),
36                                         acceptable.end(),
37                                         other.acceptable.begin(),
38                                         other.acceptable.end(),
39                                         tmp.begin());
40     return VBucketFilter(std::vector<uint16_t>(tmp.begin(), end));
41 }
42
43 VBucketFilter VBucketFilter::filter_intersection(const VBucketFilter &other)
44                                                                         const {
45     std::vector<uint16_t> tmp(acceptable.size() + other.size());
46     std::vector<uint16_t>::iterator end;
47
48     end = std::set_intersection(acceptable.begin(), acceptable.end(),
49                                 other.acceptable.begin(),
50                                 other.acceptable.end(),
51                                 tmp.begin());
52     return VBucketFilter(std::vector<uint16_t>(tmp.begin(), end));
53 }
54
/**
 * Measure the run of consecutive ids (n, n+1, n+2, ...) that begins at it.
 *
 * @param it     iterator to the first id of the candidate run
 * @param end    end iterator of the set being walked
 * @param length out: number of steps from it to the last id of the run;
 *               0 when the run is a single id or when it == end
 * @return true when the run covers at least three ids (length > 1), i.e.
 *         when it is worth printing as "[first,last]"
 */
static bool isRange(std::set<uint16_t>::const_iterator it,
                    const std::set<uint16_t>::const_iterator &end,
                    size_t &length)
{
    length = 0;

    // An empty range has no first id to read; without this guard the
    // dereference below would be undefined and length would wrap around.
    if (it == end) {
        return false;
    }

    const uint16_t first = *it;
    size_t run = 0;
    while (it != end && (first + run) == *it) {
        ++it;
        ++run;
    }

    // run counts the ids in the sequence (>= 1 here); callers want the
    // offset of the last id relative to the first.
    length = run - 1;

    return length > 1;
}
70
71 std::ostream& operator <<(std::ostream &out, const VBucketFilter &filter)
72 {
73     std::set<uint16_t>::const_iterator it;
74
75     if (filter.acceptable.empty()) {
76         out << "{ empty }";
77     } else {
78         bool needcomma = false;
79         out << "{ ";
80         for (it = filter.acceptable.begin();
81              it != filter.acceptable.end();
82              ++it) {
83             if (needcomma) {
84                 out << ", ";
85             }
86
87             size_t length;
88             if (isRange(it, filter.acceptable.end(), length)) {
89                 std::set<uint16_t>::iterator last = it;
90                 for (size_t i = 0; i < length; ++i) {
91                     ++last;
92                 }
93                 out << "[" << *it << "," << *last << "]";
94                 it = last;
95             } else {
96                 out << *it;
97             }
98             needcomma = true;
99         }
100         out << " }";
101     }
102
103     return out;
104 }
105
106 size_t VBucket::chkFlushTimeout = MIN_CHK_FLUSH_TIMEOUT;
107
108 const vbucket_state_t VBucket::ACTIVE =
109                      static_cast<vbucket_state_t>(htonl(vbucket_state_active));
110 const vbucket_state_t VBucket::REPLICA =
111                     static_cast<vbucket_state_t>(htonl(vbucket_state_replica));
112 const vbucket_state_t VBucket::PENDING =
113                     static_cast<vbucket_state_t>(htonl(vbucket_state_pending));
114 const vbucket_state_t VBucket::DEAD =
115                     static_cast<vbucket_state_t>(htonl(vbucket_state_dead));
116
117 VBucket::~VBucket() {
118     if (!pendingOps.empty() || !pendingBGFetches.empty()) {
119         LOG(EXTENSION_LOG_WARNING,
120             "Have %ld pending ops and %ld pending reads "
121             "while destroying vbucket\n",
122             pendingOps.size(), pendingBGFetches.size());
123     }
124
125     stats.decrDiskQueueSize(dirtyQueueSize.load());
126
127     size_t num_pending_fetches = 0;
128     vb_bgfetch_queue_t::iterator itr = pendingBGFetches.begin();
129     for (; itr != pendingBGFetches.end(); ++itr) {
130         std::list<VBucketBGFetchItem *> &bgitems = itr->second;
131         std::list<VBucketBGFetchItem *>::iterator vit = bgitems.begin();
132         for (; vit != bgitems.end(); ++vit) {
133             delete (*vit);
134             ++num_pending_fetches;
135         }
136     }
137     stats.numRemainingBgJobs.fetch_sub(num_pending_fetches);
138     pendingBGFetches.clear();
139     delete failovers;
140
141     stats.memOverhead.fetch_sub(sizeof(VBucket) + ht.memorySize() + sizeof(CheckpointManager));
142     cb_assert(stats.memOverhead.load() < GIGANTOR);
143
144     LOG(EXTENSION_LOG_INFO, "Destroying vbucket %d\n", id);
145 }
146
147 void VBucket::fireAllOps(EventuallyPersistentEngine &engine,
148                          ENGINE_ERROR_CODE code) {
149     if (pendingOpsStart > 0) {
150         hrtime_t now = gethrtime();
151         if (now > pendingOpsStart) {
152             hrtime_t d = (now - pendingOpsStart) / 1000;
153             stats.pendingOpsHisto.add(d);
154             atomic_setIfBigger(stats.pendingOpsMaxDuration, d);
155         }
156     } else {
157         return;
158     }
159
160     pendingOpsStart = 0;
161     stats.pendingOps.fetch_sub(pendingOps.size());
162     atomic_setIfBigger(stats.pendingOpsMax, pendingOps.size());
163
164     engine.notifyIOComplete(pendingOps, code);
165     pendingOps.clear();
166
167     LOG(EXTENSION_LOG_INFO,
168         "Fired pendings ops for vbucket %d in state %s\n",
169         id, VBucket::toString(state));
170 }
171
172 void VBucket::fireAllOps(EventuallyPersistentEngine &engine) {
173     LockHolder lh(pendingOpLock);
174
175     if (state == vbucket_state_active) {
176         fireAllOps(engine, ENGINE_SUCCESS);
177     } else if (state == vbucket_state_pending) {
178         // Nothing
179     } else {
180         fireAllOps(engine, ENGINE_NOT_MY_VBUCKET);
181     }
182 }
183
184 void VBucket::setState(vbucket_state_t to, SERVER_HANDLE_V1 *sapi) {
185     cb_assert(sapi);
186     WriterLockHolder wlh(stateLock);
187     vbucket_state_t oldstate(state);
188
189     if (to == vbucket_state_active &&
190         checkpointManager.getOpenCheckpointId() < 2) {
191         checkpointManager.setOpenCheckpointId(2);
192     }
193
194     if (oldstate == vbucket_state_active) {
195         uint64_t highSeqno = (uint64_t)checkpointManager.getHighSeqno();
196         setCurrentSnapshot(highSeqno, highSeqno);
197     }
198
199     LOG(EXTENSION_LOG_DEBUG, "transitioning vbucket %d from %s to %s",
200         id, VBucket::toString(oldstate), VBucket::toString(to));
201
202     state = to;
203 }
204
205 void VBucket::doStatsForQueueing(Item& qi, size_t itemBytes)
206 {
207     ++dirtyQueueSize;
208     dirtyQueueMem.fetch_add(sizeof(Item));
209     ++dirtyQueueFill;
210     dirtyQueueAge.fetch_add(qi.getQueuedTime());
211     dirtyQueuePendingWrites.fetch_add(itemBytes);
212 }
213
214 void VBucket::doStatsForFlushing(Item& qi, size_t itemBytes)
215 {
216     decrDirtyQueueSize(1);
217     if (dirtyQueueMem > sizeof(Item)) {
218         dirtyQueueMem.fetch_sub(sizeof(Item));
219     } else {
220         dirtyQueueMem.store(0);
221     }
222     ++dirtyQueueDrain;
223
224     if (dirtyQueueAge > qi.getQueuedTime()) {
225         dirtyQueueAge.fetch_sub(qi.getQueuedTime());
226     } else {
227         dirtyQueueAge.store(0);
228     }
229
230     if (dirtyQueuePendingWrites > itemBytes) {
231         dirtyQueuePendingWrites.fetch_sub(itemBytes);
232     } else {
233         dirtyQueuePendingWrites.store(0);
234     }
235 }
236
237 void VBucket::incrMetaDataDisk(Item& qi)
238 {
239     metaDataDisk.fetch_add(qi.getNKey() + sizeof(ItemMetaData));
240 }
241
242 void VBucket::decrMetaDataDisk(Item& qi)
243 {
244     // assume couchstore remove approx this much data from disk
245     metaDataDisk.fetch_sub((qi.getNKey() + sizeof(ItemMetaData)));
246 }
247
248 void VBucket::resetStats() {
249     opsCreate.store(0);
250     opsUpdate.store(0);
251     opsDelete.store(0);
252     opsReject.store(0);
253
254     stats.decrDiskQueueSize(dirtyQueueSize.load());
255     dirtyQueueSize.store(0);
256     dirtyQueueMem.store(0);
257     dirtyQueueFill.store(0);
258     dirtyQueueAge.store(0);
259     dirtyQueuePendingWrites.store(0);
260     dirtyQueueDrain.store(0);
261     fileSpaceUsed = 0;
262     fileSize = 0;
263 }
264
265 template <typename T>
266 void VBucket::addStat(const char *nm, const T &val, ADD_STAT add_stat,
267                       const void *c) {
268     std::stringstream name;
269     name << "vb_" << id;
270     if (nm != NULL) {
271         name << ":" << nm;
272     }
273     std::stringstream value;
274     value << val;
275     std::string n = name.str();
276     add_casted_stat(n.data(), value.str().data(), add_stat, c);
277 }
278
279 void VBucket::queueBGFetchItem(const std::string &key,
280                                VBucketBGFetchItem *fetch,
281                                BgFetcher *bgFetcher) {
282     LockHolder lh(pendingBGFetchesLock);
283     pendingBGFetches[key].push_back(fetch);
284     bgFetcher->addPendingVB(id);
285     lh.unlock();
286 }
287
288 bool VBucket::getBGFetchItems(vb_bgfetch_queue_t &fetches) {
289     LockHolder lh(pendingBGFetchesLock);
290     fetches.insert(pendingBGFetches.begin(), pendingBGFetches.end());
291     pendingBGFetches.clear();
292     lh.unlock();
293     return fetches.size() > 0;
294 }
295
296 void VBucket::addHighPriorityVBEntry(uint64_t id, const void *cookie,
297                                      bool isBySeqno) {
298     LockHolder lh(hpChksMutex);
299     if (shard) {
300         ++shard->highPriorityCount;
301     }
302     hpChks.push_back(HighPriorityVBEntry(cookie, id, isBySeqno));
303     numHpChks.store(hpChks.size());
304 }
305
306 void VBucket::notifyOnPersistence(EventuallyPersistentEngine &e,
307                                   uint64_t idNum,
308                                   bool isBySeqno) {
309     LockHolder lh(hpChksMutex);
310     std::map<const void*, ENGINE_ERROR_CODE> toNotify;
311     std::list<HighPriorityVBEntry>::iterator entry = hpChks.begin();
312
313     std::string logStr(isBySeqno
314                        ? "seqno persistence"
315                        : "checkpoint persistence");
316
317     while (entry != hpChks.end()) {
318         if (isBySeqno != entry->isBySeqno_) {
319             ++entry;
320             continue;
321         }
322
323         hrtime_t wall_time(gethrtime() - entry->start);
324         size_t spent = wall_time / 1000000000;
325         if (entry->id <= idNum) {
326             toNotify[entry->cookie] = ENGINE_SUCCESS;
327             stats.chkPersistenceHisto.add(wall_time / 1000);
328             adjustCheckpointFlushTimeout(wall_time / 1000000000);
329             LOG(EXTENSION_LOG_WARNING, "Notified the completion of %s "
330                 "for vbucket %" PRIu16 ", Check for: %" PRIu64 ", "
331                 "Persisted upto: %" PRIu64 ", cookie %p",
332                 logStr.c_str(), id, entry->id, idNum, entry->cookie);
333             entry = hpChks.erase(entry);
334             if (shard) {
335                 --shard->highPriorityCount;
336             }
337         } else if (spent > getCheckpointFlushTimeout()) {
338             adjustCheckpointFlushTimeout(spent);
339             e.storeEngineSpecific(entry->cookie, NULL);
340             toNotify[entry->cookie] = ENGINE_TMPFAIL;
341             LOG(EXTENSION_LOG_WARNING, "Notified the timeout on %s "
342                 "for vbucket %" PRIu16 ", Check for: %" PRIu64 ", "
343                 "Persisted upto: %" PRIu64 ", cookie %p",
344                 logStr.c_str(), id, entry->id, idNum, entry->cookie);
345             entry = hpChks.erase(entry);
346             if (shard) {
347                 --shard->highPriorityCount;
348             }
349         } else {
350             ++entry;
351         }
352     }
353     numHpChks.store(hpChks.size());
354     lh.unlock();
355
356     std::map<const void*, ENGINE_ERROR_CODE>::iterator itr = toNotify.begin();
357     for (; itr != toNotify.end(); ++itr) {
358         e.notifyIOComplete(itr->first, itr->second);
359     }
360
361 }
362
363 void VBucket::notifyAllPendingConnsFailed(EventuallyPersistentEngine &e) {
364     LockHolder lh(hpChksMutex);
365     std::map<const void*, ENGINE_ERROR_CODE> toNotify;
366     std::list<HighPriorityVBEntry>::iterator entry = hpChks.begin();
367     while (entry != hpChks.end()) {
368         toNotify[entry->cookie] = ENGINE_TMPFAIL;
369         e.storeEngineSpecific(entry->cookie, NULL);
370         entry = hpChks.erase(entry);
371         if (shard) {
372             --shard->highPriorityCount;
373         }
374     }
375     lh.unlock();
376
377     std::map<const void*, ENGINE_ERROR_CODE>::iterator itr = toNotify.begin();
378     for (; itr != toNotify.end(); ++itr) {
379         e.notifyIOComplete(itr->first, itr->second);
380     }
381
382     fireAllOps(e);
383 }
384
385 void VBucket::adjustCheckpointFlushTimeout(size_t wall_time) {
386     size_t middle = (MIN_CHK_FLUSH_TIMEOUT + MAX_CHK_FLUSH_TIMEOUT) / 2;
387
388     if (wall_time <= MIN_CHK_FLUSH_TIMEOUT) {
389         chkFlushTimeout = MIN_CHK_FLUSH_TIMEOUT;
390     } else if (wall_time <= middle) {
391         chkFlushTimeout = middle;
392     } else {
393         chkFlushTimeout = MAX_CHK_FLUSH_TIMEOUT;
394     }
395 }
396
397 size_t VBucket::getHighPriorityChkSize() {
398     return numHpChks.load();
399 }
400
401 size_t VBucket::getCheckpointFlushTimeout() {
402     return chkFlushTimeout;
403 }
404
405 size_t VBucket::getNumItems(item_eviction_policy_t policy) {
406     if (policy == VALUE_ONLY) {
407         return ht.getNumInMemoryItems();
408     } else {
409         return ht.getNumItems();
410     }
411 }
412
413 size_t VBucket::getNumNonResidentItems(item_eviction_policy_t policy) {
414     if (policy == VALUE_ONLY) {
415         return ht.getNumInMemoryNonResItems();
416     } else {
417         size_t num_items = ht.getNumItems();
418         size_t num_res_items = ht.getNumInMemoryItems() -
419                                ht.getNumInMemoryNonResItems();
420         return num_items > num_res_items ? (num_items - num_res_items) : 0;
421     }
422 }
423
424 void VBucket::addStats(bool details, ADD_STAT add_stat, const void *c,
425                        item_eviction_policy_t policy) {
426     addStat(NULL, toString(state), add_stat, c);
427     if (details) {
428         size_t numItems = getNumItems(policy);
429         size_t tempItems = getNumTempItems();
430         addStat("num_items", numItems, add_stat, c);
431         addStat("num_temp_items", tempItems, add_stat, c);
432         addStat("num_non_resident", getNumNonResidentItems(policy),
433                 add_stat, c);
434         addStat("ht_memory", ht.memorySize(), add_stat, c);
435         addStat("ht_item_memory", ht.getItemMemory(), add_stat, c);
436         addStat("ht_cache_size", ht.cacheSize, add_stat, c);
437         addStat("num_ejects", ht.getNumEjects(), add_stat, c);
438         addStat("ops_create", opsCreate, add_stat, c);
439         addStat("ops_update", opsUpdate, add_stat, c);
440         addStat("ops_delete", opsDelete, add_stat, c);
441         addStat("ops_reject", opsReject, add_stat, c);
442         addStat("queue_size", dirtyQueueSize, add_stat, c);
443         addStat("queue_memory", dirtyQueueMem, add_stat, c);
444         addStat("queue_fill", dirtyQueueFill, add_stat, c);
445         addStat("queue_drain", dirtyQueueDrain, add_stat, c);
446         addStat("queue_age", getQueueAge(), add_stat, c);
447         addStat("pending_writes", dirtyQueuePendingWrites, add_stat, c);
448         addStat("db_data_size", fileSpaceUsed, add_stat, c);
449         addStat("db_file_size", fileSize, add_stat, c);
450         addStat("high_seqno", getHighSeqno(), add_stat, c);
451         addStat("uuid", failovers->getLatestEntry().vb_uuid, add_stat, c);
452         addStat("purge_seqno", getPurgeSeqno(), add_stat, c);
453     }
454 }