74d774a6b53fad9fbab3765f8ff7044008c6c41e
[ep-engine.git] / src / mutation_log.h
1 /* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2 /*
3  *     Copyright 2015 Couchbase, Inc
4  *
5  *   Licensed under the Apache License, Version 2.0 (the "License");
6  *   you may not use this file except in compliance with the License.
7  *   You may obtain a copy of the License at
8  *
9  *       http://www.apache.org/licenses/LICENSE-2.0
10  *
11  *   Unless required by applicable law or agreed to in writing, software
12  *   distributed under the License is distributed on an "AS IS" BASIS,
13  *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  *   See the License for the specific language governing permissions and
15  *   limitations under the License.
16  */
17
18 #pragma once
19
20 /**
21  * 'Mutation' Log
22  *
23  * The MutationLog is used to maintain a log of mutations which have occurred
24  * in one or more vbuckets. It only records the additions or removals of keys,
25  * and then only the key of the item (no value).
26  *
27  * The original intent of this class was to record a log in parallel with the
28  * normal couchstore snapshots, see docs/klog.org, however this has not been
29  * used since MB-7590 (March 2013).
30  *
31  * The current use of MutationLog is for the access.log. This is a slightly
32  * different use-case - periodically (default daily) the AccessScanner walks
33  * each vBucket's HashTable and records the set of keys currently resident.
34  * This doesn't make use of the MutationLog's commit functionality - its simply
35  * a list of keys which were resident. When we later come to read the Access log
36  * during warmup there's no guarantee that the keys listed still exist - the
37  * contents of the Access log is essentially just a hint / suggestion.
38  *
39  */
40
41 #include "config.h"
42
43 #include "mutation_log_entry.h"
44
45 #include <array>
46 #include <cstring>
47 #include <memory>
48 #include <string>
49 #include <unordered_map>
50 #include <vector>
51
52 #include <atomic>
53 #include <platform/histogram.h>
54 #include "utility.h"
55
56 #define ML_BUFLEN (128 * 1024 * 1024)
57
58 #ifdef WIN32
59 typedef HANDLE file_handle_t;
60 #define INVALID_FILE_VALUE INVALID_HANDLE_VALUE
61 #else
62 typedef int file_handle_t;
63 #define INVALID_FILE_VALUE -1
64 #endif
65
66
67 const size_t MAX_LOG_SIZE((size_t)(unsigned int)-1);
68 const size_t MAX_ENTRY_RATIO(10);
69 const size_t LOG_COMPACTOR_QUEUE_CAP(500000);
70 const int MUTATION_LOG_COMPACTOR_FREQ(3600);
71
72 const size_t MIN_LOG_HEADER_SIZE(4096);
73 const size_t HEADER_RESERVED(4);
74
75 enum class MutationLogVersion { V1 = 1, V2 = 2, Current = V2 };
76
77 const size_t LOG_ENTRY_BUF_SIZE(512);
78
79 const uint8_t SYNC_COMMIT_1(1);
80 const uint8_t SYNC_COMMIT_2(2);
81 const uint8_t SYNC_FULL(SYNC_COMMIT_1 | SYNC_COMMIT_2);
82 const uint8_t FLUSH_COMMIT_1(4);
83 const uint8_t FLUSH_COMMIT_2(8);
84 const uint8_t FLUSH_FULL(FLUSH_COMMIT_1 | FLUSH_COMMIT_2);
85
86 const uint8_t DEFAULT_SYNC_CONF(FLUSH_COMMIT_2 | SYNC_COMMIT_2);
87
88 /**
89  * The header block representing the first 4k (or so) of a MutationLog
90  * file.
91  */
92 class LogHeaderBlock {
93 public:
94     LogHeaderBlock(MutationLogVersion version = MutationLogVersion::Current)
95         : _version(htonl(int(version))),
96           _blockSize(0),
97           _blockCount(0),
98           _rdwr(1) {
99     }
100
101     void set(uint32_t bs, uint32_t bc=1) {
102         _blockSize = htonl(bs);
103         _blockCount = htonl(bc);
104     }
105
106     void set(const std::array<uint8_t, MIN_LOG_HEADER_SIZE>& buf) {
107         int offset(0);
108         memcpy(&_version, buf.data() + offset, sizeof(_version));
109         offset += sizeof(_version);
110         memcpy(&_blockSize, buf.data() + offset, sizeof(_blockSize));
111         offset += sizeof(_blockSize);
112         memcpy(&_blockCount, buf.data() + offset, sizeof(_blockCount));
113         offset += sizeof(_blockCount);
114         memcpy(&_rdwr, buf.data() + offset, sizeof(_rdwr));
115     }
116
117     MutationLogVersion version() const {
118         return MutationLogVersion(ntohl(_version));
119     }
120
121     uint32_t blockSize() const {
122         return ntohl(_blockSize);
123     }
124
125     uint32_t blockCount() const {
126         return ntohl(_blockCount);
127     }
128
129     uint32_t rdwr() const {
130         return ntohl(_rdwr);
131     }
132
133     void setRdwr(uint32_t nval) {
134         _rdwr = htonl(nval);
135     }
136
137 private:
138
139     uint32_t _version;
140     uint32_t _blockSize;
141     uint32_t _blockCount;
142     uint32_t _rdwr;
143 };
144
145 /**
146  * Mutation log compactor config that is used to control the scheduling of
147  * the log compactor
148  */
149 class MutationLogCompactorConfig {
150 public:
151     MutationLogCompactorConfig() :
152         maxLogSize(MAX_LOG_SIZE), maxEntryRatio(MAX_ENTRY_RATIO),
153         queueCap(LOG_COMPACTOR_QUEUE_CAP),
154         sleepTime(MUTATION_LOG_COMPACTOR_FREQ) { }
155
156     MutationLogCompactorConfig(size_t max_log_size,
157                                size_t max_entry_ratio,
158                                size_t queue_cap,
159                                size_t stime) :
160         maxLogSize(max_log_size), maxEntryRatio(max_entry_ratio),
161         queueCap(queue_cap), sleepTime(stime) { }
162
163     void setMaxLogSize(size_t max_log_size) {
164         maxLogSize = max_log_size;
165     }
166
167     size_t getMaxLogSize() const {
168         return maxLogSize;
169     }
170
171     void setMaxEntryRatio(size_t max_entry_ratio) {
172         maxEntryRatio = max_entry_ratio;
173     }
174
175     size_t getMaxEntryRatio() const {
176         return maxEntryRatio;
177     }
178
179     void setQueueCap(size_t queue_cap) {
180         queueCap = queue_cap;
181     }
182
183     size_t getQueueCap() const {
184         return queueCap;
185     }
186
187     void setSleepTime(size_t stime) {
188         sleepTime = stime;
189     }
190
191     size_t getSleepTime() const {
192         return sleepTime;
193     }
194
195 private:
196     size_t maxLogSize;
197     size_t maxEntryRatio;
198     size_t queueCap;
199     size_t sleepTime;
200 };
201
202 /**
203  * The MutationLog records major key events to allow ep-engine to more
204  * quickly restore the server to its previous state upon restart.
205  */
206 class MutationLog {
207 public:
208     MutationLog(const std::string& path, const size_t bs = MIN_LOG_HEADER_SIZE);
209
210     ~MutationLog();
211
212     void newItem(uint16_t vbucket, const DocKey& key);
213
214     void commit1();
215
216     void commit2();
217
218     bool flush();
219
220     void sync();
221
222     void disable();
223
224     bool isEnabled() const {
225         return !disabled;
226     }
227
228     bool isOpen() const {
229         return file != INVALID_FILE_VALUE;
230     }
231
232     LogHeaderBlock header() const {
233         return headerBlock;
234     }
235
236     void setSyncConfig(uint8_t sconf) {
237         syncConfig = sconf;
238     }
239
240     uint8_t getSyncConfig() const {
241         return syncConfig & SYNC_FULL;
242     }
243
244     uint8_t getFlushConfig() const {
245         return syncConfig & FLUSH_FULL;
246     }
247
248     size_t getBlockSize() const {
249         return blockSize;
250     }
251
252     bool exists() const;
253
254     const std::string &getLogFile() const { return logPath; }
255
256     /**
257      * Open and initialize the log.
258      *
259      * This typically happens automatically.
260      */
261     void open(bool _readOnly = false);
262
263     /**
264      * Close the log file.
265      */
266     void close();
267
268     /**
269      * Reset the log.
270      */
271     bool reset();
272
273     /**
274      * Replace the current log with a given log.
275      */
276     bool replaceWith(MutationLog &mlog);
277
278     bool setSyncConfig(const std::string &s);
279     bool setFlushConfig(const std::string &s);
280
281     /**
282      * Reset the item type counts to the given values.
283      *
284      * This is used by the loader as part of initialization.
285      */
286     void resetCounts(size_t *);
287
288     /**
289      * Exception thrown upon failure to write a mutation log.
290      */
291     class WriteException : public std::runtime_error {
292     public:
293         WriteException(const std::string &s) : std::runtime_error(s) {}
294     };
295
296     /**
297      * Exception thrown upon failure to read a mutation log.
298      */
299     class ReadException : public std::runtime_error {
300     public:
301         ReadException(const std::string &s) : std::runtime_error(s) {}
302     };
303
304     class FileNotFoundException : public ReadException {
305     public:
306         FileNotFoundException(const std::string &s) : ReadException(s) {}
307     };
308
309     /**
310      * Exception thrown when a CRC mismatch is read in the log.
311      */
312     class CRCReadException : public ReadException {
313     public:
314         CRCReadException() : ReadException("CRC Mismatch") {}
315     };
316
317     /**
318      * Exception thrown when a short read occurred.
319      */
320     class ShortReadException : public ReadException {
321     public:
322         ShortReadException() : ReadException("Short Read") {}
323     };
324
325     /**
326      * The MutationLog::iterator will return MutationLogEntryHolder objects
327      * which handle resource destruction if necessary. In some cases the entry
328      * being read is a temporary heap allocation which will need deleting.
329      * Sometimes the entry is owned by the iterator and the iterator will
330      * sort the deletion.
331      */
332     class MutationLogEntryHolder {
333     public:
334         /**
335          * @param _mle A pointer to a buffer which contains a MutationLogEntry
336          * @param _destroye Set to true if the _mle buffer nust be deleted once
337          *        the holder's life is complete.
338          */
339         MutationLogEntryHolder(const uint8_t* _mle, bool _destroy)
340             : mle(_mle), destroy(_destroy) {
341         }
342
343         MutationLogEntryHolder(MutationLogEntryHolder&& rhs)
344             : mle(rhs.mle), destroy(rhs.destroy) {
345             rhs.mle = nullptr;
346         }
347
348         MutationLogEntryHolder(const MutationLogEntryHolder& rhs) = delete;
349
350         /**
351          * Destructor will delete the mle data only if we're told to by the
352          * constructing code
353          */
354         ~MutationLogEntryHolder() {
355             if (destroy) {
356                 delete[] mle;
357             }
358         }
359
360         const MutationLogEntry* operator->() const {
361             return reinterpret_cast<const MutationLogEntry*>(mle);
362         }
363
364     private:
365         const uint8_t* mle;
366         bool destroy;
367     };
368
369     /**
370      * An iterator for the mutation log.
371      *
372      * A ReadException may be thrown at any point along iteration.
373      */
374     class iterator  : public std::iterator<std::input_iterator_tag,
375                                            const MutationLogEntry> {
376     public:
377
378         iterator(const iterator& mit);
379
380         iterator& operator=(const iterator& other);
381
382         ~iterator();
383
384         iterator& operator++();
385
386         bool operator==(const iterator& rhs) const;
387
388         bool operator!=(const iterator& rhs) const;
389
390         MutationLogEntryHolder operator*();
391
392     private:
393
394         friend class MutationLog;
395
396         iterator(const MutationLog* l, bool e=false);
397
398         /// @returns the length of the entry the iterator is currently at
399         size_t getCurrentEntryLen() const;
400         void nextBlock();
401         size_t bufferBytesRemaining();
402         void prepItem();
403
404         /**
405          * Upgrades the entry the iterator is currently at and returns it
406          * via a MutationLogEntryHolder
407          */
408         MutationLogEntryHolder upgradeEntry() const;
409
410         const MutationLog* log;
411         std::vector<uint8_t> entryBuf;
412         std::vector<uint8_t> buf;
413         std::vector<uint8_t>::const_iterator p;
414         off_t              offset;
415         uint16_t           items;
416         bool               isEnd;
417     };
418
419     /**
420      * An iterator pointing to the beginning of the log file.
421      */
422     iterator begin() {
423         iterator it(iterator(this));
424         it.nextBlock();
425         return it;
426     }
427
428     /**
429      * An iterator pointing at the end of the log file.
430      */
431     iterator end() {
432         return iterator(this, true);
433     }
434
435     //! Items logged by type.
436     std::atomic<size_t> itemsLogged[int(MutationLogType::NumberOfTypes)];
437     //! Histogram of block padding sizes.
438     Histogram<uint32_t> paddingHisto;
439     //! Flush time histogram.
440     Histogram<hrtime_t> flushTimeHisto;
441     //! Sync time histogram.
442     Histogram<hrtime_t> syncTimeHisto;
443     //! Size of the log
444     std::atomic<size_t> logSize;
445
446 private:
447     void needWriteAccess(void) {
448         if (readOnly) {
449             throw WriteException("Invalid access (file opened read only)");
450         }
451     }
452     void writeEntry(MutationLogEntry *mle);
453
454     bool writeInitialBlock();
455     void readInitialBlock();
456     void updateInitialBlock(void);
457
458     bool prepareWrites();
459
460     file_handle_t fd() const { return file; }
461
462     LogHeaderBlock     headerBlock;
463     const std::string  logPath;
464     size_t             blockSize;
465     size_t             blockPos;
466     file_handle_t      file;
467     bool               disabled;
468     uint16_t           entries;
469     std::unique_ptr<uint8_t[]> entryBuffer;
470     std::unique_ptr<uint8_t[]> blockBuffer;
471     uint8_t            syncConfig;
472     bool               readOnly;
473
474     DISALLOW_COPY_AND_ASSIGN(MutationLog);
475 };
476
477 /// @cond DETAILS
478
479 //! rowid, (uint8_t)mutation_log_type_t
480 typedef std::pair<uint64_t, uint8_t> mutation_log_event_t;
481
482 /// @endcond
483
484 /**
485  * MutationLogHarvester::apply callback type.
486  */
487 typedef bool (*mlCallback)(void*, uint16_t, const DocKey&);
488 typedef bool (*mlCallbackWithQueue)(uint16_t,
489                                     const std::set<StoredDocKey>&,
490                                     void *arg);
491
492 /**
493  * Type for mutation log leftovers.
494  */
495 struct mutation_log_uncommitted_t {
496     StoredDocKey        key;
497     uint64_t            rowid;
498     MutationLogType     type;
499     uint16_t            vbucket;
500 };
501
502 class EventuallyPersistentEngine;
503
504 /**
505  * Read log entries back from the log to reconstruct the state.
506  */
507 class MutationLogHarvester {
508 public:
509     MutationLogHarvester(MutationLog &ml, EventuallyPersistentEngine *e = NULL) :
510         mlog(ml), engine(e)
511     {
512         memset(itemsSeen, 0, sizeof(itemsSeen));
513     }
514
515     /**
516      * Set a vbucket before loading.
517      */
518     void setVBucket(uint16_t vb) {
519         vbid_set.insert(vb);
520     }
521
522     /**
523      * Load the entries from the file.
524      *
525      * @return true if the file was clean and can likely be trusted.
526      */
527     bool load();
528
529     /**
530      * Load a batch of entries from the file, starting from the given iterator.
531      * Loaded entries are inserted into `committed`, which is cleared at the
532      * start of each call.
533      *
534      * @param start Iterator of where to start loading from.
535      * @param limit Limit of now many entries should be loaded. Zero means no
536      *              limit.
537      * @return iterator of where to resume in the log (if the end was not
538      *         reached), or MutationLog::iterator::end().
539      */
540     MutationLog::iterator loadBatch(const MutationLog::iterator& start,
541                                         size_t limit);
542
543     /**
544      * Apply the processed log entries through the given function.
545      */
546     void apply(void *arg, mlCallback mlc);
547     void apply(void *arg, mlCallbackWithQueue mlc);
548
549     /**
550      * Get the total number of entries found in the log.
551      */
552     size_t total();
553
554     /**
555      * Get all of the counts of log entries by type.
556      */
557     size_t *getItemsSeen() {
558         return itemsSeen;
559     }
560
561 private:
562
563     MutationLog &mlog;
564     EventuallyPersistentEngine *engine;
565     std::set<uint16_t> vbid_set;
566
567     std::unordered_map<uint16_t, std::set<StoredDocKey>> committed;
568     std::unordered_map<uint16_t, std::set<StoredDocKey>> loading;
569     size_t itemsSeen[int(MutationLogType::NumberOfTypes)];
570 };