MB-19359: [1] Address lock inversion with vb's state lock and snapshot lock
[ep-engine.git] / src / vbucket.cc
1 /* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2 /*
3  *     Copyright 2010 Couchbase, Inc
4  *
5  *   Licensed under the Apache License, Version 2.0 (the "License");
6  *   you may not use this file except in compliance with the License.
7  *   You may obtain a copy of the License at
8  *
9  *       http://www.apache.org/licenses/LICENSE-2.0
10  *
11  *   Unless required by applicable law or agreed to in writing, software
12  *   distributed under the License is distributed on an "AS IS" BASIS,
13  *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  *   See the License for the specific language governing permissions and
15  *   limitations under the License.
16  */
17
18 #include "config.h"
19
20 #include <functional>
21 #include <list>
22 #include <set>
23 #include <string>
24
25 #include "ep_engine.h"
26 #include "failover-table.h"
27 #define STATWRITER_NAMESPACE vbucket
28 #include "statwriter.h"
29 #undef STATWRITER_NAMESPACE
30 #include "vbucket.h"
31
32 VBucketFilter VBucketFilter::filter_diff(const VBucketFilter &other) const {
33     std::vector<uint16_t> tmp(acceptable.size() + other.size());
34     std::vector<uint16_t>::iterator end;
35     end = std::set_symmetric_difference(acceptable.begin(),
36                                         acceptable.end(),
37                                         other.acceptable.begin(),
38                                         other.acceptable.end(),
39                                         tmp.begin());
40     return VBucketFilter(std::vector<uint16_t>(tmp.begin(), end));
41 }
42
43 VBucketFilter VBucketFilter::filter_intersection(const VBucketFilter &other)
44                                                                         const {
45     std::vector<uint16_t> tmp(acceptable.size() + other.size());
46     std::vector<uint16_t>::iterator end;
47
48     end = std::set_intersection(acceptable.begin(), acceptable.end(),
49                                 other.acceptable.begin(),
50                                 other.acceptable.end(),
51                                 tmp.begin());
52     return VBucketFilter(std::vector<uint16_t>(tmp.begin(), end));
53 }
54
/**
 * Decide whether the run of consecutive values starting at 'it' is long
 * enough to be printed as a range.
 *
 * @param it     position of the first value of the candidate run
 * @param end    past-the-end position of the set being walked
 * @param length out: number of steps from 'it' to the last value of the
 *               run (run size minus one); 0 when 'it' equals 'end'
 * @return true when the run holds at least three values (length > 1)
 */
static bool isRange(std::set<uint16_t>::const_iterator it,
                    const std::set<uint16_t>::const_iterator &end,
                    size_t &length)
{
    length = 0;
    if (it == end) {
        // Nothing to inspect: dereferencing 'end' is undefined and the
        // decrement below would wrap 'length' around to SIZE_MAX,
        // wrongly reporting a huge range.
        return false;
    }

    const uint16_t first = *it;
    while (it != end && (first + length) == *it) {
        ++it;
        ++length;
    }

    // The loop counted the elements of the run (at least one); callers
    // need the distance from the first element to the last one.
    --length;

    return length > 1;
}
70
71 std::ostream& operator <<(std::ostream &out, const VBucketFilter &filter)
72 {
73     std::set<uint16_t>::const_iterator it;
74
75     if (filter.acceptable.empty()) {
76         out << "{ empty }";
77     } else {
78         bool needcomma = false;
79         out << "{ ";
80         for (it = filter.acceptable.begin();
81              it != filter.acceptable.end();
82              ++it) {
83             if (needcomma) {
84                 out << ", ";
85             }
86
87             size_t length;
88             if (isRange(it, filter.acceptable.end(), length)) {
89                 std::set<uint16_t>::iterator last = it;
90                 for (size_t i = 0; i < length; ++i) {
91                     ++last;
92                 }
93                 out << "[" << *it << "," << *last << "]";
94                 it = last;
95             } else {
96                 out << *it;
97             }
98             needcomma = true;
99         }
100         out << " }";
101     }
102
103     return out;
104 }
105
106 size_t VBucket::chkFlushTimeout = MIN_CHK_FLUSH_TIMEOUT;
107
108 const vbucket_state_t VBucket::ACTIVE =
109                      static_cast<vbucket_state_t>(htonl(vbucket_state_active));
110 const vbucket_state_t VBucket::REPLICA =
111                     static_cast<vbucket_state_t>(htonl(vbucket_state_replica));
112 const vbucket_state_t VBucket::PENDING =
113                     static_cast<vbucket_state_t>(htonl(vbucket_state_pending));
114 const vbucket_state_t VBucket::DEAD =
115                     static_cast<vbucket_state_t>(htonl(vbucket_state_dead));
116
117 VBucket::~VBucket() {
118     if (!pendingOps.empty() || !pendingBGFetches.empty()) {
119         LOG(EXTENSION_LOG_WARNING,
120             "Have %ld pending ops and %ld pending reads "
121             "while destroying vbucket\n",
122             pendingOps.size(), pendingBGFetches.size());
123     }
124
125     stats.decrDiskQueueSize(dirtyQueueSize.load());
126
127     size_t num_pending_fetches = 0;
128     vb_bgfetch_queue_t::iterator itr = pendingBGFetches.begin();
129     for (; itr != pendingBGFetches.end(); ++itr) {
130         std::list<VBucketBGFetchItem *> &bgitems = itr->second;
131         std::list<VBucketBGFetchItem *>::iterator vit = bgitems.begin();
132         for (; vit != bgitems.end(); ++vit) {
133             delete (*vit);
134             ++num_pending_fetches;
135         }
136     }
137     stats.numRemainingBgJobs.fetch_sub(num_pending_fetches);
138     pendingBGFetches.clear();
139     delete failovers;
140
141     stats.memOverhead.fetch_sub(sizeof(VBucket) + ht.memorySize() + sizeof(CheckpointManager));
142     cb_assert(stats.memOverhead.load() < GIGANTOR);
143
144     LOG(EXTENSION_LOG_INFO, "Destroying vbucket %d\n", id);
145 }
146
147 void VBucket::fireAllOps(EventuallyPersistentEngine &engine,
148                          ENGINE_ERROR_CODE code) {
149     if (pendingOpsStart > 0) {
150         hrtime_t now = gethrtime();
151         if (now > pendingOpsStart) {
152             hrtime_t d = (now - pendingOpsStart) / 1000;
153             stats.pendingOpsHisto.add(d);
154             atomic_setIfBigger(stats.pendingOpsMaxDuration, d);
155         }
156     } else {
157         return;
158     }
159
160     pendingOpsStart = 0;
161     stats.pendingOps.fetch_sub(pendingOps.size());
162     atomic_setIfBigger(stats.pendingOpsMax, pendingOps.size());
163
164     engine.notifyIOComplete(pendingOps, code);
165     pendingOps.clear();
166
167     LOG(EXTENSION_LOG_INFO,
168         "Fired pendings ops for vbucket %d in state %s\n",
169         id, VBucket::toString(state));
170 }
171
172 void VBucket::fireAllOps(EventuallyPersistentEngine &engine) {
173     LockHolder lh(pendingOpLock);
174
175     if (state == vbucket_state_active) {
176         fireAllOps(engine, ENGINE_SUCCESS);
177     } else if (state == vbucket_state_pending) {
178         // Nothing
179     } else {
180         fireAllOps(engine, ENGINE_NOT_MY_VBUCKET);
181     }
182 }
183
184 void VBucket::setState(vbucket_state_t to, SERVER_HANDLE_V1 *sapi) {
185     cb_assert(sapi);
186
187     vbucket_state_t oldstate;
188     {
189         WriterLockHolder wlh(stateLock);
190         oldstate = state;
191
192         if (to == vbucket_state_active &&
193             checkpointManager.getOpenCheckpointId() < 2) {
194             checkpointManager.setOpenCheckpointId(2);
195         }
196
197         LOG(EXTENSION_LOG_DEBUG, "transitioning vbucket %d from %s to %s",
198             id, VBucket::toString(oldstate), VBucket::toString(to));
199
200         state = to;
201     }
202
203     if (oldstate == vbucket_state_active) {
204         uint64_t highSeqno = (uint64_t)checkpointManager.getHighSeqno();
205         setCurrentSnapshot(highSeqno, highSeqno);
206     }
207 }
208
209 void VBucket::doStatsForQueueing(Item& qi, size_t itemBytes)
210 {
211     ++dirtyQueueSize;
212     dirtyQueueMem.fetch_add(sizeof(Item));
213     ++dirtyQueueFill;
214     dirtyQueueAge.fetch_add(qi.getQueuedTime());
215     dirtyQueuePendingWrites.fetch_add(itemBytes);
216 }
217
218 void VBucket::doStatsForFlushing(Item& qi, size_t itemBytes)
219 {
220     decrDirtyQueueSize(1);
221     if (dirtyQueueMem > sizeof(Item)) {
222         dirtyQueueMem.fetch_sub(sizeof(Item));
223     } else {
224         dirtyQueueMem.store(0);
225     }
226     ++dirtyQueueDrain;
227
228     if (dirtyQueueAge > qi.getQueuedTime()) {
229         dirtyQueueAge.fetch_sub(qi.getQueuedTime());
230     } else {
231         dirtyQueueAge.store(0);
232     }
233
234     if (dirtyQueuePendingWrites > itemBytes) {
235         dirtyQueuePendingWrites.fetch_sub(itemBytes);
236     } else {
237         dirtyQueuePendingWrites.store(0);
238     }
239 }
240
241 void VBucket::incrMetaDataDisk(Item& qi)
242 {
243     metaDataDisk.fetch_add(qi.getNKey() + sizeof(ItemMetaData));
244 }
245
246 void VBucket::decrMetaDataDisk(Item& qi)
247 {
248     // assume couchstore remove approx this much data from disk
249     metaDataDisk.fetch_sub((qi.getNKey() + sizeof(ItemMetaData)));
250 }
251
252 void VBucket::resetStats() {
253     opsCreate.store(0);
254     opsUpdate.store(0);
255     opsDelete.store(0);
256     opsReject.store(0);
257
258     stats.decrDiskQueueSize(dirtyQueueSize.load());
259     dirtyQueueSize.store(0);
260     dirtyQueueMem.store(0);
261     dirtyQueueFill.store(0);
262     dirtyQueueAge.store(0);
263     dirtyQueuePendingWrites.store(0);
264     dirtyQueueDrain.store(0);
265     fileSpaceUsed = 0;
266     fileSize = 0;
267 }
268
269 template <typename T>
270 void VBucket::addStat(const char *nm, const T &val, ADD_STAT add_stat,
271                       const void *c) {
272     std::stringstream name;
273     name << "vb_" << id;
274     if (nm != NULL) {
275         name << ":" << nm;
276     }
277     std::stringstream value;
278     value << val;
279     std::string n = name.str();
280     add_casted_stat(n.data(), value.str().data(), add_stat, c);
281 }
282
283 void VBucket::queueBGFetchItem(const std::string &key,
284                                VBucketBGFetchItem *fetch,
285                                BgFetcher *bgFetcher) {
286     LockHolder lh(pendingBGFetchesLock);
287     pendingBGFetches[key].push_back(fetch);
288     bgFetcher->addPendingVB(id);
289     lh.unlock();
290 }
291
292 bool VBucket::getBGFetchItems(vb_bgfetch_queue_t &fetches) {
293     LockHolder lh(pendingBGFetchesLock);
294     fetches.insert(pendingBGFetches.begin(), pendingBGFetches.end());
295     pendingBGFetches.clear();
296     lh.unlock();
297     return fetches.size() > 0;
298 }
299
300 void VBucket::addHighPriorityVBEntry(uint64_t id, const void *cookie,
301                                      bool isBySeqno) {
302     LockHolder lh(hpChksMutex);
303     if (shard) {
304         ++shard->highPriorityCount;
305     }
306     hpChks.push_back(HighPriorityVBEntry(cookie, id, isBySeqno));
307     numHpChks.store(hpChks.size());
308 }
309
310 void VBucket::notifyOnPersistence(EventuallyPersistentEngine &e,
311                                   uint64_t idNum,
312                                   bool isBySeqno) {
313     LockHolder lh(hpChksMutex);
314     std::map<const void*, ENGINE_ERROR_CODE> toNotify;
315     std::list<HighPriorityVBEntry>::iterator entry = hpChks.begin();
316
317     std::string logStr(isBySeqno
318                        ? "seqno persistence"
319                        : "checkpoint persistence");
320
321     while (entry != hpChks.end()) {
322         if (isBySeqno != entry->isBySeqno_) {
323             ++entry;
324             continue;
325         }
326
327         hrtime_t wall_time(gethrtime() - entry->start);
328         size_t spent = wall_time / 1000000000;
329         if (entry->id <= idNum) {
330             toNotify[entry->cookie] = ENGINE_SUCCESS;
331             stats.chkPersistenceHisto.add(wall_time / 1000);
332             adjustCheckpointFlushTimeout(wall_time / 1000000000);
333             LOG(EXTENSION_LOG_WARNING, "Notified the completion of %s "
334                 "for vbucket %" PRIu16 ", Check for: %" PRIu64 ", "
335                 "Persisted upto: %" PRIu64 ", cookie %p",
336                 logStr.c_str(), id, entry->id, idNum, entry->cookie);
337             entry = hpChks.erase(entry);
338             if (shard) {
339                 --shard->highPriorityCount;
340             }
341         } else if (spent > getCheckpointFlushTimeout()) {
342             adjustCheckpointFlushTimeout(spent);
343             e.storeEngineSpecific(entry->cookie, NULL);
344             toNotify[entry->cookie] = ENGINE_TMPFAIL;
345             LOG(EXTENSION_LOG_WARNING, "Notified the timeout on %s "
346                 "for vbucket %" PRIu16 ", Check for: %" PRIu64 ", "
347                 "Persisted upto: %" PRIu64 ", cookie %p",
348                 logStr.c_str(), id, entry->id, idNum, entry->cookie);
349             entry = hpChks.erase(entry);
350             if (shard) {
351                 --shard->highPriorityCount;
352             }
353         } else {
354             ++entry;
355         }
356     }
357     numHpChks.store(hpChks.size());
358     lh.unlock();
359
360     std::map<const void*, ENGINE_ERROR_CODE>::iterator itr = toNotify.begin();
361     for (; itr != toNotify.end(); ++itr) {
362         e.notifyIOComplete(itr->first, itr->second);
363     }
364
365 }
366
367 void VBucket::notifyAllPendingConnsFailed(EventuallyPersistentEngine &e) {
368     LockHolder lh(hpChksMutex);
369     std::map<const void*, ENGINE_ERROR_CODE> toNotify;
370     std::list<HighPriorityVBEntry>::iterator entry = hpChks.begin();
371     while (entry != hpChks.end()) {
372         toNotify[entry->cookie] = ENGINE_TMPFAIL;
373         e.storeEngineSpecific(entry->cookie, NULL);
374         entry = hpChks.erase(entry);
375         if (shard) {
376             --shard->highPriorityCount;
377         }
378     }
379     lh.unlock();
380
381     std::map<const void*, ENGINE_ERROR_CODE>::iterator itr = toNotify.begin();
382     for (; itr != toNotify.end(); ++itr) {
383         e.notifyIOComplete(itr->first, itr->second);
384     }
385
386     fireAllOps(e);
387 }
388
389 void VBucket::adjustCheckpointFlushTimeout(size_t wall_time) {
390     size_t middle = (MIN_CHK_FLUSH_TIMEOUT + MAX_CHK_FLUSH_TIMEOUT) / 2;
391
392     if (wall_time <= MIN_CHK_FLUSH_TIMEOUT) {
393         chkFlushTimeout = MIN_CHK_FLUSH_TIMEOUT;
394     } else if (wall_time <= middle) {
395         chkFlushTimeout = middle;
396     } else {
397         chkFlushTimeout = MAX_CHK_FLUSH_TIMEOUT;
398     }
399 }
400
401 size_t VBucket::getHighPriorityChkSize() {
402     return numHpChks.load();
403 }
404
405 size_t VBucket::getCheckpointFlushTimeout() {
406     return chkFlushTimeout;
407 }
408
409 size_t VBucket::getNumItems(item_eviction_policy_t policy) {
410     if (policy == VALUE_ONLY) {
411         return ht.getNumInMemoryItems();
412     } else {
413         return ht.getNumItems();
414     }
415 }
416
417 size_t VBucket::getNumNonResidentItems(item_eviction_policy_t policy) {
418     if (policy == VALUE_ONLY) {
419         return ht.getNumInMemoryNonResItems();
420     } else {
421         size_t num_items = ht.getNumItems();
422         size_t num_res_items = ht.getNumInMemoryItems() -
423                                ht.getNumInMemoryNonResItems();
424         return num_items > num_res_items ? (num_items - num_res_items) : 0;
425     }
426 }
427
428 void VBucket::addStats(bool details, ADD_STAT add_stat, const void *c,
429                        item_eviction_policy_t policy) {
430     addStat(NULL, toString(state), add_stat, c);
431     if (details) {
432         size_t numItems = getNumItems(policy);
433         size_t tempItems = getNumTempItems();
434         addStat("num_items", numItems, add_stat, c);
435         addStat("num_temp_items", tempItems, add_stat, c);
436         addStat("num_non_resident", getNumNonResidentItems(policy),
437                 add_stat, c);
438         addStat("ht_memory", ht.memorySize(), add_stat, c);
439         addStat("ht_item_memory", ht.getItemMemory(), add_stat, c);
440         addStat("ht_cache_size", ht.cacheSize, add_stat, c);
441         addStat("num_ejects", ht.getNumEjects(), add_stat, c);
442         addStat("ops_create", opsCreate, add_stat, c);
443         addStat("ops_update", opsUpdate, add_stat, c);
444         addStat("ops_delete", opsDelete, add_stat, c);
445         addStat("ops_reject", opsReject, add_stat, c);
446         addStat("queue_size", dirtyQueueSize, add_stat, c);
447         addStat("queue_memory", dirtyQueueMem, add_stat, c);
448         addStat("queue_fill", dirtyQueueFill, add_stat, c);
449         addStat("queue_drain", dirtyQueueDrain, add_stat, c);
450         addStat("queue_age", getQueueAge(), add_stat, c);
451         addStat("pending_writes", dirtyQueuePendingWrites, add_stat, c);
452         addStat("db_data_size", fileSpaceUsed, add_stat, c);
453         addStat("db_file_size", fileSize, add_stat, c);
454         addStat("high_seqno", getHighSeqno(), add_stat, c);
455         addStat("uuid", failovers->getLatestEntry().vb_uuid, add_stat, c);
456         addStat("purge_seqno", getPurgeSeqno(), add_stat, c);
457     }
458 }