Merge remote-tracking branch 'couchbase/3.0.x' into sherlock
[ep-engine.git] / src / couch-kvstore / couch-kvstore.h
1 /* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2 /*
3  *     Copyright 2012 Couchbase, Inc
4  *
5  *   Licensed under the Apache License, Version 2.0 (the "License");
6  *   you may not use this file except in compliance with the License.
7  *   You may obtain a copy of the License at
8  *
9  *       http://www.apache.org/licenses/LICENSE-2.0
10  *
11  *   Unless required by applicable law or agreed to in writing, software
12  *   distributed under the License is distributed on an "AS IS" BASIS,
13  *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  *   See the License for the specific language governing permissions and
15  *   limitations under the License.
16  */
17
18 #ifndef SRC_COUCH_KVSTORE_COUCH_KVSTORE_H_
19 #define SRC_COUCH_KVSTORE_COUCH_KVSTORE_H_ 1
20
21 #include "config.h"
22 #include "libcouchstore/couch_db.h"
23 #include <relaxed_atomic.h>
24
25 #include <map>
26 #include <string>
27 #include <vector>
28
29 #include "configuration.h"
30 #include "couch-kvstore/couch-fs-stats.h"
31 #include "histo.h"
32 #include "item.h"
33 #include "kvstore.h"
34 #include "tasks.h"
35 #include "atomicqueue.h"
36
37 #define COUCHSTORE_NO_OPTIONS 0
38
39 /**
40  * Stats and timings for couchKVStore
41  */
42 class CouchKVStoreStats {
43
44 public:
45     /**
46      * Default constructor
47      */
48     CouchKVStoreStats() :
49       docsCommitted(0), numOpen(0), numClose(0),
50       numLoadedVb(0), numGetFailure(0), numSetFailure(0),
51       numDelFailure(0), numOpenFailure(0), numVbSetFailure(0),
52       io_num_read(0), io_num_write(0), io_read_bytes(0), io_write_bytes(0),
53       readSizeHisto(ExponentialGenerator<size_t>(1, 2), 25),
54       writeSizeHisto(ExponentialGenerator<size_t>(1, 2), 25) {
55     }
56
57     void reset() {
58         docsCommitted.store(0);
59         numOpen.store(0);
60         numClose.store(0);
61         numLoadedVb.store(0);
62         numGetFailure.store(0);
63         numSetFailure.store(0);
64         numDelFailure.store(0);
65         numOpenFailure.store(0);
66         numVbSetFailure.store(0);
67
68         readTimeHisto.reset();
69         readSizeHisto.reset();
70         writeTimeHisto.reset();
71         writeSizeHisto.reset();
72         delTimeHisto.reset();
73         compactHisto.reset();
74         commitHisto.reset();
75         saveDocsHisto.reset();
76         batchSize.reset();
77         fsStats.reset();
78     }
79
80     // the number of docs committed
81     AtomicValue<size_t> docsCommitted;
82     // the number of open() calls
83     AtomicValue<size_t> numOpen;
84     // the number of close() calls
85     AtomicValue<size_t> numClose;
86     // the number of vbuckets loaded
87     AtomicValue<size_t> numLoadedVb;
88
89     //stats tracking failures
90     AtomicValue<size_t> numGetFailure;
91     AtomicValue<size_t> numSetFailure;
92     AtomicValue<size_t> numDelFailure;
93     AtomicValue<size_t> numOpenFailure;
94     AtomicValue<size_t> numVbSetFailure;
95
96     //! Number of read related io operations
97     AtomicValue<size_t> io_num_read;
98     //! Number of write related io operations
99     AtomicValue<size_t> io_num_write;
100     //! Number of bytes read
101     AtomicValue<size_t> io_read_bytes;
102     //! Number of bytes written
103     AtomicValue<size_t> io_write_bytes;
104
105     /* for flush and vb delete, no error handling in CouchKVStore, such
106      * failure should be tracked in MC-engine  */
107
108     // How long it takes us to complete a read
109     Histogram<hrtime_t> readTimeHisto;
110     // How big are our reads?
111     Histogram<size_t> readSizeHisto;
112     // How long it takes us to complete a write
113     Histogram<hrtime_t> writeTimeHisto;
114     // How big are our writes?
115     Histogram<size_t> writeSizeHisto;
116     // Time spent in delete() calls.
117     Histogram<hrtime_t> delTimeHisto;
118     // Time spent in couchstore commit
119     Histogram<hrtime_t> commitHisto;
120     // Time spent in couchstore compaction
121     Histogram<hrtime_t> compactHisto;
122     // Time spent in couchstore save documents
123     Histogram<hrtime_t> saveDocsHisto;
124     // Batch size of saveDocs calls
125     Histogram<size_t> batchSize;
126
127     // Stats from the underlying OS file operations done by couchstore.
128     CouchstoreStats fsStats;
129 };
130
131 class EventuallyPersistentEngine;
132
133 typedef union {
134     Callback <mutation_result> *setCb;
135     Callback <int> *delCb;
136 } CouchRequestCallback;
137
138 const size_t CONFLICT_RES_META_LEN = 1;
139
140 // Additional 3 Bytes for flex meta, datatype and conflict resolution mode
141 const size_t COUCHSTORE_METADATA_SIZE(2 * sizeof(uint32_t) + sizeof(uint64_t) +
142                                       FLEX_DATA_OFFSET + EXT_META_LEN +
143                                       CONFLICT_RES_META_LEN);
144
145 /**
146  * Class representing a document to be persisted in couchstore.
147  */
148 class CouchRequest
149 {
150 public:
151     /**
152      * Constructor
153      *
154      * @param it Item instance to be persisted
155      * @param rev vbucket database revision number
156      * @param cb persistence callback
157      * @param del flag indicating if it is an item deletion or not
158      */
159     CouchRequest(const Item &it, uint64_t rev, CouchRequestCallback &cb, bool del);
160
161
162     virtual ~CouchRequest() {}
163
164     /**
165      * Get the vbucket id of a document to be persisted
166      *
167      * @return vbucket id of a document
168      */
169     uint16_t getVBucketId(void) {
170         return vbucketId;
171     }
172
173     /**
174      * Get the revision number of the vbucket database file
175      * where the document is persisted
176      *
177      * @return revision number of the corresponding vbucket database file
178      */
179     uint64_t getRevNum(void) {
180         return fileRevNum;
181     }
182
183     /**
184      * Get the couchstore Doc instance of a document to be persisted
185      *
186      * @return pointer to the couchstore Doc instance of a document
187      */
188     Doc *getDbDoc(void) {
189         if (deleteItem) {
190             return NULL;
191         } else {
192             return &dbDoc;
193         }
194     }
195
196     /**
197      * Get the couchstore DocInfo instance of a document to be persisted
198      *
199      * @return pointer to the couchstore DocInfo instance of a document
200      */
201     DocInfo *getDbDocInfo(void) {
202         return &dbDocInfo;
203     }
204
205     /**
206      * Get the callback instance for SET
207      *
208      * @return callback instance for SET
209      */
210     Callback<mutation_result> *getSetCallback(void) {
211         return callback.setCb;
212     }
213
214     /**
215      * Get the callback instance for DELETE
216      *
217      * @return callback instance for DELETE
218      */
219     Callback<int> *getDelCallback(void) {
220         return callback.delCb;
221     }
222
223     /**
224      * Get the time in ns elapsed since the creation of this instance
225      *
226      * @return time in ns elapsed since the creation of this instance
227      */
228     hrtime_t getDelta() {
229         return (gethrtime() - start) / 1000;
230     }
231
232     /**
233      * Get the length of a document body to be persisted
234      *
235      * @return length of a document body
236      */
237     size_t getNBytes() {
238         return dbDocInfo.rev_meta.size + dbDocInfo.size;
239     }
240
241     /**
242      * Return true if the document to be persisted is for DELETE
243      *
244      * @return true if the document to be persisted is for DELETE
245      */
246     bool isDelete() {
247         return deleteItem;
248     };
249
250     /**
251      * Get the key of a document to be persisted
252      *
253      * @return key of a document to be persisted
254      */
255     const std::string& getKey(void) const {
256         return key;
257     }
258
259 protected:
260     value_t value;
261     uint8_t meta[COUCHSTORE_METADATA_SIZE];
262     uint16_t vbucketId;
263     uint64_t fileRevNum;
264     std::string key;
265     Doc dbDoc;
266     DocInfo dbDocInfo;
267     bool deleteItem;
268     CouchRequestCallback callback;
269
270     hrtime_t start;
271 };
272
273 /**
274  * KVStore with couchstore as the underlying storage system
275  */
276 class CouchKVStore : public KVStore
277 {
278 public:
279     /**
280      * Constructor
281      *
282      * @param stats     Engine stats
283      * @param config    Configuration information
284      * @param read_only flag indicating if this kvstore instance is for read-only operations
285      */
286     CouchKVStore(Configuration &config, bool read_only = false);
287
288     /**
289      * Copy constructor
290      *
291      * @param from the source kvstore instance
292      */
293     CouchKVStore(const CouchKVStore &from);
294
295     /**
296      * Deconstructor
297      */
298     ~CouchKVStore();
299
300     void initialize();
301
302     /**
303      * Reset database to a clean state.
304      */
305     void reset(uint16_t vbucketId);
306
307     /**
308      * Begin a transaction (if not already in one).
309      *
310      * @return true if the transaction is started successfully
311      */
312     bool begin(void) {
313         cb_assert(!isReadOnly());
314         intransaction = true;
315         return intransaction;
316     }
317
318     /**
319      * Commit a transaction (unless not currently in one).
320      *
321      * @return true if the commit is completed successfully.
322      */
323     bool commit(Callback<kvstats_ctx> *cb, uint64_t snapStartSeqno,
324                 uint64_t snapEndSeqno, uint64_t maxCas,
325                 uint64_t driftCounter);
326
327     /**
328      * Rollback a transaction (unless not currently in one).
329      */
330     void rollback(void) {
331         cb_assert(!isReadOnly());
332         if (intransaction) {
333             intransaction = false;
334         }
335     }
336
337     /**
338      * Query the properties of the underlying storage.
339      *
340      * @return properties of the underlying storage system
341      */
342     StorageProperties getStorageProperties(void);
343
344     /**
345      * Insert or update a given document.
346      *
347      * @param itm instance representing the document to be inserted or updated
348      * @param cb callback instance for SET
349      */
350     void set(const Item &itm, Callback<mutation_result> &cb);
351
352     /**
353      * Retrieve the document with a given key from the underlying storage system.
354      *
355      * @param key the key of a document to be retrieved
356      * @param vb vbucket id of a document
357      * @param cb callback instance for GET
358      * @param fetchDelete True if we want to retrieve a deleted item if it not
359      *        purged yet.
360      */
361     void get(const std::string &key, uint16_t vb, Callback<GetValue> &cb,
362              bool fetchDelete = false);
363
364     void getWithHeader(void *dbHandle, const std::string &key,
365                        uint16_t vb, Callback<GetValue> &cb,
366                        bool fetchDelete = false);
367
368     /**
369      * Retrieve the multiple documents from the underlying storage system at once.
370      *
371      * @param vb vbucket id of a document
372      * @param itms list of items whose documents are going to be retrieved
373      */
374     void getMulti(uint16_t vb, vb_bgfetch_queue_t &itms);
375
376     /**
377      * Delete a given document from the underlying storage system.
378      *
379      * @param itm instance representing the document to be deleted
380      * @param cb callback instance for DELETE
381      */
382     void del(const Item &itm, Callback<int> &cb);
383
384     /**
385      * Delete a given vbucket database instance from the underlying storage system
386      *
387      * @param vbucket vbucket id
388      * @param recreate flag to re-create vbucket after deletion
389      */
390     void delVBucket(uint16_t vbucket);
391
392     /**
393      * Retrieve the list of persisted vbucket states
394      *
395      * @return vbucket state vector instance where key is vbucket id and
396      * value is vbucket state
397      */
398    std::vector<vbucket_state *>  listPersistedVbuckets(void);
399
400     /**
401      * Retrieve ths list of persisted engine stats
402      *
403      * @param stats map instance where the persisted engine stats will be added
404      */
405     void getPersistedStats(std::map<std::string, std::string> &stats);
406
407     /**
408      * Persist a snapshot of the engine stats in the underlying storage.
409      *
410      * @param engine_stats map instance that contains all the engine stats
411      * @return true if the snapshot is done successfully
412      */
413     bool snapshotStats(const std::map<std::string, std::string> &engine_stats);
414
415     /**
416      * Persist a snapshot of the vbucket states in the underlying storage system.
417      *
418      * @param vbucketId vbucket id
419      * @param vbstate vbucket state
420      * @param cb - call back for updating kv stats
421      * @return true if the snapshot is done successfully
422      */
423     bool snapshotVBucket(uint16_t vbucketId, vbucket_state &vbstate,
424                          Callback<kvstats_ctx> *cb);
425
426      /**
427      * Compact a vbucket in the underlying storage system.
428      *
429      * @param vbid   - which vbucket needs to be compacted
430      * @param hook_ctx - details of vbucket which needs to be compacted
431      * @param cb - callback to help process newly expired items
432      * @param kvcb - callback to update kvstore stats
433      * @return true if successful
434      */
435     bool compactVBucket(const uint16_t vbid, compaction_ctx *cookie,
436                         Callback<compaction_ctx> &cb,
437                         Callback<kvstats_ctx> &kvcb);
438
439     /**
440      * Does the underlying storage system support key-only retrieval operations?
441      *
442      * @return true if key-only retrieval is supported
443      */
444     bool isKeyDumpSupported() {
445         return true;
446     }
447
448     /**
449      * Get the number of deleted items that are persisted to a vbucket file
450      *
451      * @param vbid The vbucket if of the file to get the number of deletes for
452      */
453     size_t getNumPersistedDeletes(uint16_t vbid);
454
455     /**
456      * Get the vbucket pertaining stats from a vbucket database file
457      *
458      * @param vbid The vbucket of the file to get the number of docs for
459      */
460     DBFileInfo getDbFileInfo(uint16_t vbid);
461
462     /**
463      * Get the number of non-deleted items from a vbucket database file
464      *
465      * @param vbid The vbucket of the file to get the number of docs for
466      * @param min_seq The sequence number to start the count from
467      * @param max_seq The sequence number to stop the count at
468      */
469     size_t getNumItems(uint16_t vbid, uint64_t min_seq, uint64_t max_seq);
470
471     /**
472      * Do a rollback to the specified seqNo on the particular vbucket
473      *
474      * @param vbid The vbucket of the file that's to be rolled back
475      * @param rollbackSeqno The sequence number upto which the engine needs
476      * to be rolled back
477      * @param cb getvalue callback
478      */
479     RollbackResult rollback(uint16_t vbid, uint64_t rollbackSeqno,
480                             shared_ptr<RollbackCB> cb);
481
482     /**
483      * Perform the pre-optimizations before persisting dirty items
484      *
485      * @param items list of dirty items that can be pre-optimized
486      */
487     void optimizeWrites(std::vector<queued_item> &items);
488
489     /**
490      * Perform pending tasks after persisting dirty items
491      */
492     void pendingTasks();
493
494     /**
495      * Add all the kvstore stats to the stat response
496      *
497      * @param prefix stat name prefix
498      * @param add_stat upstream function that allows us to add a stat to the response
499      * @param cookie upstream connection cookie
500      */
501     void addStats(const std::string &prefix, ADD_STAT add_stat, const void *cookie);
502
503     /**
504      * Add all the kvstore timings stats to the stat response
505      *
506      * @param prefix stat name prefix
507      * @param add_stat upstream function that allows us to add a stat to the response
508      * @param cookie upstream connection cookie
509      */
510     void addTimingStats(const std::string &prefix, ADD_STAT add_stat,
511                         const void *c);
512
513     /**
514      * Resets couchstore stats
515      */
516     void resetStats() {
517         st.reset();
518     }
519
520     static int recordDbDump(Db *db, DocInfo *docinfo, void *ctx);
521     static int recordDbStat(Db *db, DocInfo *docinfo, void *ctx);
522     static int getMultiCb(Db *db, DocInfo *docinfo, void *ctx);
523     void readVBState(Db *db, uint16_t vbId);
524
525     couchstore_error_t fetchDoc(Db *db, DocInfo *docinfo,
526                                 GetValue &docValue, uint16_t vbId,
527                                 bool metaOnly, bool fetchDelete = false);
528     ENGINE_ERROR_CODE couchErr2EngineErr(couchstore_error_t errCode);
529
530     CouchKVStoreStats &getCKVStoreStat(void) { return st; }
531
532     uint64_t getLastPersistedSeqno(uint16_t vbid);
533
534     /**
535      * Get all_docs API, to return the list of all keys in the store
536      */
537     ENGINE_ERROR_CODE getAllKeys(uint16_t vbid, std::string &start_key,
538                                  uint32_t count, AllKeysCB *cb);
539
540     ScanContext* initScanContext(shared_ptr<Callback<GetValue> > cb,
541                                  shared_ptr<Callback<CacheLookup> > cl,
542                                  uint16_t vbid, uint64_t startSeqno,
543                                  bool keysOnly, bool noDeletes,
544                                  bool deletesOnly);
545
546     scan_error_t scan(ScanContext* sctx);
547
548     void destroyScanContext(ScanContext* ctx);
549
550     bool setVBucketState(uint16_t vbucketId, vbucket_state &vbstate,
551                          Callback<kvstats_ctx> *cb, bool reset=false);
552     bool resetVBucket(uint16_t vbucketId, vbucket_state &vbstate) {
553         cachedDocCount[vbucketId] = 0;
554         return setVBucketState(vbucketId, vbstate, NULL, true);
555     }
556
557     template <typename T>
558     void addStat(const std::string &prefix, const char *nm, T &val,
559                  ADD_STAT add_stat, const void *c);
560
561     void operator=(const CouchKVStore &from);
562
563     void open();
564     void close();
565     bool commit2couchstore(Callback<kvstats_ctx> *cb, uint64_t snapStartSeqno,
566                            uint64_t snapEndSeqno, uint64_t maxCas,
567                            uint64_t driftCounter);
568
569     uint64_t checkNewRevNum(std::string &dbname, bool newFile = false);
570     void populateFileNameMap(std::vector<std::string> &filenames,
571                              std::vector<uint16_t> *vbids);
572     void remVBucketFromDbFileMap(uint16_t vbucketId);
573     void updateDbFileMap(uint16_t vbucketId, uint64_t newFileRev);
574     couchstore_error_t openDB(uint16_t vbucketId, uint64_t fileRev, Db **db,
575                               uint64_t options, uint64_t *newFileRev = NULL,
576                               bool reset=false);
577     couchstore_error_t openDB_retry(std::string &dbfile, uint64_t options,
578                                     const couch_file_ops *ops,
579                                     Db **db, uint64_t *newFileRev);
580     couchstore_error_t saveDocs(uint16_t vbid, uint64_t rev, Doc **docs,
581                                 DocInfo **docinfos, size_t docCount,
582                                 kvstats_ctx &kvctx,
583                                 uint64_t snapStartSeqno,
584                                 uint64_t snapEndSeqno,
585                                 uint64_t maxCas,
586                                 uint64_t driftCounter);
587     void commitCallback(std::vector<CouchRequest *> &committedReqs,
588                         kvstats_ctx &kvctx,
589                         couchstore_error_t errCode);
590     couchstore_error_t saveVBState(Db *db, vbucket_state &vbState);
591     void setDocsCommitted(uint16_t docs);
592     void closeDatabaseHandle(Db *db);
593
594     /**
595      * Unlink selected couch file, which will be removed by the OS,
596      * once all its references close.
597      */
598     void unlinkCouchFile(uint16_t vbucket, uint64_t fRev);
599
600     /**
601      * Remove compact file
602      *
603      * @param dbname
604      * @param vbucket id
605      * @param current db rev number
606      */
607     void removeCompactFile(const std::string &dbname, uint16_t vbid,
608                            uint64_t currentRev);
609
610     void removeCompactFile(const std::string &filename);
611
612     Configuration &configuration;
613     const std::string dbname;
614
615     // Map of the fileRev for each vBucket. Using RelaxedAtomic so
616     // stats gathering (doDcpVbTakeoverStats) can get a snapshot
617     // without having to lock.
618     std::vector<Couchbase::RelaxedAtomic<uint64_t> > dbFileRevMap;
619
620     uint16_t numDbFiles;
621     std::vector<CouchRequest *> pendingReqsQ;
622     bool intransaction;
623
624     /* all stats */
625     CouchKVStoreStats   st;
626     couch_file_ops statCollectingFileOps;
627     /* vbucket state cache*/
628     std::vector<vbucket_state *> cachedVBStates;
629     /* deleted docs in each file*/
630     unordered_map<uint16_t, size_t> cachedDeleteCount;
631     /* non-deleted docs in each file */
632     unordered_map<uint16_t, size_t> cachedDocCount;
633     /* pending file deletions */
634     AtomicQueue<std::string> pendingFileDeletions;
635
636     AtomicValue<size_t> backfillCounter;
637     std::map<size_t, Db*> backfills;
638     Mutex backfillLock;
639 };
640
641 #endif  // SRC_COUCH_KVSTORE_COUCH_KVSTORE_H_