Streamline configuration.h
[ep-engine.git] / src / mutation_log.h
1 /* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2 /*
3  *     Copyright 2015 Couchbase, Inc
4  *
5  *   Licensed under the Apache License, Version 2.0 (the "License");
6  *   you may not use this file except in compliance with the License.
7  *   You may obtain a copy of the License at
8  *
9  *       http://www.apache.org/licenses/LICENSE-2.0
10  *
11  *   Unless required by applicable law or agreed to in writing, software
12  *   distributed under the License is distributed on an "AS IS" BASIS,
13  *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  *   See the License for the specific language governing permissions and
15  *   limitations under the License.
16  */
17
18 #pragma once
19
20 /**
21  * 'Mutation' Log
22  *
23  * The MutationLog is used to maintain a log of mutations which have occurred
24  * in one or more vbuckets. It only records the additions or removals of keys,
25  * and then only the key of the item (no value).
26  *
27  * The original intent of this class was to record a log in parallel with the
28  * normal couchstore snapshots, see docs/klog.org, however this has not been
29  * used since MB-7590 (March 2013).
30  *
31  * The current use of MutationLog is for the access.log. This is a slightly
32  * different use-case - periodically (default daily) the AccessScanner walks
33  * each vBucket's HashTable and records the set of keys currently resident.
34  * This doesn't make use of the MutationLog's commit functionality - its simply
35  * a list of keys which were resident. When we later come to read the Access log
36  * during warmup there's no guarantee that the keys listed still exist - the
37  * contents of the Access log is essentially just a hint / suggestion.
38  *
39  */
40
41 #include "config.h"
42
43 #include "mutation_log_entry.h"
44
45 #include <array>
46 #include <cstring>
47 #include <memory>
48 #include <set>
49 #include <string>
50 #include <unordered_map>
51 #include <vector>
52
53 #include <atomic>
54 #include <platform/histogram.h>
55 #include "utility.h"
56
57 #define ML_BUFLEN (128 * 1024 * 1024)
58
59 #ifdef WIN32
60 typedef HANDLE file_handle_t;
61 #define INVALID_FILE_VALUE INVALID_HANDLE_VALUE
62 #else
63 typedef int file_handle_t;
64 #define INVALID_FILE_VALUE -1
65 #endif
66
67
68 const size_t MAX_LOG_SIZE((size_t)(unsigned int)-1);
69 const size_t MAX_ENTRY_RATIO(10);
70 const size_t LOG_COMPACTOR_QUEUE_CAP(500000);
71 const int MUTATION_LOG_COMPACTOR_FREQ(3600);
72
73 const size_t MIN_LOG_HEADER_SIZE(4096);
74 const size_t HEADER_RESERVED(4);
75
76 enum class MutationLogVersion { V1 = 1, V2 = 2, Current = V2 };
77
78 const size_t LOG_ENTRY_BUF_SIZE(512);
79
80 const uint8_t SYNC_COMMIT_1(1);
81 const uint8_t SYNC_COMMIT_2(2);
82 const uint8_t SYNC_FULL(SYNC_COMMIT_1 | SYNC_COMMIT_2);
83 const uint8_t FLUSH_COMMIT_1(4);
84 const uint8_t FLUSH_COMMIT_2(8);
85 const uint8_t FLUSH_FULL(FLUSH_COMMIT_1 | FLUSH_COMMIT_2);
86
87 const uint8_t DEFAULT_SYNC_CONF(FLUSH_COMMIT_2 | SYNC_COMMIT_2);
88
89 /**
90  * The header block representing the first 4k (or so) of a MutationLog
91  * file.
92  */
93 class LogHeaderBlock {
94 public:
95     LogHeaderBlock(MutationLogVersion version = MutationLogVersion::Current)
96         : _version(htonl(int(version))),
97           _blockSize(0),
98           _blockCount(0),
99           _rdwr(1) {
100     }
101
102     void set(uint32_t bs, uint32_t bc=1) {
103         _blockSize = htonl(bs);
104         _blockCount = htonl(bc);
105     }
106
107     void set(const std::array<uint8_t, MIN_LOG_HEADER_SIZE>& buf) {
108         int offset(0);
109         memcpy(&_version, buf.data() + offset, sizeof(_version));
110         offset += sizeof(_version);
111         memcpy(&_blockSize, buf.data() + offset, sizeof(_blockSize));
112         offset += sizeof(_blockSize);
113         memcpy(&_blockCount, buf.data() + offset, sizeof(_blockCount));
114         offset += sizeof(_blockCount);
115         memcpy(&_rdwr, buf.data() + offset, sizeof(_rdwr));
116     }
117
118     MutationLogVersion version() const {
119         return MutationLogVersion(ntohl(_version));
120     }
121
122     uint32_t blockSize() const {
123         return ntohl(_blockSize);
124     }
125
126     uint32_t blockCount() const {
127         return ntohl(_blockCount);
128     }
129
130     uint32_t rdwr() const {
131         return ntohl(_rdwr);
132     }
133
134     void setRdwr(uint32_t nval) {
135         _rdwr = htonl(nval);
136     }
137
138 private:
139
140     uint32_t _version;
141     uint32_t _blockSize;
142     uint32_t _blockCount;
143     uint32_t _rdwr;
144 };
145
146 /**
147  * Mutation log compactor config that is used to control the scheduling of
148  * the log compactor
149  */
150 class MutationLogCompactorConfig {
151 public:
152     MutationLogCompactorConfig() :
153         maxLogSize(MAX_LOG_SIZE), maxEntryRatio(MAX_ENTRY_RATIO),
154         queueCap(LOG_COMPACTOR_QUEUE_CAP),
155         sleepTime(MUTATION_LOG_COMPACTOR_FREQ) { }
156
157     MutationLogCompactorConfig(size_t max_log_size,
158                                size_t max_entry_ratio,
159                                size_t queue_cap,
160                                size_t stime) :
161         maxLogSize(max_log_size), maxEntryRatio(max_entry_ratio),
162         queueCap(queue_cap), sleepTime(stime) { }
163
164     void setMaxLogSize(size_t max_log_size) {
165         maxLogSize = max_log_size;
166     }
167
168     size_t getMaxLogSize() const {
169         return maxLogSize;
170     }
171
172     void setMaxEntryRatio(size_t max_entry_ratio) {
173         maxEntryRatio = max_entry_ratio;
174     }
175
176     size_t getMaxEntryRatio() const {
177         return maxEntryRatio;
178     }
179
180     void setQueueCap(size_t queue_cap) {
181         queueCap = queue_cap;
182     }
183
184     size_t getQueueCap() const {
185         return queueCap;
186     }
187
188     void setSleepTime(size_t stime) {
189         sleepTime = stime;
190     }
191
192     size_t getSleepTime() const {
193         return sleepTime;
194     }
195
196 private:
197     size_t maxLogSize;
198     size_t maxEntryRatio;
199     size_t queueCap;
200     size_t sleepTime;
201 };
202
203 /**
204  * The MutationLog records major key events to allow ep-engine to more
205  * quickly restore the server to its previous state upon restart.
206  */
207 class MutationLog {
208 public:
209     MutationLog(const std::string& path, const size_t bs = MIN_LOG_HEADER_SIZE);
210
211     ~MutationLog();
212
213     void newItem(uint16_t vbucket, const DocKey& key);
214
215     void commit1();
216
217     void commit2();
218
219     bool flush();
220
221     void sync();
222
223     void disable();
224
225     bool isEnabled() const {
226         return !disabled;
227     }
228
229     bool isOpen() const {
230         return file != INVALID_FILE_VALUE;
231     }
232
233     LogHeaderBlock header() const {
234         return headerBlock;
235     }
236
237     void setSyncConfig(uint8_t sconf) {
238         syncConfig = sconf;
239     }
240
241     uint8_t getSyncConfig() const {
242         return syncConfig & SYNC_FULL;
243     }
244
245     uint8_t getFlushConfig() const {
246         return syncConfig & FLUSH_FULL;
247     }
248
249     size_t getBlockSize() const {
250         return blockSize;
251     }
252
253     bool exists() const;
254
255     const std::string &getLogFile() const { return logPath; }
256
257     /**
258      * Open and initialize the log.
259      *
260      * This typically happens automatically.
261      */
262     void open(bool _readOnly = false);
263
264     /**
265      * Close the log file.
266      */
267     void close();
268
269     /**
270      * Reset the log.
271      */
272     bool reset();
273
274     /**
275      * Replace the current log with a given log.
276      */
277     bool replaceWith(MutationLog &mlog);
278
279     bool setSyncConfig(const std::string &s);
280     bool setFlushConfig(const std::string &s);
281
282     /**
283      * Reset the item type counts to the given values.
284      *
285      * This is used by the loader as part of initialization.
286      */
287     void resetCounts(size_t *);
288
289     /**
290      * Exception thrown upon failure to write a mutation log.
291      */
292     class WriteException : public std::runtime_error {
293     public:
294         WriteException(const std::string &s) : std::runtime_error(s) {}
295     };
296
297     /**
298      * Exception thrown upon failure to read a mutation log.
299      */
300     class ReadException : public std::runtime_error {
301     public:
302         ReadException(const std::string &s) : std::runtime_error(s) {}
303     };
304
305     class FileNotFoundException : public ReadException {
306     public:
307         FileNotFoundException(const std::string &s) : ReadException(s) {}
308     };
309
310     /**
311      * Exception thrown when a CRC mismatch is read in the log.
312      */
313     class CRCReadException : public ReadException {
314     public:
315         CRCReadException() : ReadException("CRC Mismatch") {}
316     };
317
318     /**
319      * Exception thrown when a short read occurred.
320      */
321     class ShortReadException : public ReadException {
322     public:
323         ShortReadException() : ReadException("Short Read") {}
324     };
325
326     /**
327      * The MutationLog::iterator will return MutationLogEntryHolder objects
328      * which handle resource destruction if necessary. In some cases the entry
329      * being read is a temporary heap allocation which will need deleting.
330      * Sometimes the entry is owned by the iterator and the iterator will
331      * sort the deletion.
332      */
333     class MutationLogEntryHolder {
334     public:
335         /**
336          * @param _mle A pointer to a buffer which contains a MutationLogEntry
337          * @param _destroye Set to true if the _mle buffer nust be deleted once
338          *        the holder's life is complete.
339          */
340         MutationLogEntryHolder(const uint8_t* _mle, bool _destroy)
341             : mle(_mle), destroy(_destroy) {
342         }
343
344         MutationLogEntryHolder(MutationLogEntryHolder&& rhs)
345             : mle(rhs.mle), destroy(rhs.destroy) {
346             rhs.mle = nullptr;
347         }
348
349         MutationLogEntryHolder(const MutationLogEntryHolder& rhs) = delete;
350
351         /**
352          * Destructor will delete the mle data only if we're told to by the
353          * constructing code
354          */
355         ~MutationLogEntryHolder() {
356             if (destroy) {
357                 delete[] mle;
358             }
359         }
360
361         const MutationLogEntry* operator->() const {
362             return reinterpret_cast<const MutationLogEntry*>(mle);
363         }
364
365     private:
366         const uint8_t* mle;
367         bool destroy;
368     };
369
370     /**
371      * An iterator for the mutation log.
372      *
373      * A ReadException may be thrown at any point along iteration.
374      */
375     class iterator  : public std::iterator<std::input_iterator_tag,
376                                            const MutationLogEntry> {
377     public:
378
379         iterator(const iterator& mit);
380
381         iterator& operator=(const iterator& other);
382
383         ~iterator();
384
385         iterator& operator++();
386
387         bool operator==(const iterator& rhs) const;
388
389         bool operator!=(const iterator& rhs) const;
390
391         MutationLogEntryHolder operator*();
392
393     private:
394
395         friend class MutationLog;
396
397         iterator(const MutationLog* l, bool e=false);
398
399         /// @returns the length of the entry the iterator is currently at
400         size_t getCurrentEntryLen() const;
401         void nextBlock();
402         size_t bufferBytesRemaining();
403         void prepItem();
404
405         /**
406          * Upgrades the entry the iterator is currently at and returns it
407          * via a MutationLogEntryHolder
408          */
409         MutationLogEntryHolder upgradeEntry() const;
410
411         const MutationLog* log;
412         std::vector<uint8_t> entryBuf;
413         std::vector<uint8_t> buf;
414         std::vector<uint8_t>::const_iterator p;
415         off_t              offset;
416         uint16_t           items;
417         bool               isEnd;
418     };
419
420     /**
421      * An iterator pointing to the beginning of the log file.
422      */
423     iterator begin() {
424         iterator it(iterator(this));
425         it.nextBlock();
426         return it;
427     }
428
429     /**
430      * An iterator pointing at the end of the log file.
431      */
432     iterator end() {
433         return iterator(this, true);
434     }
435
436     //! Items logged by type.
437     std::atomic<size_t> itemsLogged[int(MutationLogType::NumberOfTypes)];
438     //! Histogram of block padding sizes.
439     Histogram<uint32_t> paddingHisto;
440     //! Flush time histogram.
441     Histogram<hrtime_t> flushTimeHisto;
442     //! Sync time histogram.
443     Histogram<hrtime_t> syncTimeHisto;
444     //! Size of the log
445     std::atomic<size_t> logSize;
446
447 private:
448     void needWriteAccess(void) {
449         if (readOnly) {
450             throw WriteException("Invalid access (file opened read only)");
451         }
452     }
453     void writeEntry(MutationLogEntry *mle);
454
455     bool writeInitialBlock();
456     void readInitialBlock();
457     void updateInitialBlock(void);
458
459     bool prepareWrites();
460
461     file_handle_t fd() const { return file; }
462
463     LogHeaderBlock     headerBlock;
464     const std::string  logPath;
465     size_t             blockSize;
466     size_t             blockPos;
467     file_handle_t      file;
468     bool               disabled;
469     uint16_t           entries;
470     std::unique_ptr<uint8_t[]> entryBuffer;
471     std::unique_ptr<uint8_t[]> blockBuffer;
472     uint8_t            syncConfig;
473     bool               readOnly;
474
475     DISALLOW_COPY_AND_ASSIGN(MutationLog);
476 };
477
478 /// @cond DETAILS
479
480 //! rowid, (uint8_t)mutation_log_type_t
481 typedef std::pair<uint64_t, uint8_t> mutation_log_event_t;
482
483 /// @endcond
484
485 /**
486  * MutationLogHarvester::apply callback type.
487  */
488 typedef bool (*mlCallback)(void*, uint16_t, const DocKey&);
489 typedef bool (*mlCallbackWithQueue)(uint16_t,
490                                     const std::set<StoredDocKey>&,
491                                     void *arg);
492
493 /**
494  * Type for mutation log leftovers.
495  */
496 struct mutation_log_uncommitted_t {
497     StoredDocKey        key;
498     uint64_t            rowid;
499     MutationLogType     type;
500     uint16_t            vbucket;
501 };
502
503 class EventuallyPersistentEngine;
504
505 /**
506  * Read log entries back from the log to reconstruct the state.
507  */
508 class MutationLogHarvester {
509 public:
510     MutationLogHarvester(MutationLog &ml, EventuallyPersistentEngine *e = NULL) :
511         mlog(ml), engine(e)
512     {
513         memset(itemsSeen, 0, sizeof(itemsSeen));
514     }
515
516     /**
517      * Set a vbucket before loading.
518      */
519     void setVBucket(uint16_t vb) {
520         vbid_set.insert(vb);
521     }
522
523     /**
524      * Load the entries from the file.
525      *
526      * @return true if the file was clean and can likely be trusted.
527      */
528     bool load();
529
530     /**
531      * Load a batch of entries from the file, starting from the given iterator.
532      * Loaded entries are inserted into `committed`, which is cleared at the
533      * start of each call.
534      *
535      * @param start Iterator of where to start loading from.
536      * @param limit Limit of now many entries should be loaded. Zero means no
537      *              limit.
538      * @return iterator of where to resume in the log (if the end was not
539      *         reached), or MutationLog::iterator::end().
540      */
541     MutationLog::iterator loadBatch(const MutationLog::iterator& start,
542                                         size_t limit);
543
544     /**
545      * Apply the processed log entries through the given function.
546      */
547     void apply(void *arg, mlCallback mlc);
548     void apply(void *arg, mlCallbackWithQueue mlc);
549
550     /**
551      * Get the total number of entries found in the log.
552      */
553     size_t total();
554
555     /**
556      * Get all of the counts of log entries by type.
557      */
558     size_t *getItemsSeen() {
559         return itemsSeen;
560     }
561
562 private:
563
564     MutationLog &mlog;
565     EventuallyPersistentEngine *engine;
566     std::set<uint16_t> vbid_set;
567
568     std::unordered_map<uint16_t, std::set<StoredDocKey>> committed;
569     std::unordered_map<uint16_t, std::set<StoredDocKey>> loading;
570     size_t itemsSeen[int(MutationLogType::NumberOfTypes)];
571 };