1 /* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
3 * Copyright 2015 Couchbase, Inc
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
9 * http://www.apache.org/licenses/LICENSE-2.0
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
23 * The MutationLog is used to maintain a log of mutations which have occurred
24 * in one or more vbuckets. It only records the additions or removals of keys,
25 * and then only the key of the item (no value).
27 * The original intent of this class was to record a log in parallel with the
28 * normal couchstore snapshots, see docs/klog.org, however this has not been
29 * used since MB-7590 (March 2013).
31 * The current use of MutationLog is for the access.log. This is a slightly
32 * different use-case - periodically (default daily) the AccessScanner walks
33 * each vBucket's HashTable and records the set of keys currently resident.
34 * This doesn't make use of the MutationLog's commit functionality - it's simply
35 * a list of keys which were resident. When we later come to read the Access log
36 * during warmup there's no guarantee that the keys listed still exist - the
37 * contents of the Access log are essentially just a hint / suggestion.
43 #include "mutation_log_entry.h"
50 #include <unordered_map>
54 #include <platform/histogram.h>
57 #define ML_BUFLEN (128 * 1024 * 1024)
60 typedef HANDLE file_handle_t;
61 #define INVALID_FILE_VALUE INVALID_HANDLE_VALUE
63 typedef int file_handle_t;
64 #define INVALID_FILE_VALUE -1
/// Default maximum log size: the largest value an unsigned int can hold,
/// widened to size_t. Used as the default by MutationLogCompactorConfig.
constexpr size_t MAX_LOG_SIZE =
        static_cast<size_t>(static_cast<unsigned int>(-1));
/// Default maximum entry ratio used by MutationLogCompactorConfig.
constexpr size_t MAX_ENTRY_RATIO = 10;
/// Default queue capacity used by MutationLogCompactorConfig.
constexpr size_t LOG_COMPACTOR_QUEUE_CAP = 500000;
/// Default sleep time used by MutationLogCompactorConfig
/// (presumably seconds, i.e. one hour - TODO confirm the unit).
constexpr int MUTATION_LOG_COMPACTOR_FREQ = 3600;

/// Size of the buffer a LogHeaderBlock is parsed from, and the default block
/// size of a MutationLog.
constexpr size_t MIN_LOG_HEADER_SIZE = 4096;
/// NOTE(review): presumably the number of reserved bytes/fields in the
/// header - confirm against the implementation file.
constexpr size_t HEADER_RESERVED = 4;
/// Version of the mutation log format, as carried in the LogHeaderBlock.
/// `Current` always aliases the newest version.
enum class MutationLogVersion {
    V1 = 1,
    V2 = 2,
    Current = V2
};
/// NOTE(review): presumably the size in bytes of the buffer used to build a
/// single log entry - confirm against the implementation file.
constexpr size_t LOG_ENTRY_BUF_SIZE = 512;
// Bit flags packed into a MutationLog's single sync-configuration byte.
// The low two bits (SYNC_FULL mask) hold the sync settings and the next two
// bits (FLUSH_FULL mask) hold the flush settings; getSyncConfig() and
// getFlushConfig() extract each half with the corresponding mask.
constexpr uint8_t SYNC_COMMIT_1 = 1;
constexpr uint8_t SYNC_COMMIT_2 = 2;
constexpr uint8_t SYNC_FULL = SYNC_COMMIT_1 | SYNC_COMMIT_2;
constexpr uint8_t FLUSH_COMMIT_1 = 4;
constexpr uint8_t FLUSH_COMMIT_2 = 8;
constexpr uint8_t FLUSH_FULL = FLUSH_COMMIT_1 | FLUSH_COMMIT_2;

/// Default configuration: only the COMMIT_2 bit set for both flush and sync.
constexpr uint8_t DEFAULT_SYNC_CONF = FLUSH_COMMIT_2 | SYNC_COMMIT_2;
90 * The header block representing the first 4k (or so) of a MutationLog
93 class LogHeaderBlock {
95 LogHeaderBlock(MutationLogVersion version = MutationLogVersion::Current)
96 : _version(htonl(int(version))),
102 void set(uint32_t bs, uint32_t bc=1) {
103 _blockSize = htonl(bs);
104 _blockCount = htonl(bc);
107 void set(const std::array<uint8_t, MIN_LOG_HEADER_SIZE>& buf) {
109 memcpy(&_version, buf.data() + offset, sizeof(_version));
110 offset += sizeof(_version);
111 memcpy(&_blockSize, buf.data() + offset, sizeof(_blockSize));
112 offset += sizeof(_blockSize);
113 memcpy(&_blockCount, buf.data() + offset, sizeof(_blockCount));
114 offset += sizeof(_blockCount);
115 memcpy(&_rdwr, buf.data() + offset, sizeof(_rdwr));
118 MutationLogVersion version() const {
119 return MutationLogVersion(ntohl(_version));
122 uint32_t blockSize() const {
123 return ntohl(_blockSize);
126 uint32_t blockCount() const {
127 return ntohl(_blockCount);
130 uint32_t rdwr() const {
134 void setRdwr(uint32_t nval) {
142 uint32_t _blockCount;
147 * Mutation log compactor config that is used to control the scheduling of
150 class MutationLogCompactorConfig {
152 MutationLogCompactorConfig() :
153 maxLogSize(MAX_LOG_SIZE), maxEntryRatio(MAX_ENTRY_RATIO),
154 queueCap(LOG_COMPACTOR_QUEUE_CAP),
155 sleepTime(MUTATION_LOG_COMPACTOR_FREQ) { }
157 MutationLogCompactorConfig(size_t max_log_size,
158 size_t max_entry_ratio,
161 maxLogSize(max_log_size), maxEntryRatio(max_entry_ratio),
162 queueCap(queue_cap), sleepTime(stime) { }
164 void setMaxLogSize(size_t max_log_size) {
165 maxLogSize = max_log_size;
168 size_t getMaxLogSize() const {
172 void setMaxEntryRatio(size_t max_entry_ratio) {
173 maxEntryRatio = max_entry_ratio;
176 size_t getMaxEntryRatio() const {
177 return maxEntryRatio;
180 void setQueueCap(size_t queue_cap) {
181 queueCap = queue_cap;
184 size_t getQueueCap() const {
188 void setSleepTime(size_t stime) {
192 size_t getSleepTime() const {
198 size_t maxEntryRatio;
204 * The MutationLog records major key events to allow ep-engine to more
205 * quickly restore the server to its previous state upon restart.
209 MutationLog(const std::string& path, const size_t bs = MIN_LOG_HEADER_SIZE);
213 void newItem(uint16_t vbucket, const DocKey& key);
225 bool isEnabled() const {
229 bool isOpen() const {
230 return file != INVALID_FILE_VALUE;
233 LogHeaderBlock header() const {
237 void setSyncConfig(uint8_t sconf) {
241 uint8_t getSyncConfig() const {
242 return syncConfig & SYNC_FULL;
245 uint8_t getFlushConfig() const {
246 return syncConfig & FLUSH_FULL;
249 size_t getBlockSize() const {
255 const std::string &getLogFile() const { return logPath; }
258 * Open and initialize the log.
260 * This typically happens automatically.
262 void open(bool _readOnly = false);
265 * Close the log file.
275 * Replace the current log with a given log.
277 bool replaceWith(MutationLog &mlog);
279 bool setSyncConfig(const std::string &s);
280 bool setFlushConfig(const std::string &s);
283 * Reset the item type counts to the given values.
285 * This is used by the loader as part of initialization.
287 void resetCounts(size_t *);
290 * Exception thrown upon failure to write a mutation log.
292 class WriteException : public std::runtime_error {
294 WriteException(const std::string &s) : std::runtime_error(s) {}
298 * Exception thrown upon failure to read a mutation log.
300 class ReadException : public std::runtime_error {
302 ReadException(const std::string &s) : std::runtime_error(s) {}
305 class FileNotFoundException : public ReadException {
307 FileNotFoundException(const std::string &s) : ReadException(s) {}
311 * Exception thrown when a CRC mismatch is read in the log.
313 class CRCReadException : public ReadException {
315 CRCReadException() : ReadException("CRC Mismatch") {}
319 * Exception thrown when a short read occurred.
321 class ShortReadException : public ReadException {
323 ShortReadException() : ReadException("Short Read") {}
327 * The MutationLog::iterator will return MutationLogEntryHolder objects
328 * which handle resource destruction if necessary. In some cases the entry
329 * being read is a temporary heap allocation which will need deleting.
330 * Sometimes the entry is owned by the iterator and the iterator will
333 class MutationLogEntryHolder {
336 * @param _mle A pointer to a buffer which contains a MutationLogEntry
337 * @param _destroy Set to true if the _mle buffer must be deleted once
338 * the holder's life is complete.
340 MutationLogEntryHolder(const uint8_t* _mle, bool _destroy)
341 : mle(_mle), destroy(_destroy) {
344 MutationLogEntryHolder(MutationLogEntryHolder&& rhs)
345 : mle(rhs.mle), destroy(rhs.destroy) {
349 MutationLogEntryHolder(const MutationLogEntryHolder& rhs) = delete;
352 * Destructor will delete the mle data only if we're told to by the
355 ~MutationLogEntryHolder() {
361 const MutationLogEntry* operator->() const {
362 return reinterpret_cast<const MutationLogEntry*>(mle);
371 * An iterator for the mutation log.
373 * A ReadException may be thrown at any point along iteration.
375 class iterator : public std::iterator<std::input_iterator_tag,
376 const MutationLogEntry> {
379 iterator(const iterator& mit);
381 iterator& operator=(const iterator& other);
385 iterator& operator++();
387 bool operator==(const iterator& rhs) const;
389 bool operator!=(const iterator& rhs) const;
391 MutationLogEntryHolder operator*();
395 friend class MutationLog;
397 iterator(const MutationLog* l, bool e=false);
399 /// @returns the length of the entry the iterator is currently at
400 size_t getCurrentEntryLen() const;
402 size_t bufferBytesRemaining();
406 * Upgrades the entry the iterator is currently at and returns it
407 * via a MutationLogEntryHolder
409 MutationLogEntryHolder upgradeEntry() const;
411 const MutationLog* log;
412 std::vector<uint8_t> entryBuf;
413 std::vector<uint8_t> buf;
414 std::vector<uint8_t>::const_iterator p;
421 * An iterator pointing to the beginning of the log file.
424 iterator it(iterator(this));
430 * An iterator pointing at the end of the log file.
433 return iterator(this, true);
436 //! Items logged by type.
437 std::atomic<size_t> itemsLogged[int(MutationLogType::NumberOfTypes)];
438 //! Histogram of block padding sizes.
439 Histogram<uint32_t> paddingHisto;
440 //! Flush time histogram.
441 Histogram<hrtime_t> flushTimeHisto;
442 //! Sync time histogram.
443 Histogram<hrtime_t> syncTimeHisto;
445 std::atomic<size_t> logSize;
448 void needWriteAccess(void) {
450 throw WriteException("Invalid access (file opened read only)");
453 void writeEntry(MutationLogEntry *mle);
455 bool writeInitialBlock();
456 void readInitialBlock();
457 void updateInitialBlock(void);
459 bool prepareWrites();
461 file_handle_t fd() const { return file; }
463 LogHeaderBlock headerBlock;
464 const std::string logPath;
470 std::unique_ptr<uint8_t[]> entryBuffer;
471 std::unique_ptr<uint8_t[]> blockBuffer;
475 DISALLOW_COPY_AND_ASSIGN(MutationLog);
//! An event as a (rowid, mutation log type narrowed to uint8_t) pair.
using mutation_log_event_t = std::pair<uint64_t, uint8_t>;
486 * MutationLogHarvester::apply callback type.
488 typedef bool (*mlCallback)(void*, uint16_t, const DocKey&);
489 typedef bool (*mlCallbackWithQueue)(uint16_t,
490 const std::set<StoredDocKey>&,
494 * Type for mutation log leftovers.
496 struct mutation_log_uncommitted_t {
499 MutationLogType type;
503 class EventuallyPersistentEngine;
506 * Read log entries back from the log to reconstruct the state.
508 class MutationLogHarvester {
510 MutationLogHarvester(MutationLog &ml, EventuallyPersistentEngine *e = NULL) :
513 memset(itemsSeen, 0, sizeof(itemsSeen));
517 * Set a vbucket before loading.
519 void setVBucket(uint16_t vb) {
524 * Load the entries from the file.
526 * @return true if the file was clean and can likely be trusted.
531 * Load a batch of entries from the file, starting from the given iterator.
532 * Loaded entries are inserted into `committed`, which is cleared at the
533 * start of each call.
535 * @param start Iterator of where to start loading from.
536 * @param limit Limit of how many entries should be loaded. Zero means no
538 * @return iterator of where to resume in the log (if the end was not
539 * reached), or MutationLog::iterator::end().
541 MutationLog::iterator loadBatch(const MutationLog::iterator& start,
545 * Apply the processed log entries through the given function.
547 void apply(void *arg, mlCallback mlc);
548 void apply(void *arg, mlCallbackWithQueue mlc);
551 * Get the total number of entries found in the log.
556 * Get all of the counts of log entries by type.
558 size_t *getItemsSeen() {
565 EventuallyPersistentEngine *engine;
566 std::set<uint16_t> vbid_set;
568 std::unordered_map<uint16_t, std::set<StoredDocKey>> committed;
569 std::unordered_map<uint16_t, std::set<StoredDocKey>> loading;
570 size_t itemsSeen[int(MutationLogType::NumberOfTypes)];